In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, TimestampType
)
from pyspark.sql.functions import (
    col, from_json, to_timestamp, when,
    current_timestamp, floor, rand, expr
)

# ADLS Gen2 access: register the storage account key on the Spark session so
# the abfss:// paths used below can be read and written.
# NOTE(review): the key value is inlined here (a redacted placeholder in this
# copy) — presumably it should come from a secret store; confirm.
adls_account_key_conf = "fs.azure.account.key.healthcarestoragerk.dfs.core.windows.net"
spark.conf.set(adls_account_key_conf, "xxxxx")

# Storage locations. Bronze is the streaming source; Silver holds the cleaned
# output and, under _checkpoints/, the streaming checkpoint for this job.
_adls_host = "healthcarestoragerk.dfs.core.windows.net"
_silver_root = "abfss://silver@" + _adls_host

bronze_path = "abfss://bronze@" + _adls_host + "/encounters_raw"
silver_path = _silver_root + "/encounters_clean"
silver_checkpoint_path = _silver_root + "/_checkpoints/encounters_clean"


# Open the Bronze Delta table as a streaming source.

bronze_reader = spark.readStream.format("delta")
bronze_df = bronze_reader.load(bronze_path)


# Schema of the encounter event JSON. Every field is nullable.
# admission_time and discharge_time are declared as strings here and are
# converted to timestamps in a later step.

encounter_fields = [
    ("encounter_id",        StringType()),
    ("patient_id",          StringType()),
    ("gender",              StringType()),
    ("age",                 IntegerType()),
    ("department",          StringType()),
    ("admission_time",      StringType()),
    ("discharge_time",      StringType()),
    ("organization_id",     StringType()),
    ("provider_id",         StringType()),
    ("payer_id",            StringType()),
    ("base_encounter_cost", DoubleType()),
    ("total_claim_cost",    DoubleType()),
    ("payer_coverage",      DoubleType()),
]

encounter_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in encounter_fields]
)


# Parse the raw_json string with the encounter schema and flatten the
# resulting struct so each schema field becomes a top-level column.
# NOTE(review): from_json yields NULL for input it cannot parse, so such rows
# would continue downstream with every field NULL — confirm that is intended.

encounter_struct = from_json(col("raw_json"), encounter_schema).alias("data")
parsed_df = bronze_df.select(encounter_struct).select("data.*")


# Convert the admission/discharge strings to timestamp columns (in place,
# keeping the same column names).

clean_df = parsed_df
for ts_col in ("admission_time", "discharge_time"):
    clean_df = clean_df.withColumn(ts_col, to_timestamp(ts_col))


# Repair admission_time: a missing value or one later than "now" is replaced
# by the current timestamp; anything else is kept as is.

admission_ts = col("admission_time")
now_ts = current_timestamp()
admission_is_invalid = admission_ts.isNull() | (admission_ts > now_ts)

clean_df = clean_df.withColumn(
    "admission_time",
    when(admission_is_invalid, now_ts).otherwise(admission_ts),
)


# Repair discharge_time: a missing value or one earlier than admission_time
# is replaced by admission_time + 15 minutes; anything else is kept as is.

admitted_at = col("admission_time")
discharged_at = col("discharge_time")
discharge_is_invalid = discharged_at.isNull() | (discharged_at < admitted_at)
fallback_discharge = admitted_at + expr("INTERVAL 15 MINUTES")

clean_df = clean_df.withColumn(
    "discharge_time",
    when(discharge_is_invalid, fallback_discharge).otherwise(discharged_at),
)


# Repair age: values outside 0..110 are replaced by a random integer in 1..90
# (covers implausible 150+ ages produced by bad birthdates). Ages inside the
# range are kept, and a NULL age stays NULL because the range test is NULL
# for it. rand() makes the replacement value non-deterministic.

age_value = col("age")
age_out_of_range = ~age_value.between(0, 110)
random_age = floor(rand() * 90 + 1).cast("int")

clean_df = clean_df.withColumn(
    "age",
    when(age_out_of_range, random_age).otherwise(age_value),
)


# Encounter duration in minutes: both timestamps are cast to long (epoch
# seconds), subtracted, and the difference divided by 60.0.

admission_epoch = col("admission_time").cast("long")
discharge_epoch = col("discharge_time").cast("long")
duration_minutes = (discharge_epoch - admission_epoch) / 60.0

clean_df = clean_df.withColumn("encounter_duration_minutes", duration_minutes)


# Schema evolution safety: the full set of columns the Silver output is
# expected to carry, in output order. Columns missing from the stream are
# added further down, and the final projection follows this order.

expected_cols = [
    # identifiers and demographics
    "encounter_id", "patient_id", "gender", "age",
    # department and timing
    "department", "admission_time", "discharge_time",
    # organization / provider / payer references
    "organization_id", "provider_id", "payer_id",
    # cost fields
    "base_encounter_cost", "total_claim_cost", "payer_coverage",
    # derived from admission_time and discharge_time
    "encounter_duration_minutes",
]

from pyspark.sql.functions import lit

# Spark SQL types of the Silver columns that are not plain strings, matching
# what the earlier steps produce: age is cast to int, the two time columns
# are timestamps, the cost fields are doubles, and the duration is a long
# difference divided by 60.0 (double). Any column not listed is a string.
non_string_col_types = {
    "age": "int",
    "admission_time": "timestamp",
    "discharge_time": "timestamp",
    "base_encounter_cost": "double",
    "total_claim_cost": "double",
    "payer_coverage": "double",
    "encounter_duration_minutes": "double",
}

# Build the final projection in one select: keep each expected column that is
# present, and back-fill a missing one with a NULL of its proper type.
# A bare lit(None) would create a NullType column, which the Delta streaming
# sink does not support and which carries no usable type for the column.
present_cols = set(clean_df.columns)
clean_df = clean_df.select(*[
    col(col_name)
    if col_name in present_cols
    else lit(None).cast(non_string_col_types.get(col_name, "string")).alias(col_name)
    for col_name in expected_cols
])


# Start the streaming write into the Silver Delta table: append-only output,
# schema merging enabled, progress tracked at the Silver checkpoint path.

silver_writer = clean_df.writeStream.format("delta").outputMode("append")
silver_writer = silver_writer.option("mergeSchema", "true")
silver_writer = silver_writer.option("checkpointLocation", silver_checkpoint_path)

silver_query = silver_writer.start(silver_path)

print("Silver streaming query started. ID:", silver_query.id)

Silver streaming query started. ID: b3397b4e-62b9-4b33-b5f7-897f0d747ea1
