###  Silver Layer (Clean & Standardise All 3 Sources)

#### **Clean Admissions**

In [8]:
from pyspark.sql import functions as F

# Silver admissions: project the bronze columns under their silver names and
# apply the target type to each one in the same select, so every column's
# source, name and type can be read on a single line.
df_adm = spark.table("bronze_admissions")

df_silver_admissions = (
    df_adm
    .select(
        # Keys
        F.col("jurisdiction").alias("State"),
        F.to_date(F.col("week_end_date")).alias("WeekDate"),
        # Bed capacity / usage averages
        F.col("avg_inpatient_beds").cast("double").alias("AvgTotalBeds"),
        F.col("avg_inpatient_beds_used").cast("double").alias("AvgBedsUsed"),
        F.col("avg_total_icu_beds").cast("double").alias("AvgTotalICUBeds"),
        F.col("avg_icu_beds_used").cast("double").alias("AvgICUBedsUsed"),
        # Occupancy rates are multiplied by 100 and rounded to 2 decimals.
        # NOTE(review): presumably the bronze values are 0-1 fractions — confirm.
        F.round(
            F.col("avg_percent_inpatient_beds_occupied").cast("double") * 100, 2
        ).alias("AvgOccupancyRate"),
        F.round(
            F.col("avg_percent_staff_icu_beds_occupied").cast("double") * 100, 2
        ).alias("AvgICUOccupancyRate"),
        # Admission counts
        F.col("total_admissions_all_covid_confirmed")
        .cast("integer")
        .alias("TotalCovidAdmissions"),
        F.col("total_admissions_all_influenza_confirmed")
        .cast("integer")
        .alias("TotalInfluenzaAdmissions"),
    )
    # Drop rows whose State or WeekDate is null after the conversions above.
    .na.drop(subset=["State", "WeekDate"])
)

# Quick sanity check: total row count plus a five-row preview.
print(f"Row count: {df_silver_admissions.count()}")
display(df_silver_admissions.limit(5))

StatementMeta(, 057ee857-3d11-4e13-aa89-483c5ece3244, 10, Finished, Available, Finished, False)

Row count: 1000


SynapseWidget(Synapse.DataFrame, fbb15c6c-9885-4cd9-96fe-ab4f5813d9be)

In [9]:
# Persist the cleaned admissions as the Delta table `silver_admissions`,
# replacing whatever a previous run wrote.
(
    df_silver_admissions.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_admissions")
)
print("✅ silver_admissions saved")

StatementMeta(, 057ee857-3d11-4e13-aa89-483c5ece3244, 11, Finished, Available, Finished, False)

✅ silver_admissions saved


#### **Clean Staff**

In [4]:
# Silver staff: normalise the text columns, type the numeric/date columns,
# and drop rows missing StaffID or ShiftDate.
df_staff = spark.table("bronze_staff_schedule")

df_silver_staff = (
    df_staff
    # Text normalisation: Department upper-cased, Role title-cased, both trimmed.
    .withColumn("Department", F.trim(F.upper(F.col("Department"))))
    .withColumn("Role", F.trim(F.initcap(F.col("Role"))))
    # NOTE(review): an integer cast would truncate fractional hours if the
    # bronze data contains any — confirm whole hours are expected.
    .withColumn("HoursWorked", F.col("HoursWorked").cast("integer"))
    .withColumn("ShiftDate", F.to_date(F.col("ShiftDate")))
    # Runs after the date conversion so a null ShiftDate is dropped too.
    .na.drop(subset=["StaffID", "ShiftDate"])
)

# Persist as the Delta table `silver_staff`, replacing any previous contents.
(
    df_silver_staff.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_staff")
)
print(f"✅ silver_staff saved — {df_silver_staff.count()} rows")

StatementMeta(, 057ee857-3d11-4e13-aa89-483c5ece3244, 6, Finished, Available, Finished, False)

✅ silver_staff saved — 40 rows


#### **Clean Drugs**

In [5]:
# Silver drugs: type the inventory columns, normalise Department, and derive
# StockPct (StockLevel as a percentage of MaxCapacity, rounded to 1 decimal).
df_drugs = spark.table("bronze_drug_inventory")

df_silver_drugs = (
    df_drugs
    .withColumn("StockLevel", F.col("StockLevel").cast("integer"))
    .withColumn("MaxCapacity", F.col("MaxCapacity").cast("integer"))
    .withColumn("ReorderThreshold", F.col("ReorderThreshold").cast("integer"))
    .withColumn("UnitCost", F.col("UnitCost").cast("double"))
    .withColumn("LastRestocked", F.to_date("LastRestocked"))
    # Guard the division explicitly: a zero MaxCapacity raises DIVIDE_BY_ZERO
    # when spark.sql.ansi.enabled is on. With no otherwise() branch, rows with
    # a zero (or null) MaxCapacity get a null StockPct, which matches what
    # non-ANSI mode already produced, so the result no longer depends on the
    # session's ANSI setting.
    .withColumn(
        "StockPct",
        F.when(
            F.col("MaxCapacity") != 0,
            F.round(F.col("StockLevel") / F.col("MaxCapacity") * 100, 1),
        ),
    )
    .withColumn("Department", F.trim(F.upper("Department")))
    # Rows without a DrugID are dropped.
    .dropna(subset=["DrugID"])
)

# Persist as the Delta table `silver_drugs`, replacing any previous contents.
df_silver_drugs.write.format("delta").mode("overwrite").saveAsTable("silver_drugs")
print(f"✅ silver_drugs saved — {df_silver_drugs.count()} rows")

StatementMeta(, 057ee857-3d11-4e13-aa89-483c5ece3244, 7, Finished, Available, Finished, False)

✅ silver_drugs saved — 20 rows
