In [0]:
# ============================================
# CELL 1: Setup & Key Vault Secrets
# ============================================
storage_account_name = "adls31tejashree"
container_name       = "healthcare-project"

# Fetch secrets securely from Key Vault
storage_account_key = dbutils.secrets.get(
    scope="healthcare-kv-scope",
    key="adls-account-key"
)

spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"
watermark_path = f"{base_path}/silver/watermark/"

print(f"✅ Secrets loaded from Key Vault")
print(f"✅ Base path: {base_path}")

✅ Secrets loaded from Key Vault
✅ Base path: abfss://healthcare-project@adls31tejashree.dfs.core.windows.net


In [0]:
# ============================================
# RESET WATERMARK (Run once to reset)
# ============================================
try:
    dbutils.fs.rm(watermark_path, recurse=True)
    print("✅ Old watermark deleted!")
except:
    print("⚠️ No watermark to delete")

✅ Old watermark deleted!


In [0]:
# ============================================
# CELL 2: Initialize Watermark Table
# (Only runs once on first execution)
# ============================================
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit
from datetime import datetime

def initialize_watermark():
    try:
        df_watermark = spark.read.parquet(watermark_path)
        last_watermark = df_watermark.collect()[0]["last_processed_date"]
        print(f"✅ Existing watermark found: {last_watermark}")
        return last_watermark
    except:
        # First run — set watermark to very old date to process all records
        print("⚠️ No watermark found — initializing for first run")
        schema = StructType([
            StructField("table_name",          StringType(),    True),
            StructField("last_processed_date", TimestampType(), True)
        ])
        df_init = spark.createDataFrame(
            [("hospital_records", datetime(2024, 1, 1))],
            schema
        )
        df_init.write.mode("overwrite").parquet(watermark_path)
        return datetime(2024, 1, 1)

last_watermark = initialize_watermark()
print(f"📅 Processing records after: {last_watermark}")

✅ Existing watermark found: 2024-01-01 00:00:00
📅 Processing records after: 2024-01-01 00:00:00


In [0]:
# ============================================
# CELL 3: Read Bronze & Filter New Records Only
# ============================================
from pyspark.sql.functions import col, to_timestamp, lit

bronze_path = f"{base_path}/bronze/hospital_records/healthcare_dataset.csv"

df_bronze = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(bronze_path)

# Parse admission date AND drop original to avoid ambiguity
df_bronze = df_bronze \
    .withColumn(
        "Date_of_Admission",
        to_timestamp(col("Date of Admission"), "yyyy-MM-dd")
    ) \
    .drop("Date of Admission")  # ← drop original column with spaces

# Filter only NEW records since last watermark
df_incremental = df_bronze.filter(
    col("Date_of_Admission") > lit(last_watermark)
)

total_records = df_bronze.count()
new_records   = df_incremental.count()

print(f"📊 Total records in Bronze:  {total_records}")
print(f"✅ New records to process:   {new_records}")
print(f"⏭️  Skipped (already processed): {total_records - new_records}")

📊 Total records in Bronze:  55500
✅ New records to process:   3823
⏭️  Skipped (already processed): 51677


In [0]:
# ============================================
# CELL 4: Transform New Records
# ============================================
from pyspark.sql.functions import (
    datediff, round, upper, trim, when,
    to_date, year, col
)

if new_records == 0:
    print("⚠️ No new records to process — pipeline complete!")
    dbutils.notebook.exit("No new records")

# Clean column names (spaces → underscores)
df_clean = df_incremental \
    .withColumnRenamed("Date of Admission",  "Date_of_Admission") \
    .withColumnRenamed("Blood Type",         "Blood_Type") \
    .withColumnRenamed("Medical Condition",  "Medical_Condition") \
    .withColumnRenamed("Insurance Provider", "Insurance_Provider") \
    .withColumnRenamed("Billing Amount",     "Billing_Amount") \
    .withColumnRenamed("Room Number",        "Room_Number") \
    .withColumnRenamed("Admission Type",     "Admission_Type") \
    .withColumnRenamed("Discharge Date",     "Discharge_Date") \
    .withColumnRenamed("Test Results",       "Test_Results")

# Apply transformations
df_clean = df_clean \
    .withColumn("Date_of_Admission", to_date(col("Date_of_Admission"))) \
    .withColumn("Discharge_Date",    to_date(col("Discharge_Date"), "yyyy-MM-dd")) \
    .withColumn("Length_of_Stay",    datediff(col("Discharge_Date"), col("Date_of_Admission"))) \
    .withColumn("Billing_Amount",    round(col("Billing_Amount"), 2)) \
    .withColumn("Gender",            upper(trim(col("Gender")))) \
    .withColumn("Medical_Condition", upper(trim(col("Medical_Condition")))) \
    .withColumn("Admission_Type",    upper(trim(col("Admission_Type")))) \
    .withColumn("Test_Results",      upper(trim(col("Test_Results")))) \
    .dropDuplicates() \
    .filter(col("Billing_Amount") > 0) \
    .filter(col("Age").isNotNull())

print(f"✅ Transformed {df_clean.count()} new records")

✅ Transformed 3788 new records


In [0]:
# ============================================
# CELL 5: Write Incremental Data to Silver
# (Append mode — preserves existing data!)
# ============================================
from datetime import datetime

process_date = datetime.now().strftime("%Y%m%d")
silver_output_path = f"{base_path}/silver/hospital_records/processed_{process_date}/"

df_clean.write \
    .mode("append") \
    .parquet(silver_output_path)

print(f"✅ {df_clean.count()} new records written to Silver")
print(f"📁 Path: {silver_output_path}")

✅ 3788 new records written to Silver
📁 Path: abfss://healthcare-project@adls31tejashree.dfs.core.windows.net/silver/hospital_records/processed_20260215/


In [0]:
# ============================================
# CELL 6: Update Gold Tables with New Data
# ============================================

# Gold 1: Readmission Rates
from pyspark.sql.functions import count, avg, col, round, when, year
df_readmission = df_clean \
    .groupBy("Medical_Condition", "Admission_Type") \
    .agg(
        count("*").alias("Total_Admissions"),
        avg("Length_of_Stay").alias("Avg_Length_of_Stay"),
        avg("Billing_Amount").alias("Avg_Billing_Amount")
    ) \
    .withColumn("Avg_Length_of_Stay", round(col("Avg_Length_of_Stay"), 2)) \
    .withColumn("Avg_Billing_Amount", round(col("Avg_Billing_Amount"), 2))

df_readmission.write.mode("append").parquet(
    f"{base_path}/gold/hospital_records/readmission_rates/"
)
print("✅ Gold: readmission_rates updated!")

# Gold 2: Diagnosis Trends
df_diagnosis = df_clean \
    .withColumn("Age_Group",
        when(col("Age") < 18,  "0-17")
        .when(col("Age") < 35, "18-34")
        .when(col("Age") < 50, "35-49")
        .when(col("Age") < 65, "50-64")
        .otherwise("65+")
    ) \
    .withColumn("Admission_Year", year(col("Date_of_Admission"))) \
    .groupBy("Medical_Condition", "Age_Group", "Admission_Year") \
    .agg(
        count("*").alias("Case_Count"),
        avg("Billing_Amount").alias("Avg_Cost")
    ) \
    .withColumn("Avg_Cost", round(col("Avg_Cost"), 2))

df_diagnosis.write.mode("append").parquet(
    f"{base_path}/gold/hospital_records/diagnosis_trends/"
)
print("✅ Gold: diagnosis_trends updated!")

# Gold 3: Avg Treatment Costs
df_costs = df_clean \
    .groupBy("Medical_Condition", "Medication", "Insurance_Provider") \
    .agg(
        count("*").alias("Patient_Count"),
        avg("Billing_Amount").alias("Avg_Treatment_Cost"),
        avg("Length_of_Stay").alias("Avg_Stay_Days")
    ) \
    .withColumn("Avg_Treatment_Cost", round(col("Avg_Treatment_Cost"), 2)) \
    .withColumn("Avg_Stay_Days",      round(col("Avg_Stay_Days"), 2))

df_costs.write.mode("append").parquet(
    f"{base_path}/gold/hospital_records/avg_treatment_costs/"
)
print("✅ Gold: avg_treatment_costs updated!")

✅ Gold: readmission_rates updated!
✅ Gold: diagnosis_trends updated!
✅ Gold: avg_treatment_costs updated!


In [0]:
# ============================================
# CELL 7: Update Watermark for Next Run
# ============================================
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Get max admission date from newly processed records
max_date = df_clean.agg({"Date_of_Admission": "max"}).collect()[0][0]
new_watermark = datetime.now()

schema = StructType([
    StructField("table_name",          StringType(),    True),
    StructField("last_processed_date", TimestampType(), True)
])

df_new_watermark = spark.createDataFrame(
    [("hospital_records", new_watermark)],
    schema
)

df_new_watermark.write.mode("overwrite").parquet(watermark_path)

print(f"✅ Watermark updated to: {new_watermark}")
print(f"📅 Max admission date processed: {max_date}")
print(f"🎉 Incremental load complete!")

✅ Watermark updated to: 2026-02-15 08:18:31.084453
📅 Max admission date processed: 2024-05-07
🎉 Incremental load complete!
