In [0]:
# 1. Imports
from pyspark.sql.functions import col, when, count, avg, trim

In [0]:
# 2. Load Silver Data
# Using your verified final table name
silver_df = spark.read.table("healthcare.silver.patient_encounters_cleaned_final")

In [0]:
# 3. ICD-9 Diagnosis Grouping Logic
def group_icd9(column_name):
    return (when(col(column_name).rlike('^428|^425|^402|^391|^429'), "Circulatory")
            .when(col(column_name).rlike('^490|^491|^492|^493|^494|^495|^496|^500|^501|^502|^503|^504|^505|^508|^511|^518|^460|^461|^462|^463|^464|^465|^466|^470|^471|^472|^473|^474|^475|^476|^477|^478|^480|^481|^482|^483|^484|^485|^486|^487|^488'), "Respiratory")
            .when(col(column_name).rlike('^530|^531|^532|^533|^534|^535|^536|^537|^538|^539|^540|^541|^542|^543|^550|^551|^552|^553|^555|^556|^557|^558|^560|^562|^564|^565|^566|^567|^568|^569|^570|^571|^572|^573|^574|^575|^576|^577|^578|^579'), "Digestive")
            .when(col(column_name).rlike('^250'), "Diabetes")
            .when(col(column_name).rlike('^800|^999'), "Injury") # Shortened for clarity
            .when(col(column_name).rlike('^710|^739'), "Musculoskeletal")
            .when(col(column_name).rlike('^580|^629'), "Genitourinary")
            .when(col(column_name).rlike('^140|^239'), "Neoplasms")
            .otherwise("Other"))

# Apply grouping and enforce Integer types for numeric features to prevent merge errors
gold_features = silver_df.withColumn("primary_diag_group", group_icd9("diag_1")) \
                         .withColumn("secondary_diag_group", group_icd9("diag_2")) \
                         .withColumn("tertiary_diag_group", group_icd9("diag_3")) \
                         .withColumn("time_in_hospital", col("time_in_hospital").cast("int")) \
                         .withColumn("num_lab_procedures", col("num_lab_procedures").cast("int")) \
                         .withColumn("num_procedures", col("num_procedures").cast("int")) \
                         .withColumn("num_medications", col("num_medications").cast("int"))

In [0]:
# 4. Final Feature Selection
final_ml_df = gold_features.select(
    "patient_nbr", "age", "gender", "race", 
    "time_in_hospital", "num_lab_procedures", "num_procedures", "num_medications",
    "number_outpatient", "number_emergency", "number_inpatient", 
    "number_diagnoses", "primary_diag_group", "secondary_diag_group", 
    "tertiary_diag_group", "label"
)

In [0]:
# 5. Write to Gold Delta Table with overwriteSchema
(
    final_ml_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true") # This fixes the [DELTA_FAILED_TO_MERGE_FIELDS] error
    .saveAsTable("healthcare.gold.readmission_features")
)

print("Gold Layer successfully saved with forced schema overwrite.")

Gold Layer successfully saved with forced schema overwrite.


In [0]:
display(spark.sql("SELECT * FROM healthcare.gold.readmission_features LIMIT 5"))

patient_nbr,age,gender,race,time_in_hospital,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,number_diagnoses,primary_diag_group,secondary_diag_group,tertiary_diag_group,label
8222157,[0-10),Female,Caucasian,1,41,0,1,0,0,0,1,Diabetes,Other,Other,0
55629189,[10-20),Female,Caucasian,3,59,0,18,0,0,0,9,Other,Diabetes,Other,0
86047875,[20-30),Female,AfricanAmerican,2,11,5,13,2,0,1,6,Other,Diabetes,Other,0
82442376,[30-40),Male,Caucasian,2,44,1,16,0,0,0,7,Other,Diabetes,Other,0
42519267,[40-50),Male,Caucasian,1,51,0,8,0,0,0,5,Other,Other,Diabetes,0


In [0]:
# Check the schema of your Gold table
spark.table("healthcare.gold.readmission_features").printSchema()

# Check a few rows to ensure the diagnosis grouping worked
display(spark.sql("SELECT primary_diag_group, count(*) FROM healthcare.gold.readmission_features GROUP BY 1"))

root
 |-- patient_nbr: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: string (nullable = true)
 |-- number_emergency: string (nullable = true)
 |-- number_inpatient: string (nullable = true)
 |-- number_diagnoses: string (nullable = true)
 |-- primary_diag_group: string (nullable = true)
 |-- secondary_diag_group: string (nullable = true)
 |-- tertiary_diag_group: string (nullable = true)
 |-- label: integer (nullable = true)



primary_diag_group,count(*)
Diabetes,8661
Other,65308
Circulatory,7226
Respiratory,9084
Injury,30
Digestive,8975
Genitourinary,2
Musculoskeletal,25
Neoplasms,32
