In [164]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build initial spark session

In [165]:
from pyspark.sql import SparkSession

# Session-level settings, applied in the order listed.
# - Adaptive query execution is switched off and the automatic broadcast-join
#   threshold is set to -1 (which disables automatic broadcasting), so
#   any broadcast join in this notebook has to be requested explicitly.
# - Dynamic allocation is turned on together with the shuffle service
#   setting, bounded to 2..10 executors and starting from 4.
_session_settings = [
    ("spark.sql.adaptive.enabled", "false"),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
    ("spark.dynamicAllocation.enabled", "true"),
    ("spark.shuffle.service.enabled", "true"),
    ("spark.dynamicAllocation.minExecutors", 2),
    ("spark.dynamicAllocation.initialExecutors", 4),
    ("spark.dynamicAllocation.maxExecutors", 10),
]

_session_builder = SparkSession.builder.appName("HospitalData")
for _conf_key, _conf_value in _session_settings:
    _session_builder = _session_builder.config(_conf_key, _conf_value)

# Reuses an already-running session if one exists, otherwise creates it.
spark = _session_builder.getOrCreate()


In [166]:
# Bare last expression: lets the notebook render the session object created above.
spark

# Read data

In [167]:
# Load the patients CSV; the first row is the header and column types are
# inferred by Spark rather than declared.
patients = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/jovyan/csv_datas/patients.csv")
)

In [168]:
# Load the billing CSV; the first row is the header and column types are
# inferred by Spark rather than declared.
billings = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/jovyan/csv_datas/billing.csv")
)

In [169]:
# Load the appointments CSV; the first row is the header and column types
# are inferred by Spark rather than declared.
appointments = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/jovyan/csv_datas/appointments.csv")
)

In [170]:
# Load the doctors CSV; the first row is the header and column types are
# inferred by Spark rather than declared.
doctors = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/jovyan/csv_datas/doctors.csv")
)

In [171]:
# Load the treatments CSV; the first row is the header and column types are
# inferred by Spark rather than declared.
# NOTE(review): the file name is spelled "treatmentss.csv" (double "s") -
# confirm this matches the file on disk.
treatments = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/jovyan/csv_datas/treatmentss.csv")
)

# Partitioning files

In [172]:
# Redistribute each of the four larger tables across 10 partitions.
# `doctors` is intentionally left out of this step.
patients, billings, treatments, appointments = (
    frame.repartition(10)
    for frame in (patients, billings, treatments, appointments)
)

In [173]:
# Sanity check: partition count of `patients` after the repartition step.
patients.rdd.getNumPartitions()

10

In [174]:
# Sanity check: partition count of `appointments` after the repartition step.
appointments.rdd.getNumPartitions()

10

In [175]:
# Sanity check: partition count of `treatments` after the repartition step.
treatments.rdd.getNumPartitions()

10

In [176]:
# Sanity check: partition count of `billings` after the repartition step.
billings.rdd.getNumPartitions()

10

# Save the DataFrames as Parquet, partitioned by a column

In [177]:
# Persist patients as Parquet, one sub-directory per insurance_provider
# value; any existing output at the target path is replaced.
(
    patients.write
    .format("parquet")
    .partitionBy("insurance_provider")
    .mode("overwrite")
    .save("/home/jovyan/partitioned-data/patients")
)

In [178]:
# Persist billings as Parquet, one sub-directory per payment_status value;
# any existing output at the target path is replaced.
(
    billings.write
    .format("parquet")
    .partitionBy("payment_status")
    .mode("overwrite")
    .save("/home/jovyan/partitioned-data/billings")
)

In [179]:
# Persist treatments as Parquet, one sub-directory per treatment_type value;
# any existing output at the target path is replaced.
(
    treatments.write
    .format("parquet")
    .partitionBy("treatment_type")
    .mode("overwrite")
    .save("/home/jovyan/partitioned-data/treatments")
)

In [180]:
# Persist appointments as Parquet, one sub-directory per reason_for_visit
# value; any existing output at the target path is replaced.
(
    appointments.write
    .format("parquet")
    .partitionBy("reason_for_visit")
    .mode("overwrite")
    .save("/home/jovyan/partitioned-data/appointments")
)

# Read partitioned data

In [233]:
# Re-load patients from the partitioned Parquet output and mark the frame
# for caching (the cache is populated lazily, on the first action).
patients = spark.read.parquet("/home/jovyan/partitioned-data/patients").cache()

In [235]:
# Re-load appointments from the partitioned Parquet output and mark the
# frame for caching (the cache is populated lazily, on the first action).
appointments = spark.read.parquet("/home/jovyan/partitioned-data/appointments").cache()

In [183]:
# Re-load treatments from the partitioned Parquet output (not cached).
treatments = spark.read.parquet("/home/jovyan/partitioned-data/treatments")

In [184]:
# Re-load billings from the partitioned Parquet output (not cached).
billings = spark.read.parquet("/home/jovyan/partitioned-data/billings")

# Join doctors to appointments with a broadcast join (because doctors is small enough to broadcast)

In [185]:
# Left join: every appointment is kept, with doctor columns attached where
# the doctor_id matches. The explicit broadcast() hint is required because
# the session sets spark.sql.autoBroadcastJoinThreshold to -1.
# NOTE(review): joining on an equality expression (rather than a column
# name) should leave a doctor_id column from each side in the result -
# confirm before selecting doctor_id downstream.
appointments_doctros_join = appointments.join(
    broadcast(doctors),
    on=appointments["doctor_id"] == doctors["doctor_id"],
    how="left",
)

# Deduplicate patients

In [186]:
# Keep exactly one row per patient_id while retaining every patient column.
#
# A groupBy("patient_id") + count + "count == 1" filter is not a dedupe:
# it reduces the frame to (patient_id, count), discarding all other
# patient attributes, and it removes any patient that appears more than
# once instead of keeping a single copy of that patient.
#
# dropDuplicates on the key column keeps the full schema. Which of several
# duplicate rows is retained is not specified, so if the duplicates differ
# in other columns and a particular one must win, sort/rank explicitly first.
patients = patients.dropDuplicates(["patient_id"])

# Scanning optimization

In [202]:
# Restrict appointments to reason_for_visit == "Therapy" before joining to
# treatments on appointment_id. reason_for_visit is the column the
# appointments Parquet data was partitioned by when it was written, so
# filtering on it first is meant to limit how much data has to be scanned.
appointments_join_treatments = (
    appointments
    .where(col("reason_for_visit") == "Therapy")
    .join(
        treatments,
        on=appointments["appointment_id"] == treatments["appointment_id"],
        how="inner",
    )
)

# Dynamic partition pruning

In [205]:
# Explicitly enable dynamic partition pruning for this session: with it on,
# a filter applied to one side of a join can also be used to prune the
# partitions read from the other (partitioned) side.
# NOTE(review): recent Spark 3.x releases are believed to enable this by
# default, and because this session disables automatic broadcast joins the
# pruning may only kick in when a broadcast hint is used - verify with
# explain() on the join in question.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# Find the patient with the most appointments

In [230]:
# Count appointment rows per patient, order by that count descending, and
# print only the top row. If several patients tie for the highest count,
# which one is shown is not controlled here.
(
    appointments
    .groupBy("patient_id")
    .agg(count("patient_id").alias("most_visited"))
    .orderBy(col("most_visited").desc())
    .show(1)
)

+----------+------------+
|patient_id|most_visited|
+----------+------------+
|      P012|          10|
+----------+------------+
only showing top 1 row

