In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable



In [0]:
# Switch the Spark session to the demo_project catalog.
catalog_statement = "USE CATALOG demo_project"
spark.sql(catalog_statement)

print("‚úÖ Setup complete!")

In [0]:
# Load the Bronze customers table; it is the source for the Silver build.
bronze_df = spark.table("demo_project.bronze.customers_bronze")

# Report the source row count, then preview the first ten rows.
bronze_row_count = bronze_df.count()
print(f"üìä Bronze records: {bronze_row_count:,}")
display(bronze_df.limit(10))

In [0]:
# Project the business columns from Bronze and append the SCD bookkeeping
# columns used by the merge steps later in the notebook.
scd_source_columns = [
    "customer_id",
    "full_name",
    "email",
    "city",
    "subscription_plan",
    "account_status",
    "updated_at",
]

# Far-future date marking a version that has not been closed yet.
open_ended_date = lit("9999-12-31").cast("date")

silver_initial = (
    bronze_df.select(*scd_source_columns)
    .withColumn("valid_from", current_date())
    .withColumn("valid_to", open_ended_date)
    .withColumn("is_current", lit(True))
    .withColumn("record_version", lit(1))
    .withColumn("silver_load_timestamp", current_timestamp())
)

print("üìä Silver structure with SCD columns:")
silver_initial.printSchema()
display(silver_initial.limit(10))

In [0]:
# Persist the initial Silver snapshot as a Delta table.
# NOTE: mode("overwrite") + overwriteSchema replaces any existing table
# contents and schema, so re-running this cell resets the Silver table.
silver_writer = (
    silver_initial.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
silver_writer.saveAsTable("demo_project.silver.customers")

print("‚úÖ Initial Silver table created!")
written_row_count = spark.table("demo_project.silver.customers").count()
print(f"‚úÖ Records: {written_row_count:,}")

In [0]:
# Verify the initial Silver load: row counts, column list, and a data sample.
silver_table = spark.table("demo_project.silver.customers")

print("=" * 70)
print("üìä SILVER LAYER - INITIAL LOAD")
print("=" * 70)
print(f"‚úÖ Total records: {silver_table.count():,}")
print(f"‚úÖ All records have is_current = True: {silver_table.filter('is_current = true').count():,}")
print("\nüìã Columns:")
# The loop variable is deliberately not called `col`: the notebook does
# `from pyspark.sql.functions import *`, which binds `col` to the Spark column
# function. Reusing that name here would rebind it to a plain string for the
# rest of the kernel session and break any later `col(...)` call.
for column_name in silver_table.columns:
    print(f"  ‚Ä¢ {column_name}")

print("\nüìä Sample data:")
display(silver_table.limit(10))

print("\n‚úÖ INITIAL SILVER TABLE READY!")

In [0]:
# Create some updates to test SCD Type 1 and Type 2

# The scenarios below (and the verification queries at the end of the notebook)
# are keyed to these specific customer IDs, so fetch exactly these customers.
# A bare `limit(5)` returns arbitrary rows in no guaranteed order, which would
# pair e.g. "C00001" with another customer's city/plan/status and turn an
# intended Type 1 change into an accidental Type 2 change.
test_customer_ids = ["C00001", "C00002", "C00003", "C00004", "C00005"]

silver_customers = spark.table("demo_project.silver.customers")
test_customers = (
    silver_customers
    .filter(
        silver_customers["is_current"]
        & silver_customers["customer_id"].isin(test_customer_ids)
    )
    .orderBy("customer_id")
    .toPandas()
)

# Fail early with a clear message instead of silently building updates that
# match nothing in the merge steps.
missing_customer_ids = sorted(set(test_customer_ids) - set(test_customers["customer_id"]))
if missing_customer_ids:
    raise ValueError(
        f"Cannot build test updates; no current Silver row for: {missing_customer_ids}"
    )

print("üìä Original 5 customers:")
print(test_customers[['customer_id', 'full_name', 'email', 'city', 'subscription_plan', 'account_status']])

# Index by customer_id so every scenario reads the attributes of the customer
# it is actually about (one row per ID even if the table holds duplicates).
_originals_by_id = test_customers.drop_duplicates(subset="customer_id").set_index("customer_id")


def _original_value(customer_id, field):
    """Return the current Silver value of `field` for `customer_id`."""
    return _originals_by_id.loc[customer_id, field]


# Modify them to simulate changes.
# Tuple layout: (customer_id, full_name, email, city, subscription_plan, account_status)
updates_data = [
    # Customer 1: Email change (SCD Type 1 - overwrite)
    ("C00001", "Customer_1", "customer1_updated@example.com",
     _original_value("C00001", "city"),
     _original_value("C00001", "subscription_plan"),
     _original_value("C00001", "account_status")),

    # Customer 2: City change (SCD Type 2 - history)
    ("C00002", _original_value("C00002", "full_name"), _original_value("C00002", "email"),
     "Mumbai",
     _original_value("C00002", "subscription_plan"),
     _original_value("C00002", "account_status")),

    # Customer 3: Plan upgrade (SCD Type 2 - history)
    ("C00003", _original_value("C00003", "full_name"), _original_value("C00003", "email"),
     _original_value("C00003", "city"),
     "Gold",
     _original_value("C00003", "account_status")),

    # Customer 4: Status change (SCD Type 2 - history)
    ("C00004", _original_value("C00004", "full_name"), _original_value("C00004", "email"),
     _original_value("C00004", "city"),
     _original_value("C00004", "subscription_plan"),
     "INACTIVE"),

    # Customer 5: Name correction (SCD Type 1 - overwrite)
    ("C00005", "Customer_5_Corrected", _original_value("C00005", "email"),
     _original_value("C00005", "city"),
     _original_value("C00005", "subscription_plan"),
     _original_value("C00005", "account_status")),
]


In [0]:
# Build the updates DataFrame with an explicit schema and expose it to SQL.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Six nullable string attributes followed by a nullable timestamp.
string_field_names = [
    "customer_id",
    "full_name",
    "email",
    "city",
    "subscription_plan",
    "account_status",
]
updates_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in string_field_names]
    + [StructField("updated_at", TimestampType(), True)]
)

# One driver-side Python datetime shared by every simulated update row.
now = datetime.now()

# Append the timestamp to the six attribute values of each update tuple.
update_rows = [(*record[:6], now) for record in updates_data]
updates_df = spark.createDataFrame(update_rows, schema=updates_schema)

print("\nüìä Updated records (simulating changes):")
display(updates_df)

# Register a temp view so the updates can be referenced from spark.sql().
updates_df.createOrReplaceTempView("customer_updates")

In [0]:
# Obtain a DeltaTable handle on the Silver customers table for the merge steps.
silver_table_name = "demo_project.silver.customers"
silver_table = DeltaTable.forName(spark, silver_table_name)

# Column handling used by the merge steps:
#   Type 1 (overwrite in place): full_name, email
#   Type 2 (keep history):       city, subscription_plan, account_status




In [0]:

# STEP 1: Handle SCD Type 1 updates (overwrite name/email only)
#
# A current row is corrected in place when its name or email differs from the
# update AND none of the Type 2 columns changed.
#
# Comparisons use the null-safe operator `<=>`. With plain `=` / `!=`, any NULL
# operand makes the comparison evaluate to NULL, so the match condition is not
# satisfied and the correction is silently skipped (for example, a customer
# whose city is NULL on both sides, or whose email is currently NULL).
silver_table.alias("current").merge(
    updates_df.alias("updates"),
    "current.customer_id = updates.customer_id AND current.is_current = true"
).whenMatchedUpdate(
    condition = """
        (
            NOT (current.full_name <=> updates.full_name)
            OR NOT (current.email <=> updates.email)
        )
        AND current.city <=> updates.city
        AND current.subscription_plan <=> updates.subscription_plan
        AND current.account_status <=> updates.account_status
    """,
    set = {
        "full_name": "updates.full_name",
        "email": "updates.email",
        "updated_at": "updates.updated_at",
        "silver_load_timestamp": "current_timestamp()"
    }
).execute()

print("‚úÖ Step 1: SCD Type 1 updates applied (name/email corrections)")


In [0]:

# STEP 2: Close old records for SCD Type 2 changes
#
# A current row is closed (is_current = false, valid_to = today) when any of
# the history-tracked columns differs from the incoming update.
# NOTE: `!=` evaluates to NULL when either operand is NULL, so a change to or
# from NULL in these columns does not satisfy this condition.
type2_match_keys = "current.customer_id = updates.customer_id AND current.is_current = true"

type2_change_condition = """
        current.city != updates.city
        OR current.subscription_plan != updates.subscription_plan
        OR current.account_status != updates.account_status
    """

close_assignments = {
    "is_current": "false",
    "valid_to": "current_date()",
    "silver_load_timestamp": "current_timestamp()",
}

type2_close_merge = silver_table.alias("current").merge(
    updates_df.alias("updates"),
    type2_match_keys,
)
type2_close_merge.whenMatchedUpdate(
    condition=type2_change_condition,
    set=close_assignments,
).execute()

print("‚úÖ Step 2: Old records closed for SCD Type 2 changes")


In [0]:

# STEP 3: Insert new versions for SCD Type 2 changes
#
# For every update whose customer had a row closed today, append a new current
# version numbered one above the closed one.
#
# Safeguards in the query:
#   * closed_today keeps only the highest record_version closed today per
#     customer, so several same-day closures cannot fan out the join.
#   * The LEFT ANTI JOIN skips customers that already have a current row, so
#     re-running this cell (or the whole merge on the same day) does not append
#     duplicate current versions.
#   * `<=>` is the null-safe comparison; a NULL on either side is treated as a
#     real difference instead of making the predicate evaluate to NULL.
new_versions = spark.sql("""
    WITH closed_today AS (
        SELECT
            customer_id,
            city,
            subscription_plan,
            account_status,
            record_version,
            ROW_NUMBER() OVER (
                PARTITION BY customer_id
                ORDER BY record_version DESC
            ) AS closed_rank
        FROM demo_project.silver.customers
        WHERE is_current = false
          AND valid_to = current_date()
    )
    SELECT
        u.customer_id,
        u.full_name,
        u.email,
        u.city,
        u.subscription_plan,
        u.account_status,
        u.updated_at,
        current_date() as valid_from,
        cast('9999-12-31' as date) as valid_to,
        true as is_current,
        c.record_version + 1 as record_version,
        current_timestamp() as silver_load_timestamp
    FROM customer_updates u
    INNER JOIN closed_today c
        ON u.customer_id = c.customer_id
        AND c.closed_rank = 1
    LEFT ANTI JOIN demo_project.silver.customers cur
        ON cur.customer_id = u.customer_id
        AND cur.is_current = true
    WHERE
        NOT (u.city <=> c.city)
        OR NOT (u.subscription_plan <=> c.subscription_plan)
        OR NOT (u.account_status <=> c.account_status)
""")

# Count BEFORE appending: `new_versions` is lazy and reads the target table, so
# evaluating it after the write would re-run the query against the modified
# table (where these customers now have a current row) and report 0.
new_version_count = new_versions.count()

# Insert new versions
new_versions.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("demo_project.silver.customers")

print(f"‚úÖ Step 3: {new_version_count} new versions inserted for SCD Type 2")

print("\n SCD MERGE COMPLETE!")

In [0]:
# Check the updated records
def _show_customer_history(customer_id, columns):
    """Display every Silver version of one customer, oldest version first.

    Rows are ordered by record_version (then valid_from). Ordering by
    valid_from alone is ambiguous when the closed and the new version were
    both stamped with the same date, which is the case when the initial load
    and the SCD merge run on the same day.

    Args:
        customer_id: Value of the customer_id column to look up.
        columns: Names of the columns to show, in display order.
    """
    silver_customers = spark.table("demo_project.silver.customers")
    display(
        silver_customers
        .filter(silver_customers["customer_id"] == customer_id)
        .orderBy("record_version", "valid_from")
        .select(*columns)
    )


print("=" * 70)
print("üìä VERIFYING SCD LOGIC")
print("=" * 70)

# Check Customer 1 (email change - Type 1)
print("\nüîç Customer C00001 (Email changed - SCD Type 1):")
_show_customer_history(
    "C00001",
    ["customer_id", "full_name", "email", "is_current", "record_version", "valid_from", "valid_to"],
)

# Check Customer 2 (city change - Type 2)
print("\nüîç Customer C00002 (City changed - SCD Type 2):")
_show_customer_history(
    "C00002",
    ["customer_id", "city", "subscription_plan", "is_current", "record_version", "valid_from", "valid_to"],
)

# Check Customer 3 (plan upgrade - Type 2)
print("\nüîç Customer C00003 (Plan upgraded - SCD Type 2):")
_show_customer_history(
    "C00003",
    ["customer_id", "subscription_plan", "account_status", "is_current", "record_version", "valid_from", "valid_to"],
)

# Overall stats: total rows, distinct customers, current vs. historical rows,
# and the highest version number reached.
print("\nüìä Overall Silver Table Stats:")
display(spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        COUNT(DISTINCT customer_id) as unique_customers,
        SUM(CASE WHEN is_current = true THEN 1 ELSE 0 END) as current_records,
        SUM(CASE WHEN is_current = false THEN 1 ELSE 0 END) as historical_records,
        MAX(record_version) as max_version
    FROM demo_project.silver.customers
"""))

print("\n‚úÖ SCD TYPE 1 & TYPE 2 WORKING PERFECTLY! üéâ")