In [0]:
%sql
-- Copy every column and row of the TPC-H sample orders table into
-- workspace.default.orders_extract, replacing any previous copy.
CREATE OR REPLACE TABLE workspace.default.orders_extract AS
SELECT *
FROM samples.tpch.orders

In [0]:
%sql
-- Split the orders extract into one table per order status.
-- Each statement spells out the CTAS `AS` keyword so it matches the
-- orders_extract statement and the documented `CREATE TABLE ... AS query` form.

-- Orders with status 'P'
CREATE OR REPLACE TABLE workspace.default.table1_P AS
SELECT * FROM workspace.default.orders_extract WHERE o_orderstatus = 'P';

-- Orders with status 'F'
CREATE OR REPLACE TABLE workspace.default.table1_F AS
SELECT * FROM workspace.default.orders_extract WHERE o_orderstatus = 'F';

-- Orders with status 'O'
CREATE OR REPLACE TABLE workspace.default.table1_O AS
SELECT * FROM workspace.default.orders_extract WHERE o_orderstatus = 'O';

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType
from pyspark.sql.functions import col, lit
from delta.tables import DeltaTable
from datetime import date

# Incoming address changes: one (customerId, new address, effective date) tuple
# per customer. Every sample change shares the same effective date.
updates_data = [
    (customer_id, street, date(2023, 6, 17))
    for customer_id, street in (
        (1, 'kambar street'),
        (3, 'porur street'),
        (6, 'thuvarnkuruchi street'),
        (7, 'thiruvanmiyur street'),
    )
]

# Schema of the updates DataFrame; all three columns are declared non-nullable
updates_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("customerId", IntegerType()),
        ("address", StringType()),
        ("effectiveDate", DateType()),
    )
])

# Materialise the sample rows as a DataFrame with the explicit schema
updates_df = spark.createDataFrame(data=updates_data, schema=updates_schema)

# Fully qualified name of the customer table (SCD Type 2 layout).
# Qualifying it with catalog and schema means this cell always targets the same
# table as the `workspace.default.*` SQL cells, instead of whatever catalog and
# schema happen to be current for the session.
delta_table_name = "workspace.default.customers"

# Seed the table only when it does not exist yet; the merge further down in this
# cell runs in either case.
if not spark.catalog.tableExists(delta_table_name):
    # Build the seed rows as a column projection of updates_df rather than
    # collecting the rows to the driver and re-creating a DataFrame from them.
    # The result has the same five columns as before:
    #   customerId, address, current, effectiveDate, endDate
    initial_df = updates_df.select(
        col("customerId"),
        col("address"),
        lit(True).alias("current"),  # every seeded row is the active record
        col("effectiveDate"),  # when this record became effective
        lit(None).cast(DateType()).alias("endDate"),  # null: not yet superseded
    )

    # Create the Delta table from the seed rows. The table was just found to be
    # missing, so "overwrite" only matters if it is created concurrently.
    initial_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

# Handle on the customer Delta table; read below for the join and used as the merge target
customersTable = DeltaTable.forName(spark, delta_table_name)

# Updates whose address differs from the customer's row that is flagged current
newAddressesToInsert = (
    updates_df.alias("updates")
    .join(customersTable.toDF().alias("customers"), on="customerId")
    .filter("customers.current = true AND updates.address <> customers.address")
)

# Staged rows for the merge come in two flavours:
# - changed addresses get a NULL mergeKey, so they match no target row and are inserted
# - every update also appears once with mergeKey = customerId, so it can match existing rows
rows_to_insert = newAddressesToInsert.selectExpr("NULL as mergeKey", "updates.*")
rows_to_match = updates_df.selectExpr("customerId as mergeKey", "*")
stagedUpdates = rows_to_insert.union(rows_to_match)

# SCD Type 2 merge of the staged updates into the customer table.

# Applied to a matched row that is current and whose address differs:
# flag it as no longer current and close it at the new record's effective date.
close_current_row = {
    "current": lit(False),
    "endDate": col("staged_updates.effectiveDate"),
}

# Applied to staged rows with no matching target row:
# insert them as the current record with an open (null) endDate.
insert_current_row = {
    "customerId": col("staged_updates.customerId"),
    "address": col("staged_updates.address"),
    "current": lit(True),
    "effectiveDate": col("staged_updates.effectiveDate"),
    "endDate": lit(None),
}

(
    customersTable.alias("customers")
    .merge(stagedUpdates.alias("staged_updates"), "customers.customerId = mergeKey")
    .whenMatchedUpdate(
        condition="customers.current = true AND customers.address <> staged_updates.address",
        set=close_current_row,
    )
    .whenNotMatchedInsert(values=insert_current_row)
    .execute()
)

# Show the table contents after the merge
display(customersTable.toDF())

In [0]:
%sql
-- Show every row and column of workspace.default.customers.
SELECT *
FROM workspace.default.customers