In [0]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Build (or reuse) a Spark session with the Delta Lake SQL extension and
# catalog registered, so that format("delta") and DeltaTable are available.
spark = (
    SparkSession.builder
    .appName("SCD1_MERGE")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Storage location of the SCD Type 1 dimension table
delta_path = "/tmp/delta_dim_table"

# Seed rows for the dimension table: (id, name, state, age)
dim_data = [
    (1, "Alice", "NY", 30),
    (2, "Bob", "CA", 25),
]
dim_columns = ["id", "name", "state", "age"]
dim_table = spark.createDataFrame(dim_data, dim_columns)

# Initial load: overwrite so that re-running this cell resets the table
dim_table.write.format("delta").mode("overwrite").save(delta_path)

# show() is part of the PySpark DataFrame API; DataFrame.display() exists only
# on Databricks and raises AttributeError on open-source Spark. The other
# cells in this notebook also use show().
dim_table.show()



id,name,state,age
1,Alice,NY,30
2,Bob,CA,25


In [0]:
# Incoming batch to upsert into the dimension (same columns as dim_columns)
staging_data = [
    (1, "Alice", "TX", 31),    # id already present: state and age differ
    (3, "Charlie", "FL", 28),  # id not present yet
]
staging_table = spark.createDataFrame(staging_data, dim_columns)

# Handle on the Delta table written by the previous cell
delta_dim_table = DeltaTable.forPath(spark, delta_path)

# SCD Type 1: matched rows are overwritten in place (no history kept),
# unmatched staging rows are inserted as new dimension members.
matched_row_updates = {
    "dim.name": "stg.name",
    "dim.state": "stg.state",
    "dim.age": "stg.age",
}
new_row_values = {
    "id": "stg.id",
    "name": "stg.name",
    "state": "stg.state",
    "age": "stg.age",
}

(
    delta_dim_table.alias("dim")
    .merge(staging_table.alias("stg"), "dim.id = stg.id")
    .whenMatchedUpdate(set=matched_row_updates)
    .whenNotMatchedInsert(values=new_row_values)
    .execute()
)

# Re-read from storage to display the post-merge state of the table
updated_dim_table = spark.read.format("delta").load(delta_path)
updated_dim_table.show()


+---+-------+-----+---+
| id|   name|state|age|
+---+-------+-----+---+
|  3|Charlie|   FL| 28|
|  1|  Alice|   TX| 31|
|  2|    Bob|   CA| 25|
+---+-------+-----+---+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, lit, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, BooleanType
from delta.tables import DeltaTable
from datetime import datetime

# Delta-enabled Spark session for the SCD Type 2 example. getOrCreate()
# returns the already-running session when one exists. Automatic schema
# merging is switched on for Delta MERGE operations.
spark = (
    SparkSession.builder
    .appName("SCD2_MERGE")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

# Storage location of the SCD Type 2 dimension table
delta_path = "/tmp/delta_scd2_dim_table"

# Dimension schema: the business attributes followed by the three SCD2
# bookkeeping columns (validity window and current-version flag).
dim_schema = (
    StructType()
    .add("id", IntegerType(), nullable=False)
    .add("name", StringType(), nullable=False)
    .add("state", StringType(), nullable=False)
    .add("age", IntegerType(), nullable=False)
    .add("effective_date", DateType(), nullable=False)
    .add("end_date", DateType(), nullable=True)  # stays NULL while the version is active
    .add("is_active", BooleanType(), nullable=False)
)

def to_date(date_str):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Falsy input (``None`` or an empty string) yields ``None``; a non-empty
    string that does not match the format raises ``ValueError`` from
    ``datetime.strptime``.
    """
    if not date_str:
        return None
    parsed = datetime.strptime(date_str, "%Y-%m-%d")
    return parsed.date()

# Seed rows for the SCD2 dimension. Each row is the current version of its
# id: end_date is None and is_active is True.
dim_data = [
    (1, "Alice", "NY", 30, to_date("2023-01-01"), None, True),
    (2, "Bob", "CA", 25, to_date("2023-02-01"), None, True),
]
dim_table = spark.createDataFrame(dim_data, schema=dim_schema)

# Initial load: overwrite so that re-running this cell resets the table
(
    dim_table.write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)



In [0]:
# Sample staging table (new incoming data)
staging_data = [
    (1, "Alice", "TX", 31),  # Changed state and age
    (3, "Charlie", "FL", 28) # New record
]

# Staging rows shaped like a fresh, active dimension version: types are cast to
# match the dimension schema and the SCD2 bookkeeping columns are filled in.
staging_table = (
    spark.createDataFrame(staging_data, ["id", "name", "state", "age"])
    .withColumn("id", col("id").cast(IntegerType()))
    .withColumn("age", col("age").cast(IntegerType()))
    .withColumn("effective_date", current_date().cast(DateType()))
    .withColumn("end_date", lit(None).cast(DateType()))
    .withColumn("is_active", lit(True).cast(BooleanType()))
)

# Load Delta Table
delta_dim_table = DeltaTable.forPath(spark, delta_path)

# Null-safe "some tracked attribute differs" predicate. Without it, every id
# present in staging would be expired and re-inserted even when nothing
# changed, and re-running this cell would keep piling up duplicate versions.
attributes_changed = (
    "NOT (dim.name <=> stg.name AND dim.state <=> stg.state AND dim.age <=> stg.age)"
)

# Staging rows whose id has an active version with different attributes.
# These need two actions in the merge: expire the old version AND insert the
# new one.
changed_rows = (
    staging_table.alias("stg")
    .join(delta_dim_table.toDF().alias("dim"), col("stg.id") == col("dim.id"))
    .where(f"dim.is_active = True AND {attributes_changed}")
    .select("stg.*")
)

# A MERGE applies at most one action per source row, so each changed row is
# fed in twice:
#   * merge_key = NULL -> can never match -> inserted as the new version
#   * merge_key = id   -> matches the active row -> expires it
# Brand-new ids only appear with merge_key = id, find no match, and are
# inserted. Unchanged ids match but fail the update condition, so nothing
# happens to them.
staged_updates = (
    changed_rows.withColumn("merge_key", lit(None).cast(IntegerType()))
    .unionByName(staging_table.withColumn("merge_key", col("id")))
)

# Single atomic MERGE: expiring old versions and inserting new ones commit
# together, so a failure cannot leave an id without an active row.
(
    delta_dim_table.alias("dim")
    .merge(
        staged_updates.alias("stg"),
        "dim.id = stg.merge_key AND dim.is_active = True",
    )
    .whenMatchedUpdate(
        condition=attributes_changed,
        set={
            "end_date": "stg.effective_date",  # old version ends when the new one starts
            "is_active": lit(False),           # Mark old record as inactive
        },
    )
    .whenNotMatchedInsert(values={
        "id": "stg.id",
        "name": "stg.name",
        "state": "stg.state",
        "age": "stg.age",
        "effective_date": "stg.effective_date",
        "end_date": "stg.end_date",
        "is_active": "stg.is_active",
    })
    .execute()
)

# Read and show updated Delta Table
updated_dim_table = spark.read.format("delta").load(delta_path)
updated_dim_table.show()


+---+-------+-----+---+---------+--------------+----------+
| id|   name|state|age|is_active|effective_date|  end_date|
+---+-------+-----+---+---------+--------------+----------+
|  1|  Alice|   NY| 30|    false|    2023-01-01|2025-03-21|
|  3|Charlie|   FL| 28|     true|    2025-03-21|      null|
|  1|  Alice|   TX| 31|     true|    2025-03-21|      null|
|  2|    Bob|   CA| 25|     true|    2023-02-01|      null|
+---+-------+-----+---+---------+--------------+----------+

