In [0]:
from pyspark.sql.functions import sha2, col, current_timestamp, monotonically_increasing_id

In [0]:
%sql

create schema if not exists rwd.silver;

In [0]:
%sql

select * from rwd.bronze.primary_cancer_condition_raw;
     

Patient_ID,Primary_Cancer_Condition,Diagnosis_Date,Hospital
P001,Breast Carcinoma,2024-08-12,Apollo Hospitals
P002,Lung Adenocarcinoma,2023-11-03,AIIMS Delhi
P003,Colorectal Cancer,2024-05-19,Fortis Chennai
P004,Prostate Cancer,2023-09-25,CMC Vellore
P005,Hepatocellular Carcinoma,2024-07-14,Narayana Health
P006,Ovarian Cancer,2023-12-01,Max Healthcare
P007,Melanoma,2024-06-22,KIMS Hyderabad
P008,Pancreatic Cancer,2024-03-18,Manipal Hospitals
P009,Gastric Cancer,2024-04-10,Sankara Nethralaya
P010,Bladder Cancer,2023-10-05,Tata Memorial


In [0]:
bronze_table = 'rwd.bronze.primary_cancer_condition_raw'
silver_table = 'rwd.silver.primary_cancer_condition'
checkpoint_path = "/Volumes/rwd/silver/my_volume/silver/primary_cancer_condition/checkpoint/"

In [0]:
df_bronze = (
    spark.readStream.table(bronze_table)
)


In [0]:
df_silver_clean = (
    df_bronze
        .dropDuplicates(["Patient_ID"])
        .withColumn("load_timestamp", current_timestamp())
)


In [0]:
from delta.tables import DeltaTable

def merge_primary_cancer_condition(batch_df, batch_id):
    if not spark.catalog.tableExists(silver_table):
        batch_df.write.format("delta").mode("overwrite").saveAsTable(silver_table)
        return

    # Load Delta table by name and upsert
    primary_cancer_condition = DeltaTable.forName(spark, silver_table)

    (primary_cancer_condition.alias("t")
        .merge(
            batch_df.alias("s"),
            "t.Patient_ID = s.Patient_ID"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())



(
    df_silver_clean.writeStream
        .foreachBatch(merge_primary_cancer_condition)
        .outputMode("update")
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_path)
        .start()
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0xff150849ed50>

In [0]:
%sql

select * from rwd.silver.primary_cancer_condition;

Patient_ID,Primary_Cancer_Condition,Diagnosis_Date,Hospital
P005,Hepatocellular Carcinoma,2024-07-14,Narayana Health
P008,Pancreatic Cancer,2024-03-18,Manipal Hospitals
P001,Breast Carcinoma,2024-08-12,Apollo Hospitals
P009,Gastric Cancer,2024-04-10,Sankara Nethralaya
P003,Colorectal Cancer,2024-05-19,Fortis Chennai
P002,Lung Adenocarcinoma,2023-11-03,AIIMS Delhi
P006,Ovarian Cancer,2023-12-01,Max Healthcare
P010,Bladder Cancer,2023-10-05,Tata Memorial
P004,Prostate Cancer,2023-09-25,CMC Vellore
P007,Melanoma,2024-06-22,KIMS Hyderabad
