## Overview

Data from a veterinary clinic. Their data is dispersed across multiple CSV files, and they need you to first upload all of them to a database and then perform the following analytics:

1. Extract information on pet names and owner names side-by-side.
2. Find out which pets from this clinic had procedures performed.
3. Match up all procedures performed to their descriptions.
4. Same as above but only for pets from the clinic in question.
5. Extract a table of individual costs (procedure prices) incurred by owners of pets from the clinic in question (this table should have owner and procedure price side-by-side)

In [2]:
from pyspark.sql.functions import count, desc , col, max, struct, substring, to_date, asc, sum

In [3]:
# Input format and the location of each source file
file_type = "csv"
file_owners_location = "/FileStore/tables/P9_Owners.csv"
file_pets_location = "/FileStore/tables/P9_Pets.csv"
file_proc_details_location = "/FileStore/tables/P9_ProceduresDetails.csv"
file_proc_history_location = "/FileStore/tables/P9_ProceduresHistory.csv"

# Reader settings shared by every file (only meaningful for CSV input)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def _read_table(location):
    """Read the file at ``location`` into a DataFrame with the shared reader settings."""
    reader = spark.read.format(file_type)
    reader = reader.option("inferSchema", infer_schema)
    reader = reader.option("header", first_row_is_header)
    reader = reader.option("sep", delimiter)
    return reader.load(location)


# One DataFrame per source file
owners_df = _read_table(file_owners_location)
pets_df = _read_table(file_pets_location)
proc_det_df = _read_table(file_proc_details_location)
proc_his_owners_df = _read_table(file_proc_history_location)

In [4]:
# Show the owners table that was read from file_owners_location.
# NOTE(review): `display` is not defined in this file; presumably the notebook's built-in renderer.
display(owners_df)

In [5]:
# Show the pets table that was read from file_pets_location.
display(pets_df)

In [6]:
# Show the procedure-details table that was read from file_proc_details_location.
display(proc_det_df)

In [7]:
# Show the procedure-history table that was read from file_proc_history_location.
display(proc_his_owners_df)

In [8]:
# Extract information on pet names and owner names side-by-side.
#
# Pets are inner-joined to owners on OwnerID. Both tables call their name
# column "Name", so each side is aliased here; without the aliases the result
# carries two identically named columns and any later by-name reference to
# "Name" is ambiguous.
q1 = (
    pets_df.join(owners_df, on=['OwnerID'], how='inner')
    .select(
        pets_df["Name"].alias("PetName"),
        owners_df["Name"].alias("OwnerName"),
    )
)
display(q1)

In [9]:
# Find out which pets from this clinic had procedures performed.
# Inner-join pets to the procedure history on PetID, then keep the pet name
# and the procedure type (one row per matching history record).
pets_with_history = pets_df.join(proc_his_owners_df, on=['PetID'], how='inner')
q2 = pets_with_history.select(
    pets_df["Name"],
    proc_his_owners_df["ProcedureType"],
)
display(q2)


In [10]:
# Match up all procedures performed to their descriptions.
#
# The join uses the composite key (ProcedureType, ProcedureSubCode), the same
# pair of columns the cost query (q4_2) matches these two tables on. Joining
# on ProcedureSubCode alone would pair a history row with the descriptions of
# every procedure type that shares that sub-code.
# NOTE(review): presumably sub-codes repeat across procedure types — confirm
# against P9_ProceduresDetails.csv.
q3 = (
    proc_his_owners_df.join(
        proc_det_df,
        on=['ProcedureType', 'ProcedureSubCode'],
        how='inner',
    )
    .select(proc_his_owners_df["ProcedureType"], proc_det_df["Description"])
)
display(q3)


In [11]:
# Extract a table of individual costs (procedure prices) incurred by owners of
# pets from the clinic in question, with owner and procedure price side-by-side.

# Step 1: pair every pet with its owner's name.
pet_owner_pairs = pets_df.join(owners_df, on=['OwnerID'], how='inner')
q4_0 = pet_owner_pairs.select(owners_df["Name"], pets_df["PetID"])

# Step 2: attach each pet's procedure history, keeping the procedure key columns.
owner_procedures = q4_0.join(proc_his_owners_df, on=['PetID'], how='inner')
q4_1 = owner_procedures.select(
    q4_0["Name"],
    proc_his_owners_df["ProcedureType"],
    proc_his_owners_df["ProcedureSubCode"],
)

# Step 3: look up the price by matching on both procedure type and sub-code.
same_procedure = (
    (q4_1["ProcedureType"] == proc_det_df["ProcedureType"])
    & (q4_1["ProcedureSubCode"] == proc_det_df["ProcedureSubCode"])
)
priced_procedures = q4_1.join(proc_det_df, same_procedure, how='inner')
q4_2 = priced_procedures.select(
    q4_1["Name"],
    q4_1["ProcedureType"],
    proc_det_df["Price"],
)

display(q4_2)