In [0]:
%run ./raw_data_ingestion

In [0]:
# Create the silver-layer patient table (CREATE TABLE IF NOT EXISTS, so it is a
# no-op when the table is already there). Besides the patient columns it carries
# start_date / end_date / is_current, which later cells fill in when writing
# versions of each patient record.
try:
    logger.info("started for creating the silverlayer table")
    # Plain string: there is nothing to interpolate, so no f-prefix.
    create_silvertable = '''
        CREATE TABLE IF NOT EXISTS etl_incremental.silver.patient_silver (
        patient_id INT,
        name STRING,
        age INT,
        gender STRING,
        city STRING,
        disease STRING,
        bill_amount INT,
        start_date DATE,
        end_date DATE,
        is_current BOOLEAN
        )
        USING DELTA
        PARTITIONED BY (city)'''
    spark.sql(create_silvertable)
    logger.info("created the silverlayer table")
except Exception as e:
    logger.error(f"Error creating silverlayer table: {e}")
    # Re-raise: every later cell writes to this table, so carrying on would only
    # fail further down with a less helpful error.
    raise

In [0]:
# Initial load: stamp every raw row as an open, current version
# (start_date = today, end_date = NULL, is_current = true) and append it to the
# silver table.
try:
    # current_date / lit are otherwise only imported in a later cell, so they may
    # not be in scope yet on a fresh kernel.
    from pyspark.sql.functions import current_date, lit

    logger.info("Adding housekeeping columns to raw data")
    silver_init = (
        raw_data
        .withColumn("start_date", current_date())
        .withColumn("end_date", lit(None).cast("date"))
        .withColumn("is_current", lit(True))
    )

    # NOTE(review): mode("append") is not idempotent -- re-running this cell
    # appends raw_data again and creates duplicate is_current rows. Confirm this
    # cell is meant to run exactly once per table.
    silver_init.write.format("delta") \
        .mode("append") \
        .saveAsTable("etl_incremental.silver.patient_silver")
    logger.info("Added housekeeping columns to raw data")
except Exception as e:
    # f-prefix was missing, so "{e}" was logged literally and the cause was lost.
    logger.error(f"Error while adding housekeeping columns to raw data: {e}")
    raise


In [0]:
# Load the incremental patient extract: the first line is the header and Spark
# infers each column's type by sampling the data.
inc_df = spark.read.csv(incremental_file, header=True, inferSchema=True)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

# SCD Type 2 upsert of the incremental extract into the silver table.
#
# Each incoming row is compared with the patient's *current* silver row on
# SCD2_TRACKED_COLUMNS:
#   * changed   -> the current row is closed (end_date = today,
#                  is_current = false) and the incoming row is inserted as the
#                  new current version;
#   * new       -> the incoming row is inserted as the current version;
#   * unchanged -> nothing is written (previously a duplicate current row was
#                  appended for every unchanged patient).
# Close and insert happen in ONE merge ("staged updates" pattern), so the table
# is never left with a closed record and no replacement.
SCD2_TRACKED_COLUMNS = ["city", "bill_amount"]

try:
    logger.info("started SCD2 merge into silverlayer table")

    # NOTE(review): dropDuplicates keeps an arbitrary row per patient_id; if the
    # extract can contain several versions of one patient, pick the latest
    # explicitly (e.g. by a timestamp column) instead.
    inc_df_dedup = inc_df.dropDuplicates(["patient_id"])

    silver_tbl = DeltaTable.forName(spark, "etl_incremental.silver.patient_silver")

    # Null-safe comparison (<=>) so a change to or from NULL counts as a change;
    # plain <> evaluates to NULL there and the change would be missed.
    change_condition = " OR ".join(
        f"NOT (t.{c} <=> s.{c})" for c in SCD2_TRACKED_COLUMNS
    )

    # Incoming rows whose tracked values differ from the patient's current row.
    changed_rows = (
        inc_df_dedup.alias("s")
        .join(
            silver_tbl.toDF().where("is_current = true").alias("t"),
            col("s.patient_id") == col("t.patient_id"),
        )
        .where(change_condition)
        .select("s.*")
    )

    # Staged source for the merge:
    #   merge_key = NULL       -> never matches, so the row is inserted as the
    #                             new version of a changed patient;
    #   merge_key = patient_id -> matches (and closes) the current row, or is
    #                             inserted when the patient has no current row.
    merge_key_type = inc_df_dedup.schema["patient_id"].dataType
    staged_updates = (
        changed_rows
        .select(lit(None).cast(merge_key_type).alias("merge_key"), "*")
        .unionByName(
            inc_df_dedup.select(col("patient_id").alias("merge_key"), "*")
        )
    )

    # Insert the incoming columns as-is (like the previous append did) plus the
    # housekeeping columns for an open, current version.
    insert_values = {c: f"s.{c}" for c in inc_df_dedup.columns}
    insert_values.update({
        "start_date": "current_date()",
        "end_date": "CAST(NULL AS DATE)",
        "is_current": "true",
    })

    silver_tbl.alias("t").merge(
        staged_updates.alias("s"),
        "t.patient_id = s.merge_key AND t.is_current = true",
    ).whenMatchedUpdate(
        condition=change_condition,
        set={
            "end_date": "current_date()",
            "is_current": "false",
        },
    ).whenNotMatchedInsert(
        values=insert_values,
    ).execute()

    logger.info("completed SCD2 merge into silverlayer table")
except Exception as e:
    logger.error(f"Error during SCD2 merge into silverlayer table: {e}")
    raise