In [0]:
# Notebook parameters: the catalog and schema that hold the source
# `patient_data` table. Both widgets default to an empty string, so they must
# be supplied by the caller (job parameters) or filled in interactively.
dbutils.widgets.text("catalog_name", "")
dbutils.widgets.text("schema_name", "")

# Strip stray whitespace so a value such as " dev " still forms a valid name.
catalog_name = dbutils.widgets.get("catalog_name").strip()
schema_name = dbutils.widgets.get("schema_name").strip()

# Fail fast: a blank value would otherwise be interpolated into the table
# identifier (e.g. "..patient_data") and surface later as an obscure error.
if not catalog_name or not schema_name:
    raise ValueError(
        "Both the 'catalog_name' and 'schema_name' widgets must be set "
        f"(got catalog_name={catalog_name!r}, schema_name={schema_name!r})."
    )

In [0]:
from pyspark.sql.functions import *

In [0]:
# Load the source patient table and collapse the Gender column to a
# single-letter code. Matching is exact: "M"/"Male" -> "M", "F"/"Female" -> "F".
# There is no `.otherwise(...)`, so every other value ends up NULL.
source_table = f"{catalog_name}.{schema_name}.patient_data"

gender = col("Gender")
is_male = gender.isin("M", "Male")
is_female = gender.isin("F", "Female")
gender_code = when(is_male, "M").when(is_female, "F")

df = spark.read.table(source_table).withColumn("Gender", gender_code)

In [0]:
# Layouts the raw `dob` string may arrive in, tried in this order. The first
# pattern that parses wins, so a value that is valid under two layouts
# (e.g. "01-02-2020" for both MM-dd-yyyy and dd-MM-yyyy) is read using the
# earlier one.
DOB_FORMATS = [
    "MM-dd-yyyy",
    "dd-MM-yyyy",
    "yyyy-MM-dd",
    "yyyy/MM/dd",
    "dd/MM/yyyy",
    "MM/dd/yyyy",
    "dd MMM, yyyy",
    "dd MMM yyyy",
    "MMMM d, yyyy",
]

# `coalesce` returns the first non-NULL parse result, which is exactly what the
# previous when(x.isNotNull(), x) chain spelled out by hand, without building
# every `to_date` expression twice. A value matching none of the layouts stays
# NULL.
# NOTE(review): like the original chain, this relies on `to_date` yielding NULL
# (not raising) for a non-matching string -- confirm for the cluster's settings.
parsed_dob = coalesce(*(to_date(col("dob"), fmt) for fmt in DOB_FORMATS))

# Replace the string column with the parsed date and drop the `_rescued_data`
# column if it is present.
bronze_df = df.withColumn("dob", parsed_dob).drop("_rescued_data")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable



# Upsert the cleaned bronze patient rows into the silver Delta table, creating
# the table from the bronze data when it cannot be opened.
silver_table_name = f"{catalog_name}.silver.patient_data"

# Columns copied from bronze on both update and insert. `patient_id` is the
# merge key, so it is only written when a new row is inserted.
payload_columns = [
    "first_name",
    "last_name",
    "email_id",
    "dob",
    "Gender",
    "address",
    "marital_status",
    "blood_type",
    "contact",
    "emergency_contact",
    "source_file",
]

# Open the existing silver table, if there is one.
try:
    silver_table = DeltaTable.forName(spark, silver_table_name)
except Exception as exc:
    # `Exception` (not a bare `except:`) lets KeyboardInterrupt / SystemExit
    # propagate, and the reason for taking the fallback is reported instead of
    # being swallowed silently.
    print(
        f"Could not open {silver_table_name} as a Delta table "
        f"({type(exc).__name__}); falling back to initial table creation."
    )
    silver_table = None

# Keep one record per `patient_id`: the row whose `source_file` sorts last.
# NOTE(review): rows sharing both patient_id and source_file are tie-broken
# arbitrarily by row_number -- add a secondary sort key if that can happen.
window_spec = Window.partitionBy("patient_id").orderBy(col("source_file").desc())

bronze_dedup_df = (
    bronze_df.withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)

if silver_table is not None:
    # Build both column maps from the single list above so the update and
    # insert clauses cannot drift apart.
    update_map = {name: col(f"bronze.{name}") for name in payload_columns}
    insert_map = {"patient_id": col("bronze.patient_id"), **update_map}

    (
        silver_table.alias("silver")
        .merge(
            bronze_dedup_df.alias("bronze"),
            "silver.patient_id = bronze.patient_id",
        )
        .whenMatchedUpdate(set=update_map)
        .whenNotMatchedInsert(values=insert_map)
        .execute()
    )
else:
    # First load: materialise the deduplicated bronze rows as the silver table.
    bronze_dedup_df.write.format("delta").saveAsTable(silver_table_name)

print("Incremental load to Silver Layer completed successfully!")

In [0]:
# Preview the silver patient table. The catalog is taken from the
# `catalog_name` widget rather than a hard-coded name, so the preview targets
# the same catalog the rest of the notebook reads from and writes to.
display(spark.read.table(f"{catalog_name}.silver.patient_data"))