In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5, current_timestamp
from delta.tables import DeltaTable

# Module-level Spark session used by the code below; getOrCreate() returns
# the already-active session when one exists instead of building a new one.
spark = (
    SparkSession.builder
    .appName("SCD1")
    .getOrCreate()
)

def scd1_upsert(source_df, target_path, join_keys, update_cols="*"):
    """
    Perform an SCD Type 1 upsert of source_df into the Delta table at target_path.

    Rows whose key is absent from the target are inserted. Rows whose key
    already exists are overwritten only when at least one tracked column
    differs (NULL-safe comparison), so unchanged rows are left untouched.
    Every inserted or updated row is stamped with load_ts = current_timestamp().

    Args:
        source_df: Incoming source DataFrame.
        target_path: Path of an existing Delta table.
        join_keys: Natural key column name, or list of names
            (e.g., ['customer_id']).
        update_cols: "*" (default) to compare and overwrite every non-key
            source column, or a list of column names that restricts both the
            change detection and the update to those columns.

    Raises:
        ValueError: If join_keys is empty, or if a key / update column is
            missing from source_df.
    """
    if isinstance(join_keys, str):
        join_keys = [join_keys]
    join_keys = list(join_keys)
    if not join_keys:
        raise ValueError("join_keys must contain at least one column")

    source_cols = source_df.columns
    missing_keys = [k for k in join_keys if k not in source_cols]
    if missing_keys:
        raise ValueError(f"join_keys not found in source_df: {missing_keys}")

    # Keys always match on a matched row and load_ts is metadata, so neither
    # takes part in change detection.
    excluded = set(join_keys) | {"load_ts"}
    update_all = update_cols is None or update_cols == "*"
    if update_all:
        tracked_cols = [c for c in source_cols if c not in excluded]
    else:
        requested = [update_cols] if isinstance(update_cols, str) else list(update_cols)
        unknown = [c for c in requested if c not in source_cols]
        if unknown:
            raise ValueError(f"update_cols not found in source_df: {unknown}")
        tracked_cols = [c for c in requested if c not in excluded]

    # DeltaTable.merge() takes a single Column (or SQL string), not a list
    # like DataFrame.join(), so the per-key equalities are AND-ed together.
    merge_cond = None
    for key in join_keys:
        key_match = col(f"target.{key}") == col(f"source.{key}")
        merge_cond = key_match if merge_cond is None else merge_cond & key_match

    # A row counts as changed when any tracked column differs. eqNullSafe
    # treats NULL vs value as a difference and NULL vs NULL as equal, which a
    # concat_ws-based hash cannot do because concat_ws silently drops NULLs.
    changed_cond = None
    for name in tracked_cols:
        differs = ~col(f"target.{name}").eqNullSafe(col(f"source.{name}"))
        changed_cond = differs if changed_cond is None else changed_cond | differs

    # NOTE(review): source_df is merged as-is; it is not de-duplicated on
    # join_keys here -- confirm callers guarantee one row per key.
    staged_df = source_df.withColumn("load_ts", current_timestamp())

    delta_table = DeltaTable.forPath(spark, target_path)
    merge_builder = delta_table.alias("target").merge(
        staged_df.alias("source"), merge_cond
    )

    # With no tracked columns there is nothing that could change, so only the
    # insert clause is registered.
    if changed_cond is not None:
        if update_all:
            merge_builder = merge_builder.whenMatchedUpdateAll(condition=changed_cond)
        else:
            assignments = {name: col(f"source.{name}") for name in tracked_cols}
            # Refresh the load timestamp too, but only if the target has one;
            # an explicit assignment to a missing column would fail the merge.
            if "load_ts" in delta_table.toDF().columns:
                assignments["load_ts"] = col("source.load_ts")
            merge_builder = merge_builder.whenMatchedUpdate(
                condition=changed_cond, set=assignments
            )

    merge_builder.whenNotMatchedInsertAll().execute()

# Example run: build a two-row customer DataFrame and upsert it into the
# customer dimension, matching existing rows on "id".
source_df = spark.createDataFrame(
    data=[
        (1, "John Doe", "new_email@example.com"),
        (2, "Jane Smith", "jane@example.com"),
    ],
    schema=["id", "name", "email"],
)

scd1_upsert(
    source_df=source_df,
    target_path="/path/to/delta/customer_dim",
    join_keys=["id"],
)
