In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, split, regexp_replace

spark = SparkSession.builder.appName("Healthcare ETL - Silver").getOrCreate()

# Load the raw patient extract: the first row supplies the column names and
# column types are inferred from the data.
df_raw = spark.read.csv("/content/patients_raw.csv", header=True, inferSchema=True)


# --- Basic cleanup: trim every string column and turn blank results into NULL.
# A whitespace-only value trims to "" (not NULL), and dropna() below only
# removes NULLs, so without this step a blank critical field would slip
# through the quality rule.
df = df_raw
for c, t in df.dtypes:
    if t == "string":
        trimmed = trim(col(c))
        df = df.withColumn(c, when(trimmed == "", None).otherwise(trimmed))

# --- Fix blood_pressure: "130/72" -> systolic=130, diastolic=72
# Spaces are stripped first so values such as "130 / 72" split the same way.
# The split expression is built once and reused for both output columns;
# the original blood_pressure column is kept.
bp_parts = split(regexp_replace(col("blood_pressure"), " ", ""), "/")
df = (
    df.withColumn("systolic_bp", bp_parts.getItem(0).cast("int"))
      .withColumn("diastolic_bp", bp_parts.getItem(1).cast("int"))
)

# --- Convert Yes/No flags to 1/0 (case-insensitive); any other value,
# including NULL, becomes NULL.
yn_cols = ["diabetes", "hypertension", "readmitted_30_days"]
for c in yn_cols:
    flag_text = lower(col(c))
    df = df.withColumn(
        c,
        when(flag_text == "yes", 1)
        .when(flag_text == "no", 0)
        .otherwise(None)
    )

# --- Cast numeric columns cleanly (even if inferSchema did it, make it explicit)
df = (
    df.withColumn("age", col("age").cast("int"))
      .withColumn("cholesterol", col("cholesterol").cast("int"))
      .withColumn("bmi", col("bmi").cast("double"))
      .withColumn("medication_count", col("medication_count").cast("int"))
      .withColumn("length_of_stay", col("length_of_stay").cast("int"))
)

# --- Drop rows with missing critical fields (simple quality rule).
# Blank strings were converted to NULL above, so they are dropped here too.
critical = ["patient_id", "age", "gender", "length_of_stay", "discharge_destination"]
df_silver = df.dropna(subset=critical)

# Row counts before/after the quality rule, plus a small preview of the result.
print("Raw count:", df_raw.count())
print("Silver count:", df_silver.count())
df_silver.select("patient_id","age","gender","systolic_bp","diastolic_bp","bmi","diabetes","hypertension","length_of_stay","discharge_destination","readmitted_30_days").show(5)


Raw count: 30000
Silver count: 30000
+----------+---+------+-----------+------------+----+--------+------------+--------------+---------------------+------------------+
|patient_id|age|gender|systolic_bp|diastolic_bp| bmi|diabetes|hypertension|length_of_stay|discharge_destination|readmitted_30_days|
+----------+---+------+-----------+------------+----+--------+------------+--------------+---------------------+------------------+
|         1| 74| Other|        130|          72|31.5|       1|           0|             1|     Nursing_Facility|                 1|
|         2| 46|Female|        120|          92|36.3|       0|           0|             3|     Nursing_Facility|                 0|
|         3| 89| Other|        135|          78|30.3|       0|           1|             1|                 Home|                 0|
|         4| 84|Female|        123|          80|31.5|       0|           1|            10|                 Home|                 0|
|         5| 32| Other|        135|    

I designed a Silver layer that standardizes healthcare data, enforces data types, and prepares analytics-ready datasets using PySpark.

In [5]:
# Destination directory for the Silver-layer Parquet dataset.
output_path = "/content/processed/silver"

# Replace any existing output; rows are laid out in one sub-directory per
# discharge_destination value.
df_silver.write.parquet(
    output_path,
    mode="overwrite",
    partitionBy="discharge_destination",
)

print("Silver layer saved at:", output_path)


Silver layer saved at: /content/processed/silver


In [7]:
# =======================
# GOLD LAYER - ANALYTICS
# =======================

from pyspark.sql.functions import when, avg, count

# The gold tables are derived from the df_silver frame built earlier in the notebook.
df_gold_base = df_silver

# ---- Create age groups: <30, 30-50, 51-70; every row not matched by one of
# those three bands is labelled "70+".
patient_age = df_gold_base["age"]
age_band = (
    when(patient_age < 30, "<30")
    .when(patient_age.between(30, 50), "30-50")
    .when(patient_age.between(51, 70), "51-70")
    .otherwise("70+")
)
df_gold = df_gold_base.withColumn("age_group", age_band)


def _readmission_summary(frame, group_col):
    """Group `frame` by `group_col`; return row counts and the mean of readmitted_30_days."""
    return frame.groupBy(group_col).agg(
        count("*").alias("total_patients"),
        avg("readmitted_30_days").alias("readmission_rate"),
    )


# ---- Readmission rate by discharge destination
gold_readmission_by_destination = _readmission_summary(df_gold, "discharge_destination")
gold_readmission_by_destination.show()

# ---- Readmission rate by age group
gold_readmission_by_age = _readmission_summary(df_gold, "age_group")
gold_readmission_by_age.show()


+---------------------+--------------+-------------------+
|discharge_destination|total_patients|   readmission_rate|
+---------------------+--------------+-------------------+
|                 Home|         20877|0.10044546630262968|
|     Nursing_Facility|          3027|0.16848364717542122|
|                Rehab|          6096|0.17503280839895013|
+---------------------+--------------+-------------------+

+---------+--------------+-------------------+
|age_group|total_patients|   readmission_rate|
+---------+--------------+-------------------+
|    30-50|          8588|0.12086632510479739|
|      70+|          8144| 0.1237721021611002|
|    51-70|          8262|0.12297264584846285|
|      <30|          5006| 0.1222532960447463|
+---------+--------------+-------------------+



In [9]:
# Root directory under which each gold table gets its own Parquet folder.
gold_path = "/content/processed/gold"

destination_target = f"{gold_path}/readmission_by_destination"
age_group_target = f"{gold_path}/readmission_by_age_group"

# mode="overwrite" replaces whatever an earlier run left at each target.
gold_readmission_by_destination.write.parquet(destination_target, mode="overwrite")
gold_readmission_by_age.write.parquet(age_group_target, mode="overwrite")

print("Gold layer saved at:", gold_path)


Gold layer saved at: /content/processed/gold


In [10]:
# NOTE(review): this cell performs the same two gold writes, to the same
# paths, as the cell directly before it. Because the writes use overwrite
# mode the files are simply replaced; consider deleting one of the two cells.
gold_path = "/content/processed/gold"

# Each gold table is written to its own sub-folder of gold_path.
for table_name, table_df in (
    ("readmission_by_destination", gold_readmission_by_destination),
    ("readmission_by_age_group", gold_readmission_by_age),
):
    table_df.write.mode("overwrite").parquet(f"{gold_path}/{table_name}")

print("Gold layer saved at:", gold_path)


Gold layer saved at: /content/processed/gold


In [12]:
# Sanity check: list the contents of the gold output directory.
!ls /content/processed/gold


readmission_by_age_group  readmission_by_destination


In [11]:
# Sanity check: list the contents of the silver output directory
# (the silver write partitions by discharge_destination).
!ls /content/processed/silver


'discharge_destination=Home'		  'discharge_destination=Rehab'
'discharge_destination=Nursing_Facility'   _SUCCESS
