Load Available Silver Tables

In [0]:
from pyspark.sql import functions as F

# Silver-layer FHIR tables, one DataFrame per resource type.
# Demographics
patients = spark.read.table("healthcare.fhir_healthcare_analytics_silver.patient")
# Clinical activity
encounters = spark.read.table("healthcare.fhir_healthcare_analytics_silver.encounter")
conditions = spark.read.table("healthcare.fhir_healthcare_analytics_silver.condition")
medications = spark.read.table("healthcare.fhir_healthcare_analytics_silver.medication_request")
observations = spark.read.table("healthcare.fhir_healthcare_analytics_silver.observation")
procedures = spark.read.table("healthcare.fhir_healthcare_analytics_silver.procedure")
# Claims
eob = spark.read.table("healthcare.fhir_healthcare_analytics_silver.explanation_of_benefit")


 Feature Engineering (Patient-Level)

Encounter Utilization

In [0]:
# One row per patient_id:
#   encounter_count   - number of encounter rows for the patient
#   unique_encounters - number of distinct encounter_id values for the patient
encounter_features = encounters.groupBy("patient_id").agg(
    F.count(F.lit(1)).alias("encounter_count"),
    F.countDistinct(F.col("encounter_id")).alias("unique_encounters"),
)

Condition-Based Risk

In [0]:
# Number of condition rows per patient whose clinical_status equals "active".
active_conditions = (
    conditions
    .where(F.col("clinical_status") == "active")
    .groupBy("patient_id")
    .agg(F.count(F.lit(1)).alias("active_condition_count"))
)

# Diagnosis codes treated as social / behavioral risk indicators;
# they are matched against the `diagnosis` column of the condition table.
risk_diagnosis_codes = [
    "423315002",          # Limited social contact
    "10939881000119105",  # Alcohol risk
]

# One row per patient that has at least one condition with a listed code,
# carrying a constant flag of 1.
social_risk = (
    conditions
    .where(F.col("diagnosis").isin(risk_diagnosis_codes))
    .select("patient_id")
    .dropDuplicates()
    .select("patient_id", F.lit(1).alias("social_risk_flag"))
)

Medication Burden

In [0]:
# One row per patient_id with the number of distinct medication_name values
# found in the medication request table.
medication_features = medications.groupBy("patient_id").agg(
    F.countDistinct(F.col("medication_name")).alias("medication_count"),
)


Procedure Intensity

In [0]:
# One row per patient_id with the number of procedure rows recorded for them.
procedure_features = procedures.groupBy("patient_id").agg(
    F.count(F.lit(1)).alias("procedure_count"),
)



Cost from ExplanationOfBenefit (EOB)

In [0]:
# Claim cost per patient, derived from ExplanationOfBenefit (EOB) rows:
#   total_claim_amount - sum of total_amount across the patient's rows
#   avg_claim_amount   - mean of total_amount across the patient's rows
cost_features = eob.groupBy("patient_id").agg(
    F.sum(F.col("total_amount")).alias("total_claim_amount"),
    F.mean(F.col("total_amount")).alias("avg_claim_amount"),
)

Assemble Feature Dataset

In [0]:
# Patient-level feature table: start from every patient and left-join each
# per-patient aggregate, so patients with no rows in a source table are kept.
features = (
    patients
    .select("patient_id", "gender", "birth_date")
    .join(encounter_features, on="patient_id", how="left")
    .join(active_conditions, on="patient_id", how="left")
    .join(social_risk, on="patient_id", how="left")
    .join(medication_features, on="patient_id", how="left")
    .join(procedure_features, on="patient_id", how="left")
    .join(cost_features, on="patient_id", how="left")
    # A missing aggregate means "no activity", so it becomes 0. The fill is
    # restricted to the engineered feature columns: an unrestricted fillna(0)
    # would also rewrite nulls in any numeric patient attribute (for example
    # a numeric patient_id), silently inventing data.
    .fillna(
        0,
        subset=[
            "encounter_count",
            "unique_encounters",
            "active_condition_count",
            "social_risk_flag",
            "medication_count",
            "procedure_count",
            "total_claim_amount",
            "avg_claim_amount",
        ],
    )
)


Validation Rules (Mandatory in Healthcare)

In [0]:
# Keep only rows where every count and the total claim amount are >= 0.
# A row for which any of these comparisons is not true is removed.
validated = features.where(
    (F.col("encounter_count") >= 0)
    & (F.col("active_condition_count") >= 0)
    & (F.col("medication_count") >= 0)
    & (F.col("procedure_count") >= 0)
    & (F.col("total_claim_amount") >= 0)
)


Feature Scoring

In [0]:
# Map each raw feature to a banded component score. All existing columns are
# kept and the six score columns are appended in the order listed below.
scored = validated.select(
    "*",
    # Utilization: 10+ encounters -> 100, 5-9 -> 60, otherwise 20.
    (
        F.when(F.col("encounter_count") >= 10, F.lit(100))
        .when(F.col("encounter_count") >= 5, F.lit(60))
        .otherwise(F.lit(20))
    ).alias("utilization_score"),
    # Chronic burden: 3+ active conditions -> 100, 1-2 -> 60, otherwise 10.
    (
        F.when(F.col("active_condition_count") >= 3, F.lit(100))
        .when(F.col("active_condition_count") >= 1, F.lit(60))
        .otherwise(F.lit(10))
    ).alias("chronic_score"),
    # Social risk: flagged patients -> 80, otherwise 0.
    (
        F.when(F.col("social_risk_flag") == 1, F.lit(80))
        .otherwise(F.lit(0))
    ).alias("social_score"),
    # Medication burden: 5+ distinct medications -> 70, otherwise 20.
    (
        F.when(F.col("medication_count") >= 5, F.lit(70))
        .otherwise(F.lit(20))
    ).alias("medication_score"),
    # Procedure intensity: 3+ procedures -> 70, otherwise 20.
    (
        F.when(F.col("procedure_count") >= 3, F.lit(70))
        .otherwise(F.lit(20))
    ).alias("procedure_score"),
    # Cost: total claims >= 5000 -> 100, >= 2000 -> 60, otherwise 20.
    (
        F.when(F.col("total_claim_amount") >= 5000, F.lit(100))
        .when(F.col("total_claim_amount") >= 2000, F.lit(60))
        .otherwise(F.lit(20))
    ).alias("cost_score"),
)


Risk Model (Weighted Score and Category)

In [0]:
# Combine the component scores into a single weighted risk_score
# (weights: utilization 0.25, chronic 0.25, social 0.20, medication 0.15,
# procedure 0.10, cost 0.05) and band it into a risk_category:
# >= 70 -> "High", >= 40 -> "Medium", otherwise "Low".
risk_model = (
    scored
    .withColumn(
        "risk_score",
        F.col("utilization_score") * 0.25
        + F.col("chronic_score") * 0.25
        + F.col("social_score") * 0.20
        + F.col("medication_score") * 0.15
        + F.col("procedure_score") * 0.10
        + F.col("cost_score") * 0.05,
    )
    .withColumn(
        "risk_category",
        F.when(F.col("risk_score") >= 70, F.lit("High"))
        .when(F.col("risk_score") >= 40, F.lit("Medium"))
        .otherwise(F.lit("Low")),
    )
)

Gold Output (Model Result)

In [0]:
# Final gold projection: patient key, overall risk, the six component scores,
# and the date on which this run was executed.
patient_risk_gold = risk_model.select(
    F.col("patient_id"),
    F.col("risk_score"),
    F.col("risk_category"),
    F.col("utilization_score"),
    F.col("chronic_score"),
    F.col("social_score"),
    F.col("medication_score"),
    F.col("procedure_score"),
    F.col("cost_score"),
    F.current_date().alias("run_date"),
)

# --- Write to the gold table ---
# Create the target schema first so saveAsTable has somewhere to write.
spark.sql("CREATE DATABASE IF NOT EXISTS healthcare.gold")

# Overwrite mode replaces the table contents on every run; overwriteSchema
# lets the stored schema be replaced as well if the columns have changed.
(
    patient_risk_gold.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("healthcare.gold.patient_risk_gold")
)

# --- Quick verification ---
# Read the table back and show up to 20 rows.
display(spark.read.table("healthcare.gold.patient_risk_gold").limit(20))


In [0]:
%sql
-- Show the column names and data types of the gold table written above.
DESCRIBE TABLE healthcare.gold.patient_risk_gold