In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [None]:
# Declare the notebook's text widgets; each one starts out as an empty string.
for widget_name in ("catalog", "schema"):
    dbutils.widgets.text(widget_name, "")

In [None]:
# Read the target catalog and schema from the notebook widgets and make them
# the session defaults, so later unqualified table names resolve against them.
catalog = dbutils.widgets.get("catalog").strip()
schema = dbutils.widgets.get("schema").strip()

# Both widgets default to "", which would otherwise surface as an opaque SQL
# parse error from "USE CATALOG " with no identifier.
if not catalog or not schema:
    raise ValueError(
        "Both the 'catalog' and 'schema' widgets must be set before running this notebook."
    )

# Widget text is free-form input: backtick-quote each value (doubling any
# embedded backtick) so it is always parsed as a single identifier, including
# names that contain characters such as '-'.
spark.sql("USE CATALOG `{}`".format(catalog.replace("`", "``")))
spark.sql("USE SCHEMA `{}`".format(schema.replace("`", "``")))

In [0]:
# Reader options for the Auto Loader ("cloudFiles") CSV stream.
_sales_details_reader_options = {
    "cloudFiles.format": "csv",
    # Location where the inferred schema is stored.
    "cloudFiles.schemaLocation": "/Volumes/gautham/gtk_scm/test_vlm/scm/crm_sales_details/",
    "header": "true",
    # Infer data types.
    "inferSchema": "true",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.maxFilesPerTrigger": 100,
}

# Streaming DataFrame over the CRM sales-details source folder.
stream_df = (
    spark.readStream
    .format("cloudFiles")
    .options(**_sales_details_reader_options)
    .load("/Volumes/gautham/gtk_scm/test_vlm/src/crm_sales_details/")
)

In [0]:
def _yyyymmdd_to_date_sql(column_name):
    """Build the Spark SQL CASE expression that turns a ``yyyyMMdd`` column into a DATE.

    A value equal to 0, or whose string form is not exactly 8 characters long,
    becomes NULL; any other value is parsed with ``to_date(..., 'yyyyMMdd')``.
    """
    return f"""CASE
        WHEN {column_name} = 0 OR length(CAST({column_name} AS STRING)) != 8 THEN NULL
        ELSE to_date(CAST({column_name} AS STRING), 'yyyyMMdd')
    END"""


def process_batch(df, batch_id):
    """Clean one micro-batch and apply it to ``crm_sales_details`` as SCD type 2.

    Steps:
      1. Parse the three ``yyyyMMdd`` date columns and repair ``sls_sales`` /
         ``sls_price`` from each other.
      2. Compute ``audit_checksum`` and ``primary_key``
         (``sls_ord_num`` concatenated with ``sls_prd_key``).
      3. Compare against the currently active target rows and keep only rows
         that are new or whose checksum differs.
      4. Run a single Delta MERGE that expires the active version of every
         changed key and inserts the new / changed rows as active versions.

    Args:
        df: Micro-batch DataFrame passed in by ``foreachBatch``.
        batch_id: Micro-batch identifier passed in by ``foreachBatch``; unused.

    Returns:
        None. The only effect is the MERGE into ``crm_sales_details``.
    """
    # Local imports keep the function self-contained: the notebook only does
    # ``from pyspark.sql.functions import *``, so no ``F`` alias exists globally.
    from pyspark.sql import functions as F
    from delta.tables import DeltaTable

    # Use the session that owns the micro-batch rather than the notebook global.
    session = df.sparkSession
    target_table = "crm_sales_details"

    # ------------------------------
    # Step 1: Clean the incoming rows
    # ------------------------------
    # Each date column is validated and parsed from ITS OWN value.
    for date_column in ("sls_order_dt", "sls_ship_dt", "sls_due_dt"):
        df = df.withColumn(date_column, F.expr(_yyyymmdd_to_date_sql(date_column)))

    # Recalculate sls_sales when it is missing, non-positive, or inconsistent
    # with quantity * |price|.
    expected_sales = F.col("sls_quantity") * F.abs(F.col("sls_price"))
    df = df.withColumn(
        "sls_sales",
        F.when(
            F.col("sls_sales").isNull()
            | (F.col("sls_sales") <= 0)
            | (F.col("sls_sales") != expected_sales),
            expected_sales,
        ).otherwise(F.col("sls_sales")),
    )

    # Recalculate sls_price (from the updated sls_sales) when it is missing or
    # non-positive; a zero quantity yields NULL instead of dividing by zero.
    df = df.withColumn(
        "sls_price",
        F.when(
            F.col("sls_price").isNull() | (F.col("sls_price") <= 0),
            F.when(F.col("sls_quantity") == 0, F.lit(None)).otherwise(
                F.col("sls_sales") / F.col("sls_quantity")
            ),
        ).otherwise(F.col("sls_price")),
    )

    # The MERGE insert reads src.dwh_create_date; supply it when the source
    # files do not carry that column.
    if "dwh_create_date" not in df.columns:
        df = df.withColumn("dwh_create_date", F.current_timestamp())

    # ------------------------------
    # Step 2: Checksum and primary key
    # ------------------------------
    checksum_columns = (
        "sls_cust_id",
        "sls_order_dt",
        "sls_ship_dt",
        "sls_due_dt",
        "sls_sales",
        "sls_quantity",
        "sls_price",
    )
    # Cast every part to string explicitly so coalesce/concat never mixes types.
    # NOTE(review): parts are concatenated without a separator, as in the
    # original expression; adding one would change every stored checksum.
    checksum_parts = [
        F.coalesce(F.col(name).cast("string"), F.lit("null")) for name in checksum_columns
    ]
    src_df = (
        df.withColumn("audit_checksum", F.xxhash64(F.concat(*checksum_parts)))
        .withColumn("primary_key", F.concat("sls_ord_num", "sls_prd_key"))
        # Identical repeats of a row inside one batch must not create two
        # active versions of the same key.
        .dropDuplicates(["primary_key", "audit_checksum"])
    )
    source_columns = src_df.columns

    # Active target rows, keyed exactly like the source. The columns get a
    # tgt_ prefix so nothing is ambiguous after the join.
    tgt_active_df = (
        session.table(target_table)
        .where(F.col("is_active") == "Y")
        .select(
            F.concat("sls_ord_num", "sls_prd_key").alias("tgt_primary_key"),
            F.col("audit_checksum").alias("tgt_audit_checksum"),
        )
    )

    # ------------------------------
    # Step 3: Keep only new or changed rows
    # ------------------------------
    joined_df = src_df.join(
        tgt_active_df,
        on=F.col("primary_key") == F.col("tgt_primary_key"),
        how="left",
    )
    # Rows whose checksum equals the active target checksum are unchanged.
    changed_df = joined_df.where(
        F.col("tgt_audit_checksum").isNull()
        | (F.col("audit_checksum") != F.col("tgt_audit_checksum"))
    )

    # ------------------------------
    # Step 4: Stage rows for the MERGE
    # ------------------------------
    # Rows staged with merge_key = primary_key: a key that already has an
    # active target row matches it (that row gets expired); a brand-new key
    # matches nothing and is inserted.
    keyed_rows = changed_df.select(*source_columns).withColumn(
        "merge_key", F.col("primary_key")
    )
    # Changed existing keys are staged a second time with a NULL merge_key, so
    # they never match and are inserted as the new active version.
    new_version_rows = (
        changed_df.where(F.col("tgt_primary_key").isNotNull())
        .select(*source_columns)
        .withColumn("merge_key", F.lit(None).cast("string"))
    )
    final_merge_df = keyed_rows.unionByName(new_version_rows)

    # ------------------------------
    # Step 5: Perform MERGE in a single step
    # ------------------------------
    (
        DeltaTable.forName(session, target_table).alias("tgt")
        .merge(
            final_merge_df.alias("src"),
            # Only the active version may be expired; historical versions keep
            # their original effective_end_date.
            "concat(tgt.sls_ord_num, tgt.sls_prd_key) = src.merge_key AND tgt.is_active = 'Y'",
        )
        # update old record to inactive
        .whenMatchedUpdate(set={
            "is_active": "'N'",
            "effective_end_date": "current_timestamp()"
        })
        # insert new or changed version
        .whenNotMatchedInsert(values={
            "sls_ord_num": "src.sls_ord_num",
            "sls_prd_key": "src.sls_prd_key",
            "sls_cust_id": "src.sls_cust_id",
            "sls_order_dt": "src.sls_order_dt",
            "sls_ship_dt": "src.sls_ship_dt",
            "sls_due_dt": "src.sls_due_dt",
            "sls_sales": "src.sls_sales",
            "sls_quantity": "src.sls_quantity",
            "sls_price": "src.sls_price",
            "dwh_create_date": "src.dwh_create_date",
            "audit_checksum": "src.audit_checksum",
            "is_active": "'Y'",
            "effective_start_date": "current_timestamp()",
            "effective_end_date": "NULL"
        })
        .execute()
    )

  

In [0]:
# Start the stream with process_batch as the foreachBatch handler, using an
# availableNow trigger, then block until the query terminates.
sales_details_query = (
    stream_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/Volumes/gautham/gtk_scm/test_vlm/chkp/crm_sales_details/")
    .trigger(availableNow=True)
    .start()
)
sales_details_query.awaitTermination()