#NOTEBOOK 3/6: SILVER TABLE

##1. SELECTING THE CATALOG/SCHEMA AND READING BRONZE DATA

In [0]:
# Point the session at the healthcare_analytics catalog and its silver schema,
# then print the active catalog/schema to confirm the switch.
for context_stmt in ("USE CATALOG healthcare_analytics", "USE SCHEMA silver"):
    spark.sql(context_stmt)
active_context = spark.sql("SELECT current_catalog(), current_schema()")
active_context.show()

In [0]:
# Load the bronze events table and report how many rows it holds.
bronze_df = spark.table("healthcare_analytics.bronze.bronze_events")
bronze_row_count = bronze_df.count()
print(f"Total records in bronze: {bronze_row_count}")

In [0]:
# Preview the first 10 bronze rows.
bronze_preview = bronze_df.limit(10)
display(bronze_preview)

##2. IMPORTING FUNCTIONS AND DATA CLEANING

In [0]:
# importing pyspark sql functions
from pyspark.sql.functions import *

In [0]:
# drop duplicates using encounter id which is the 1st column
cleaned_df = bronze_df.dropDuplicates(["encounter_id"])

In [0]:
# Remove nulls in important columns
cleaned_df = cleaned_df.filter(
    col("encounter_id").isNotNull() &
    col("patient_nbr").isNotNull() &
    col("readmitted").isNotNull()
)

print(f"Records after cleaning: {cleaned_df.count()}")

##3. FEATURE CREATION (PART OF FEATURE ENGINEERING)

In [0]:
# Feature 1: bucket the bracketed "age" ranges into named age groups.
# Any value not listed below (including null) is labelled "Unknown".
age_buckets = [
    (("[0-10)", "[10-20)"), "Pediatric"),
    (("[20-30)", "[30-40)"), "Young Adult"),
    (("[40-50)", "[50-60)"), "Adult"),
    (("[60-70)", "[70-80)"), "Senior"),
    (("[80-90)", "[90-100)"), "Elderly"),
]

# Build the CASE WHEN chain in the order the buckets are listed.
(first_ranges, first_label), *remaining_buckets = age_buckets
age_group_expr = when(col("age").isin(*first_ranges), first_label)
for bucket_ranges, bucket_label in remaining_buckets:
    age_group_expr = age_group_expr.when(col("age").isin(*bucket_ranges), bucket_label)

silver_df = cleaned_df.withColumn("age_group", age_group_expr.otherwise("Unknown"))

In [0]:
# Feature 2: binary target variable - 1 when "readmitted" equals "<30"
# (readmitted within 30 days), 0 for every other value.
readmitted_within_30 = col("readmitted") == "<30"
silver_df = silver_df.withColumn(
    "readmitted_30days",
    when(readmitted_within_30, 1).otherwise(0),
)


In [0]:
# Feature 3: Create medication categories (<10 Low, 10-19 Medium, 20+ High).
# A null count evaluates to NULL in every comparison and would fall through
# to "High". Treat it as 0 instead, matching the fillna(0) applied to
# num_medications later in this notebook, so the category agrees with the
# count that ends up stored in the table.
medication_count = coalesce(col("num_medications"), lit(0))
silver_df = silver_df.withColumn(
    "num_medications_category",
    when(medication_count < 10, "Low")
    .when(medication_count < 20, "Medium")
    .otherwise("High")
)

In [0]:
# Feature 4: Create length of stay categories (<=3 Short, 4-7 Medium, >7 Long).
# A null time_in_hospital fails both comparisons and would be mislabelled
# "Long"; the column is not imputed in the cells visible here, so flag such
# rows explicitly as "Unknown" instead.
stay_days = col("time_in_hospital")
silver_df = silver_df.withColumn(
    "time_in_hospital_category",
    when(stay_days.isNull(), "Unknown")
    .when(stay_days <= 3, "Short")
    .when(stay_days <= 7, "Medium")
    .otherwise("Long")
)

In [0]:
# Feature 5: total procedures = num_procedures + num_lab_procedures,
# with a null in either column counted as 0.
procedure_count = coalesce(col("num_procedures"), lit(0))
lab_procedure_count = coalesce(col("num_lab_procedures"), lit(0))
silver_df = silver_df.withColumn(
    "total_procedures",
    procedure_count + lab_procedure_count,
)

In [0]:
# Feature 6: emergency flag - 1 when admission_type_id is 1 or 2, otherwise 0.
emergency_type_ids = (1, 2)
is_emergency_admission = col("admission_type_id").isin(*emergency_type_ids)
silver_df = silver_df.withColumn(
    "is_emergency",
    when(is_emergency_admission, 1).otherwise(0),
)

##4. HANDLING NULL VALUES

In [0]:
# Replace nulls in weight, medical_specialty and race with the literal
# "Unknown" so rows with otherwise usable data are kept.
unknown_fill = dict.fromkeys(["weight", "medical_specialty", "race"], "Unknown")
silver_df = silver_df.na.fill(unknown_fill)

In [0]:
# Replace nulls in the procedure/medication count columns with 0.
zero_fill_columns = ["num_procedures", "num_lab_procedures", "num_medications"]
silver_df = silver_df.na.fill(0, subset=zero_fill_columns)

In [0]:
# Handling the "?" missing-value placeholder.
# Normalise "?" to "Unknown" in weight and also in medical_specialty and race,
# the other two columns whose nulls are filled with "Unknown" in this notebook;
# otherwise the same "missing" meaning would be stored under two different
# labels. Nulls and all other values are left untouched.
silver_df = silver_df.replace(
    "?",
    "Unknown",
    subset=["weight", "medical_specialty", "race"],
)

##5. CREATING DATAFRAME, SILVER TABLE AND WRITING INTO IT

In [0]:
from datetime import datetime
# Add audit columns: a processing timestamp produced by Spark's
# current_timestamp(), and a batch id string formatted as YYYYMMDD_HHMMSS from
# the Python clock at the moment this cell runs.
silver_batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")
silver_df = silver_df.withColumn("silver_processing_timestamp", current_timestamp())
silver_df = silver_df.withColumn("silver_batch_id", lit(silver_batch_id))

In [0]:
# Define silver table: fully qualified name (catalog.schema.table) that the
# silver DataFrame is written to and later read back from.
silver_table = "healthcare_analytics.silver.silver_events"

In [0]:
# Persist the silver DataFrame as a Delta table, replacing any existing data
# and allowing the table schema to be replaced as well.
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer = silver_writer.option("overwriteSchema", "true")
silver_writer.saveAsTable(silver_table)

print("Silver table created successfully!")


In [0]:
# Verify the persisted silver table by reading its row count back from the
# table itself; calling count() on silver_df would re-run the whole
# transformation instead of checking what was written.
# NOTE: this can be lower than the bronze count, because duplicate
# encounter_ids and rows with null key columns were removed during cleaning.
print(f"Total records in silver table: {spark.table(silver_table).count()}")

##6. IDENTIFYING HIGH RISK PATIENTS AND CREATING A TABLE

In [0]:
# Identify high-risk patients: 15+ medications, 7+ diagnoses, or 2+ inpatient
# visits.
# Read from the persisted silver table rather than the lazy silver_df:
# re-evaluating silver_df recomputes the whole pipeline from bronze, and
# dropDuplicates / current_timestamp() may then produce rows and timestamps
# that differ from what was actually written to the silver table.
high_risk_df = spark.table(silver_table).filter(
    (col("num_medications") >= 15) |
    (col("number_diagnoses") >= 7) |
    (col("number_inpatient") >= 2)
).withColumn("risk_flag", lit("HIGH_RISK"))

In [0]:
# Define high risk table: fully qualified name (catalog.schema.table) that the
# high-risk subset is written to.
high_risk_table = "healthcare_analytics.silver.high_risk_patients"

In [0]:
# Write to high risk table.
# overwriteSchema matches the silver write: this table carries the silver
# columns, so without it a re-run after adding or changing a feature column
# fails on a schema mismatch with the existing Delta table.
high_risk_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(high_risk_table)

# Count the persisted table instead of re-evaluating high_risk_df a second time.
print(f"High-risk patients: {spark.table(high_risk_table).count()}")

##7. OPTIMISING AND COMPARING THE VALUES

In [0]:
# Run OPTIMIZE on both silver-layer tables, silver events first.
for table_to_optimize in (
    "healthcare_analytics.silver.silver_events",
    "healthcare_analytics.silver.high_risk_patients",
):
    spark.sql(f"OPTIMIZE {table_to_optimize}")
print("Tables optimized!")

In [0]:
# Summary: row counts of the persisted silver table and the high-risk table,
# framed by a 60-character banner.
banner = "=" * 60
silver_event_count = spark.table(silver_table).count()
print(banner)
print("SILVER LAYER COMPLETE")
print(banner)
print(f"Silver events: {silver_event_count}")
high_risk_count = spark.table(high_risk_table).count()
print(f"High-risk patients: {high_risk_count}")
print(banner)