In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit
from delta.tables import DeltaTable

# Step 1: Get (or create) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("credit_card_scd2_tracking")
    .getOrCreate()
)

# Step 2: Column layout of the SCD Type 2 table
# (business columns followed by the validity window and the current-row flag)
scd2_columns = [
    "card_id",
    "customer_id",
    "status",
    "credit_limit",
    "valid_from",
    "valid_to",
    "is_current",
]

# Step 3: Sample source rows representing the latest state of each card
source_columns = ["card_id", "customer_id", "status", "credit_limit"]
source_data = [
    (101, 1, "active", 6000.0),    # card upgraded (updated data in source)
    (102, 2, "blocked", 10000.0),  # card blocked (updated data in source)
]

# Build the source DataFrame; column types are inferred from the Python values
source_df = spark.createDataFrame(source_data, schema=source_columns)

# Step 4: Persist the source rows as a Delta table.
# mode("overwrite") replaces the table contents on every run of this cell.
source_table_name = "incremental_load.default.credit_cards_source"

(
    source_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(source_table_name)
)


In [0]:
from delta.tables import DeltaTable

# Rows to upsert into the source table: one changed card and one new card
upsert_data = [
    (101, 1, "blocked", 6000.0),  # card_id 101 arrives with an updated status
    (105, 5, "active", 8000.0),   # new card
]
columns = ["card_id", "customer_id", "status", "credit_limit"]
upsert_df = spark.createDataFrame(upsert_data, schema=columns)

# Handle to the Delta source table
source_delta = DeltaTable.forName(spark, source_table_name)

# Upsert keyed on card_id: matching rows are overwritten with the incoming
# values, rows with an unseen card_id are inserted.
(
    source_delta.alias("target")
    .merge(upsert_df.alias("source"), "target.card_id = source.card_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


In [0]:
target_table_name = "incremental_load.default.credit_cards_target"
from pyspark.sql import Row
from pyspark.sql.types import *

if spark.catalog.tableExists(target_table_name):
    print(f"Table {target_table_name} already exists.")
else:
    # First run: create an empty Delta table carrying the SCD Type 2 schema.
    # Every column is declared nullable.
    scd2_fields = [
        ("card_id", IntegerType()),
        ("customer_id", IntegerType()),
        ("status", StringType()),
        ("credit_limit", DoubleType()),
        ("valid_from", TimestampType()),
        ("valid_to", TimestampType()),
        ("is_current", BooleanType()),
    ]
    scd2_schema = StructType(
        [StructField(field_name, field_type, True) for field_name, field_type in scd2_fields]
    )

    # A DataFrame with no rows is enough to register the schema
    empty_df = spark.createDataFrame([], scd2_schema)

    # Save as Delta table
    empty_df.write.format("delta").mode("overwrite").saveAsTable(target_table_name)

In [0]:
# SCD2 step 1 of 2: close out the current target row of every card whose
# tracked attributes (status, credit_limit) differ from the source.
delta_table = DeltaTable.forName(spark, target_table_name)

# NOTE(review): source_df is the in-memory frame built in the first cell; rows
# merged into the source table afterwards are not part of it. Confirm this is
# the intended source, or read the source table here instead.
(
    delta_table.alias("t")
    .merge(
        source_df.alias("s"),
        # Only the current version of a card is a candidate for expiry
        "t.card_id = s.card_id AND t.is_current = true",
    )
    .whenMatchedUpdate(
        # Null-safe comparison (<=>): a plain != evaluates to NULL when either
        # side is NULL, so a change to or from NULL would go undetected.
        condition=(
            "NOT (t.status <=> s.status) "
            "OR NOT (t.credit_limit <=> s.credit_limit)"
        ),
        set={
            "valid_to": current_timestamp(),  # Expire the old record
            "is_current": lit(False),         # Mark as not current
        },
    )
    .execute()
)

# display() renders DataFrames, not DeltaTable handles, so convert first
display(delta_table.toDF())

In [0]:
# SCD2 step 2 of 2: insert a new current row for every source card that has
# no current row in the target. After the preceding expiry merge that covers
# (a) brand-new cards and (b) cards whose current row was just closed out.
#
# The match must be against is_current = true. Matching against
# is_current = false would skip every card that already has an expired row
# (so its new version is never written) and would re-insert unchanged cards
# on each run (duplicate current rows).
(
    delta_table.alias("t")
    .merge(
        source_df.alias("s"),
        "t.card_id = s.card_id AND t.is_current = true",
    )
    .whenNotMatchedInsert(
        values={
            "card_id": "s.card_id",
            "customer_id": "s.customer_id",
            "status": "s.status",
            "credit_limit": "s.credit_limit",
            "valid_from": current_timestamp(),
            # Open-ended validity: typed NULL so it matches the timestamp column
            "valid_to": lit(None).cast("timestamp"),
            "is_current": lit(True),  # Mark the new record as current
        }
    )
    .execute()
)

In [0]:
# Render the full contents of the SCD2 target table
spark.table(target_table_name).display()

In [0]:
# Render the SCD2 target table once more (same query as the previous cell)
spark.table(target_table_name).display()