### Bronze Layer Implementation

Read data in CSV format from DBFS

In [0]:
df = spark.read.format("csv").option("InferSchema",True).option("header",True).load("/Volumes/workspace/default/healthcare/HealthCareData.csv")
display(df)

PatientId,AppointmentID,Gender,ScheduledDay,AppointmentDay,Age,Neighbourhood,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received,No-show
29872499824296.0,5642903,F,2016-04-29T18:38:08.000Z,2016-04-29T00:00:00.000Z,62,JARDIM DA PENHA,0,1,0,0,0,0,No
558997776694438.0,5642503,M,2016-04-29T16:08:27.000Z,2016-04-29T00:00:00.000Z,56,JARDIM DA PENHA,0,0,0,0,0,0,No
4262962299951.0,5642549,F,2016-04-29T16:19:04.000Z,2016-04-29T00:00:00.000Z,62,MATA DA PRAIA,0,0,0,0,0,0,No
867951213174.0,5642828,F,2016-04-29T17:29:31.000Z,2016-04-29T00:00:00.000Z,8,PONTAL DE CAMBURI,0,0,0,0,0,0,No
8841186448183.0,5642494,F,2016-04-29T16:07:23.000Z,2016-04-29T00:00:00.000Z,56,JARDIM DA PENHA,0,1,1,0,0,0,No
95985133231274.0,5626772,F,2016-04-27T08:36:51.000Z,2016-04-29T00:00:00.000Z,76,REPÚBLICA,0,1,0,0,0,0,No
733688164476661.0,5630279,F,2016-04-27T15:05:12.000Z,2016-04-29T00:00:00.000Z,23,GOIABEIRAS,0,0,0,0,0,0,Yes
3449833394123.0,5630575,F,2016-04-27T15:39:58.000Z,2016-04-29T00:00:00.000Z,39,GOIABEIRAS,0,0,0,0,0,0,Yes
56394729949972.0,5638447,F,2016-04-29T08:02:16.000Z,2016-04-29T00:00:00.000Z,21,ANDORINHAS,0,0,0,0,0,0,No
78124564369297.0,5629123,F,2016-04-27T12:48:25.000Z,2016-04-29T00:00:00.000Z,19,CONQUISTA,0,0,0,0,0,0,No


Create Bronze Layer (Raw Data Load)

In [0]:
df_raw = spark.read.csv("/Volumes/workspace/default/healthcare/HealthCareData.csv", header=True, inferSchema=True)
df_raw.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Bronze")

Read data from Bronze Layer

In [0]:
bronze_df = spark.read.format("delta").load("/Volumes/workspace/default/healthcare/Bronze")

### Silver Layer Implementation

Drop Nulls and Duplicates

In [0]:
cleaned_df = bronze_df.dropna(subset=["PatientId", "AppointmentID", "ScheduledDay", "AppointmentDay"]) \
                      .dropDuplicates(["AppointmentID"])

Trim and Normalize String Columns

In [0]:
from pyspark.sql.functions import lower, trim, col

normalized_df = cleaned_df.withColumn("Gender", lower(trim(col("Gender")))) \
                          .withColumn("Neighbourhood", lower(trim(col("Neighbourhood")))) \
                          .withColumn("NoShow", lower(trim(col("No-show"))))

Convert to Date Format

In [0]:
from pyspark.sql.functions import to_date

dated_df = normalized_df.withColumn("ScheduledDay", to_date("ScheduledDay")) \
                        .withColumn("AppointmentDay", to_date("AppointmentDay"))

Derive Days Between Scheduled and Appointment

In [0]:
from pyspark.sql.functions import datediff

derived_df = dated_df.withColumn("DaysBetween", datediff(col("AppointmentDay"), col("ScheduledDay")))

Handle Invalid Ages

In [0]:
from pyspark.sql.functions import when

valid_age_df = derived_df.withColumn("Age", when(col("Age") < 0, None).otherwise(col("Age")))

Convert Binary Indicators to Yes/No

In [0]:
from pyspark.sql.functions import lit

binary_df = valid_age_df.withColumn("ReceivedSMS", when(col("SMS_received") == 1, lit("yes")).otherwise(lit("no"))) \
                        .withColumn("HasScholarship", when(col("Scholarship") == 1, lit("yes")).otherwise(lit("no"))) \
                        .withColumn("HasHipertension", when(col("Hipertension") == 1, "yes").otherwise("no")) \
                        .withColumn("HasDiabetes", when(col("Diabetes") == 1, "yes").otherwise("no")) \
                        .withColumn("IsAlcoholic", when(col("Alcoholism") == 1, "yes").otherwise("no")) \
                        .withColumn("HasHandicap", when(col("Handcap") > 0, "yes").otherwise("no"))

Rename Columns for Clarity

In [0]:
renamed_df = binary_df.withColumnRenamed("NoShow", "No_Show") \
                      .withColumnRenamed("PatientId", "Patient_ID") \
                      .withColumnRenamed("AppointmentID", "Appointment_ID")

Final Column Selection and Save as Silver Table

In [0]:
silver_df = renamed_df.select(
    "Appointment_ID", "Patient_ID", "Age", "Gender", "Neighbourhood",
    "ScheduledDay", "AppointmentDay", "DaysBetween",
    "HasScholarship", "HasHipertension", "HasDiabetes",
    "IsAlcoholic", "HasHandicap", "ReceivedSMS", "No_Show"
)
silver_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Silver")

###**Gold Layer Implementation**

Read the Silver Table

In [0]:
silver_df = spark.read.format("delta").load("/Volumes/workspace/default/healthcare/Silver")
silver_df.createOrReplaceTempView("silver_appointments")

Total Appointments per day

In [0]:
total_appt_per_day = silver_df.groupBy("AppointmentDay") \
    .count() \
    .withColumnRenamed("count", "Total_Appointments")

No-Show Count per Day

In [0]:
from pyspark.sql.functions import col, count, when

no_show_per_day = silver_df.groupBy("AppointmentDay").agg(
    count(when(col("No_Show") == "yes", True)).alias("No_Show_Count"),
    count(when(col("No_Show") == "no", True)).alias("Show_Count"),
    count("*").alias("Total_Appointments")
)

No-Show Rate per Day (% KPI)

In [0]:
from pyspark.sql.functions import col, count, when, sum, round

kpi_df = silver_df.groupBy("AppointmentDay").agg(
    count("*").alias("Total_Appointments"),
    sum(when(col("No_Show") == "no", 1).otherwise(0)).alias("Show_Count"),
    round(
        (sum(when(col("No_Show") == "no", 1).otherwise(0)) / count("*")) * 100,
        2
    ).alias("Show_Rate_Percent")
)

Average Age of Patients by Gender

In [0]:
from pyspark.sql.functions import avg

avg_age_gender = silver_df.groupBy("Gender") \
    .agg(round(avg("Age"), 1).alias("Avg_Age"))

No-Show Rate by Gender

In [0]:
from pyspark.sql.functions import col, count, when, round, sum

gender_kpi_df = silver_df.groupBy("Gender").agg(
    count("*").alias("Total_Appointments"),
    sum(when(col("No_Show") == "no", 1).otherwise(0)).alias("Show_Count"),
    round((sum(when(col("No_Show") == "no", 1).otherwise(0)) / count("*")) * 100, 2).alias("Show_Rate_Percent")
)

No-Show Rate by Age Group

In [0]:
from pyspark.sql.functions import floor

# Add age group column
silver_df = silver_df.withColumn("Age_Group", 
    when(col("Age") < 18, "Child")
    .when((col("Age") >= 18) & (col("Age") < 35), "Youth")
    .when((col("Age") >= 35) & (col("Age") < 60), "Adult")
    .otherwise("Senior")
)

age_kpi_df = silver_df.groupBy("Age_Group").agg(
    count("*").alias("Total_Appointments"),
    sum(when(col("No_Show") == "no", 1).otherwise(0)).alias("Show_Count"),
    round((sum(when(col("No_Show") == "no", 1).otherwise(0)) / count("*")) * 100, 2).alias("Show_Rate_Percent")
)


No-Show Trend Over Time

In [0]:
from pyspark.sql.functions import date_format

monthly_df = silver_df.withColumn("Month", date_format("AppointmentDay", "yyyy-MM"))

monthly_kpi_df = monthly_df.groupBy("Month").agg(
    count("*").alias("Total_Appointments"),
    sum(when(col("No_Show") == "no", 1).otherwise(0)).alias("Show_Count"),
    round((sum(when(col("No_Show") == "no", 1).otherwise(0)) / count("*")) * 100, 2).alias("Show_Rate_Percent")
)
monthly_kpi_df.orderBy("Month")

DataFrame[Month: string, Total_Appointments: bigint, Show_Count: bigint, Show_Rate_Percent: double]

Age Group Wise Disease Prevalence Analysis

In [0]:
#Convert "yes"/"no" Columns to Flags
silver_df = silver_df.withColumn("HasHipertensionFlag", when(col("HasHipertension") == "yes", 1).otherwise(0)) \
                 .withColumn("HasDiabetesFlag", when(col("HasDiabetes") == "yes", 1).otherwise(0)) \
                 .withColumn("IsAlcoholicFlag", when(col("IsAlcoholic") == "yes", 1).otherwise(0)) \
                 .withColumn("HasHandcapFlag", when(col("HasHandicap") == "yes", 1).otherwise(0))

#Aggregate by AgeGroup
age_disease_dist = silver_df.groupBy("Age_Group").agg(
    count("*").alias("Total_Patients"),
    sum("HasHipertensionFlag").alias("Total_Hipertension"),
    sum("HasDiabetesFlag").alias("Total_Diabetes"),
    sum("IsAlcoholicFlag").alias("Total_Alcoholic"),
    sum("HasHandcapFlag").alias("Total_Handicap")
)

#Add Percentage Columns
age_disease_dist = age_disease_dist.withColumn("Hipertension_Rate", round(col("Total_Hipertension") / col("Total_Patients") * 100, 2)) \
                                   .withColumn("Diabetes_Rate", round(col("Total_Diabetes") / col("Total_Patients") * 100, 2)) \
                                   .withColumn("Alcoholic_Rate", round(col("Total_Alcoholic") / col("Total_Patients") * 100, 2)) \
                                   .withColumn("Handicap_Rate", round(col("Total_Handicap") / col("Total_Patients") * 100, 2))

Write to Gold Layer Delta files (in parquet)

In [0]:
# Total appointments and no-show rate per day
kpi_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Gold/total_appt_per_day")

# No-show rate % per gender
gender_kpi_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Gold/show_rate_percent_gender")

# No-show rate per age group
age_kpi_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Gold/show_rate_age_group")

# No-show trend over time (monthly)
monthly_kpi_df.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Gold/show_trend_over_time")

# Age group–wise disease distribution
age_disease_dist.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/healthcare/Gold/age_disease_dist")

Write the data to Delta Live tables in Gold Layer

In [0]:
kpi_df.write.format("delta").mode("overwrite").saveAsTable("gold_total_appointments_per_day")

gender_kpi_df.write.format("delta").mode("overwrite").saveAsTable("gold_no_show_rate_percent_gender")

age_kpi_df.write.format("delta").mode("overwrite").saveAsTable("gold_no_show_rate_age_group")

monthly_kpi_df.write.format("delta").mode("overwrite").saveAsTable("gold_no_show_trend_over_time")

age_disease_dist.write.format("delta").mode("overwrite").saveAsTable("gold_age_disease_distribution")