In [1]:
# Root folder (OneLake ABFSS URI) under which the Silver lakehouse tables are written.
lakehouse_silverTable = "abfss://Fabric_E2E@onelake.dfs.fabric.microsoft.com/Lakehouse_Silver_.Lakehouse/Tables"

# Store the path in the Spark session config under a single named key, then
# read it back so that `lakehouse_silver_table` holds the value the session
# actually has.
silver_conf_key = "spark.executorEnv.lakehouse_silverTable"
spark.conf.set(silver_conf_key, lakehouse_silverTable)
lakehouse_silver_table = spark.conf.get(silver_conf_key)

# Echo the resolved path in the cell output.
print(lakehouse_silver_table)


StatementMeta(, 211bdfae-650a-4de3-bace-517657e18b86, 3, Finished, Available, Finished)

abfss://Fabric_E2E@onelake.dfs.fabric.microsoft.com/Lakehouse_Silver_.Lakehouse/Tables


In [2]:
# Build the FactMedications Delta table in the Silver layer.
#
# Steps: read the raw CSV, attach DIM_ProviderId from the encounters file,
# drop rows missing any required field, derive a numeric dose and a duration
# in minutes, assign a row-number surrogate key, and overwrite
# {lakehouse_silver_table}/FactMedications.
#
# Uses `spark` and `lakehouse_silver_table`, which must already exist in the
# notebook session. Failures are reported but deliberately not re-raised so
# the surrounding pipeline can continue with its next item.
import traceback

try:
    from pyspark.sql.functions import col, unix_timestamp, to_timestamp, row_number, regexp_extract
    from pyspark.sql.window import Window

    # Use the legacy datetime parser for the "M/d/yyyy" patterns below.
    # NOTE(review): this is a session-level setting and stays in effect for
    # any code that runs later in the same Spark session.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    # Load medication source CSV (all columns are read as strings; no schema inference).
    # NOTE(review): the file is named Data_LabResults.csv although it is used
    # as the medication source here — confirm this is the intended file.
    med_df = spark.read.option("header", True).csv("Files/raw/Data_LabResults.csv")

    # Load encounters and keep only the columns needed for the provider lookup.
    # NOTE(review): assumes `Id` is unique in encounters.csv; duplicate ids
    # would multiply fact rows in the left join below — TODO confirm.
    encounters_df = spark.read.option("header", True).csv("Files/raw/encounters.csv")
    enc_provider_df = encounters_df.select(
        col("Id").alias("encounter_id"),
        col("ORGANIZATION").alias("DIM_ProviderId")
    )

    # Left join so medication rows without a matching encounter are kept
    # (their DIM_ProviderId is null); the helper join key is dropped afterwards.
    med_joined = med_df.join(
        enc_provider_df,
        med_df["DIM_EncounterId"] == enc_provider_df["encounter_id"],
        how="left"
    ).drop("encounter_id")

    # Keep only rows where every required field is present.
    required_columns = [
        "DIM_patientId",
        "DIM_EncounterId",
        "Medication",
        "Dosage",
        "Cost",
        "prescribed_StartDate",
        "prescribed_EndDate",
    ]
    med_filtered = med_joined.dropna(how="any", subset=required_columns)

    # Extract the first numeric token (integer or decimal) from the free-text dosage.
    med_cleaned = med_filtered.withColumn(
        "clean_dose", regexp_extract(col("Dosage"), r"([0-9]+\.?[0-9]*)", 1).cast("double")
    )

    # Parse both dates and compute the prescription duration in minutes
    # (difference of epoch seconds divided by 60).
    med_typed = med_cleaned \
        .withColumn("start_ts", to_timestamp("prescribed_StartDate", "M/d/yyyy")) \
        .withColumn("end_ts", to_timestamp("prescribed_EndDate", "M/d/yyyy")) \
        .withColumn("duration_minutes", (unix_timestamp("end_ts") - unix_timestamp("start_ts")) / 60)

    # Generate the surrogate key with a global row_number().
    # Patient and encounter alone do not identify a row (one encounter can
    # carry several medications), so the remaining business columns are added
    # as tie-breakers; without them the numbering inside an encounter was
    # arbitrary and could change from run to run. Rows that are identical in
    # all of these columns can still swap ids with each other.
    # The window has no partitionBy, so Spark sorts the whole dataset in a
    # single partition — acceptable only while the input stays small.
    windowSpec = Window.orderBy(
        "DIM_patientId",
        "DIM_EncounterId",
        "start_ts",
        "end_ts",
        "Medication",
        "Dosage",
        "Cost",
    )
    med_ranked = med_typed.withColumn("Fact_MedicationId", row_number().over(windowSpec))

    # Final column selection: cast first, then alias, so the output name does
    # not depend on how Spark propagates an alias through a cast.
    fact_med_df = med_ranked.select(
        col("Fact_MedicationId").cast("int"),
        col("DIM_patientId").cast("string"),
        col("DIM_EncounterId").cast("string"),
        col("Medication").cast("string").alias("DIM_MedicationCode"),
        col("DIM_ProviderId").cast("string"),
        col("DIM_DateId").cast("string"),
        col("clean_dose").alias("dose"),
        col("Cost").cast("double").alias("medication_cost"),
        col("duration_minutes").cast("double")
    )

    # Save to Silver Layer, replacing any previous contents of the table.
    fact_med_df.write.mode("overwrite").format("delta").save(f"{lakehouse_silver_table}/FactMedications")

except Exception as e:
    # Best-effort by design: report and carry on instead of failing the pipeline.
    print(f"❌ Notebook 'Fact_Medications' failed: {str(e)} — Skipping to next item in pipeline.")
    # Also emit the full traceback; str(e) alone rarely shows which step failed.
    traceback.print_exc()

StatementMeta(, 211bdfae-650a-4de3-bace-517657e18b86, 4, Finished, Available, Finished)