In [6]:
# Welcome to your new notebook
# Type here in the cell editor to add code!

workspace = "HospAdmission"
write_lakehouse = "Gold_LH"
table = "claim_features"


def load_tables(table: str, ws='HospAdmission', lh='Silver_LH'):
    path = f"abfss://{ws}@onelake.dfs.fabric.microsoft.com/{lh}.Lakehouse/Tables/{table}"
    df = spark.read.format('delta').load(path)
    return df

StatementMeta(, 0215a3be-f2c5-4ee4-ae93-b973b6dd686e, 8, Finished, Available, Finished)

In [9]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# -----------------------
# Load tables
# -----------------------
df_patients = load_tables('patients')
df_encounters = load_tables('encounters')
df_hospital_costs = load_tables('hospital_costs')
df_labs = load_tables('labs')

# -----------------------
# Filter only inpatient encounters
# -----------------------
df_inpatient = df_encounters.filter(df_encounters["EncounterType"] == "Inpatient")

# -----------------------
# Aggregate labs per patient
# -----------------------
df_labs_agg = df_labs.groupBy("PatientID").agg(
    F.avg("GlucoseLevel").alias("GlucoseLevel"),
    F.avg("Cholesterol").alias("Cholesterol"),
    F.avg(F.split(F.col("BloodPressure"), "/").getItem(0).cast("int")).alias("SystolicBP"),
    F.avg(F.split(F.col("BloodPressure"), "/").getItem(1).cast("int")).alias("DiastolicBP"),
    F.avg("BMI").alias("BMI")
)

# -----------------------
# Join patient info, labs, and costs on PatientID
# -----------------------
df_joined = df_inpatient.join(df_patients, on="PatientID", how="left") \
                        .join(df_labs_agg, on="PatientID", how="left") \
                        .join(df_hospital_costs, on="PatientID", how="left")

# -----------------------
# Sort encounters per patient for lag calculations
# -----------------------
window_spec = Window.partitionBy("PatientID").orderBy("EncounterDate")

df_with_lag = df_joined.withColumn(
    "PrevEncounterDate",
    F.lag("EncounterDate").over(window_spec)
)

# Compute days since previous admission
df_with_lag = df_with_lag.withColumn(
    "DaysSinceLast",
    F.datediff(F.col("EncounterDate"), F.col("PrevEncounterDate"))
)

# Define readmission flag (within 30 days)
df_with_flag = df_with_lag.withColumn(
    "ReadmittedWithin30Days",
    F.when(F.col("DaysSinceLast").isNotNull() & (F.col("DaysSinceLast") <= 30), 1).otherwise(0)
)

# -----------------------
# Past admission history (look back 5 encounters)
# -----------------------
agg_window = Window.partitionBy("PatientID").orderBy("EncounterDate").rowsBetween(-5, -1)
lag_window = Window.partitionBy("PatientID").orderBy("EncounterDate")

df_features = (
    df_with_flag
    .withColumn("PastAdmissions6M", F.count("EncounterDate").over(agg_window))
    .withColumn("AvgPastLengthOfStay", F.avg("LengthOfStayDays").over(agg_window))
    .withColumn("PrevEncounterDate", F.lag("EncounterDate").over(lag_window))
    .withColumn("DaysSinceLastAdmission", F.datediff(F.col("EncounterDate"), F.col("PrevEncounterDate")))
)

# Fill nulls for first admissions
df_features = df_features.fillna({
    "PastAdmissions6M": 0,
    "AvgPastLengthOfStay": 0,
    "DaysSinceLastAdmission": 9999  # large number for "no prior admission"
})

# -----------------------
# Select features for ML
# -----------------------
df_final = df_features.select(
    "PatientID",
    "Age",
    "Gender",
    "ChronicCondition",
    "Department",
    "FundingType",
    "LengthOfStayDays",
    "GlucoseLevel",
    "Cholesterol",
    "SystolicBP",
    "DiastolicBP",
    "BMI",
    "CostPerStay",
    "PastAdmissions6M",
    "AvgPastLengthOfStay",
    "ReadmittedWithin30Days"  # target label
)

# Filter out rows with missing FundingType
df_final = df_final.filter(df_final.FundingType.isNotNull())

# -----------------------
# Save as Gold ML-ready dataset
# -----------------------
df_final.write.format("delta") \
    .mode("overwrite") \
    .option('overwriteSchema', 'true') \
    .save(f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{write_lakehouse}.Lakehouse/Tables/hosp_training_data")

print("✅ ML-ready dataset with engineered features created")


StatementMeta(, 0215a3be-f2c5-4ee4-ae93-b973b6dd686e, 11, Finished, Available, Finished)

✅ ML-ready dataset with engineered features created


In [2]:
from pyspark.sql.functions import *

df_p = spark.sql(
    """
    SELECT * FROM Gold_LH.hosp_training_data
    """
)

display(df_p)

StatementMeta(, 444e3385-7a52-4278-a282-ec63743da7af, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f85539aa-81e2-41f1-b904-b034f2586d40)