In [1]:
import os
# Read the object-store credentials from the environment.
# A missing variable raises KeyError here, before any Spark work starts.
accessKey = os.environ['AWS_ACCESS_KEY_ID']
secretKey = os.environ['AWS_SECRET_ACCESS_KEY']

import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf()

# Point to the Spark master. The URL used here has the spark:// form
# (not the mesos:// or zk:// form).
conf.setMaster("spark://spark-master:7077")

# Executor sizing and connection timeout.
conf.set("spark.executor.memory", "8g")
conf.set("spark.executor.cores", "1")
conf.set("spark.core.connection.ack.wait.timeout", "1200")
# S3A file system: custom endpoint with path-style access, authenticated with
# the static key pair read from the environment above.
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("spark.hadoop.fs.s3a.endpoint", "http://minio-1:9000")
conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
conf.set("spark.hadoop.fs.s3a.access.key", accessKey)
conf.set("spark.hadoop.fs.s3a.secret.key", secretKey)
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
# Iceberg catalog configuration: spark_catalog is an Iceberg SparkCatalog of
# type "hive" that talks to the metastore at thrift://hive-metastore:9083 and
# uses an s3a:// warehouse location.
conf.set("spark.sql.catalogImplementation", "hive")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
# NOTE(review): a second catalog named "hive" is registered here, but this cell
# sets no type/uri/warehouse for it -- confirm whether it is needed.
conf.set("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.spark_catalog.type", "hive")
conf.set("spark.sql.catalog.spark_catalog.uri", "thrift://hive-metastore:9083")
conf.set("spark.sql.catalog.spark_catalog.warehouse", "s3a://admin-bucket/iceberg/warehouse")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Iceberg runtime artifact requested through spark.jars.packages.
conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0")
conf.set("spark.sql.legacy.allowNonEmptyLocationInCTAS","true")
conf.set("spark.sql.hive.metastore.jars","builtin")

# getOrCreate() returns an already-running session if one exists.
# NOTE(review): in that case some of the settings above may not take effect -- confirm.
spark = SparkSession.builder.appName('Jupyter').config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("INFO")

# Shorthand handle to the SparkContext.
sc = spark.sparkContext

In [2]:
def create_full_set(batch_curr, batch_chg, load_date):
    """Build the next full snapshot from the current snapshot plus a delta.

    Args:
        batch_curr: Rows of the current snapshot. Each row is a tuple whose
            first element is the person id and whose LAST element is replaced
            by ``load_date`` in the result (in this notebook that last element
            is the previous load date).
        batch_chg: Change rows (tuples without a load-date element). A row
            whose second element is the string ``"DEL"`` removes that person
            id from the snapshot; any other row inserts or overwrites it.
        load_date: Value stamped onto every row of the returned snapshot.

    Returns:
        A new list of tuples sorted by person id. ``batch_curr`` is not
        modified.
    """
    # Step 1: index the current snapshot by person id (a later duplicate wins).
    snapshot_by_id = {}
    for existing in batch_curr:
        snapshot_by_id[existing[0]] = existing

    # Step 2: apply the delta rows in order.
    for change in batch_chg:
        key, marker = change[0], change[1]
        if marker != "DEL":
            # Insert or overwrite; the appended load_date is a placeholder so
            # that every stored row has the same width before step 3.
            snapshot_by_id[key] = change + (load_date,)
            continue
        # Delete marker: drop the person; unknown ids are ignored.
        snapshot_by_id.pop(key, None)

    # Step 3: re-stamp every surviving row with the new load date and return
    # the rows ordered by person id.
    restamped = (record[:-1] + (load_date,) for record in snapshot_by_id.values())
    return sorted(restamped, key=lambda record: record[0])


#### Batch 1 – Initial load (2024-01-01)
 * All records are new inserts

In [3]:
# Initial delta: all ten persons are new. Rows carry no load date yet; it is
# appended by create_full_set below.
# The delta is named batch_1_chg (like batch_2_chg ... batch_9_chg) so that
# batch_1 always refers to the stamped full snapshot and never to the raw
# 6-field rows.
batch_1_chg = [
    (1,  "Alice",  "Meyer",   "Zurich",  "alice.meyer@example.com", "ACTIVE"),
    (2,  "Bob",    "Keller",  "Bern",    "bob.keller@example.com", "ACTIVE"),
    (3,  "Clara",  "Schmid",  "Basel",   "clara.schmid@example.com", "ACTIVE"),
    (4,  "David",  "Fuchs",   "Zurich",  "david.fuchs@example.com", "ACTIVE"),
    (5,  "Eva",    "Brunner", "Lucerne", "eva.brunner@example.com", "ACTIVE"),
    (6,  "Frank",  "Weber",   "Bern",    "frank.weber@example.com", "ACTIVE"),
    (7,  "Grace",  "Huber",   "Basel",   "grace.huber@example.com", "ACTIVE"),
    (8,  "Henry",  "Meier",   "Zurich",  "henry.meier@example.com", "ACTIVE"),
    (9,  "Irene",  "Kunz",    "Bern",    "irene.kunz@example.com", "ACTIVE"),
    (10, "Jonas",  "Baum",    "StGallen","jonas.baum@example.com", "ACTIVE"),
]
batch_1_load_date = "2024-01-01"
# There is no previous snapshot, so the delta is merged into an empty one.
batch_1 = create_full_set([], batch_1_chg, batch_1_load_date)


In [4]:
print(batch_1)

[(1, 'Alice', 'Meyer', 'Zurich', 'alice.meyer@example.com', 'ACTIVE', '2024-01-01'), (2, 'Bob', 'Keller', 'Bern', 'bob.keller@example.com', 'ACTIVE', '2024-01-01'), (3, 'Clara', 'Schmid', 'Basel', 'clara.schmid@example.com', 'ACTIVE', '2024-01-01'), (4, 'David', 'Fuchs', 'Zurich', 'david.fuchs@example.com', 'ACTIVE', '2024-01-01'), (5, 'Eva', 'Brunner', 'Lucerne', 'eva.brunner@example.com', 'ACTIVE', '2024-01-01'), (6, 'Frank', 'Weber', 'Bern', 'frank.weber@example.com', 'ACTIVE', '2024-01-01'), (7, 'Grace', 'Huber', 'Basel', 'grace.huber@example.com', 'ACTIVE', '2024-01-01'), (8, 'Henry', 'Meier', 'Zurich', 'henry.meier@example.com', 'ACTIVE', '2024-01-01'), (9, 'Irene', 'Kunz', 'Bern', 'irene.kunz@example.com', 'ACTIVE', '2024-01-01'), (10, 'Jonas', 'Baum', 'StGallen', 'jonas.baum@example.com', 'ACTIVE', '2024-01-01')]


#### Batch 2 – First changes (2024-01-05)
 * Alice moves to Bern
 * Clara changes email
 * Frank moves to Zurich

In [5]:
# Delta for batch 2: only the rows that changed (no load-date element).
batch_2_chg = [
    (1,  "Alice",  "Meyer",   "Bern",  "alice.meyer@example.com", "ACTIVE"),  # city: Zurich -> Bern
    (3, "Clara", "Schmid", "Basel",  "clara.schmid@newmail.com", "ACTIVE"),  # new email
    (6, "Frank", "Weber",  "Zurich", "frank.weber@example.com", "ACTIVE"),  # city: Bern -> Zurich
]
batch_2_load_date = "2024-01-05"
# Merge the delta into the batch 1 snapshot; create_full_set stamps every row
# of the result with the new load date.
batch_2 = create_full_set(batch_1, batch_2_chg, batch_2_load_date)
print(batch_2)

[(1, 'Alice', 'Meyer', 'Bern', 'alice.meyer@example.com', 'ACTIVE', '2024-01-05'), (2, 'Bob', 'Keller', 'Bern', 'bob.keller@example.com', 'ACTIVE', '2024-01-05'), (3, 'Clara', 'Schmid', 'Basel', 'clara.schmid@newmail.com', 'ACTIVE', '2024-01-05'), (4, 'David', 'Fuchs', 'Zurich', 'david.fuchs@example.com', 'ACTIVE', '2024-01-05'), (5, 'Eva', 'Brunner', 'Lucerne', 'eva.brunner@example.com', 'ACTIVE', '2024-01-05'), (6, 'Frank', 'Weber', 'Zurich', 'frank.weber@example.com', 'ACTIVE', '2024-01-05'), (7, 'Grace', 'Huber', 'Basel', 'grace.huber@example.com', 'ACTIVE', '2024-01-05'), (8, 'Henry', 'Meier', 'Zurich', 'henry.meier@example.com', 'ACTIVE', '2024-01-05'), (9, 'Irene', 'Kunz', 'Bern', 'irene.kunz@example.com', 'ACTIVE', '2024-01-05'), (10, 'Jonas', 'Baum', 'StGallen', 'jonas.baum@example.com', 'ACTIVE', '2024-01-05')]


#### Batch 3 – New joiners + no-op (2024-01-10)
 * Eva is unchanged
 * Kevin and Laura are new joiners

In [6]:
# Delta for batch 3: one row identical to the current data plus two new ids.
batch_3_chg = [
    (5,  "Eva",   "Brunner", "Lucerne","eva.brunner@example.com", "ACTIVE"),  # same values as in batch 1 (no-op)
    (11, "Kevin", "Loosli",  "Bern",  "kevin.loosli@example.com", "ACTIVE"),  # new person id
    (12, "Laura", "Graf",    "Basel", "laura.graf@example.com", "ACTIVE"),  # new person id
]
batch_3_load_date = "2024-01-10"
# Merge the delta into the batch 2 snapshot.
batch_3 = create_full_set(batch_2, batch_3_chg, batch_3_load_date)
print (batch_3)

[(1, 'Alice', 'Meyer', 'Bern', 'alice.meyer@example.com', 'ACTIVE', '2024-01-10'), (2, 'Bob', 'Keller', 'Bern', 'bob.keller@example.com', 'ACTIVE', '2024-01-10'), (3, 'Clara', 'Schmid', 'Basel', 'clara.schmid@newmail.com', 'ACTIVE', '2024-01-10'), (4, 'David', 'Fuchs', 'Zurich', 'david.fuchs@example.com', 'ACTIVE', '2024-01-10'), (5, 'Eva', 'Brunner', 'Lucerne', 'eva.brunner@example.com', 'ACTIVE', '2024-01-10'), (6, 'Frank', 'Weber', 'Zurich', 'frank.weber@example.com', 'ACTIVE', '2024-01-10'), (7, 'Grace', 'Huber', 'Basel', 'grace.huber@example.com', 'ACTIVE', '2024-01-10'), (8, 'Henry', 'Meier', 'Zurich', 'henry.meier@example.com', 'ACTIVE', '2024-01-10'), (9, 'Irene', 'Kunz', 'Bern', 'irene.kunz@example.com', 'ACTIVE', '2024-01-10'), (10, 'Jonas', 'Baum', 'StGallen', 'jonas.baum@example.com', 'ACTIVE', '2024-01-10'), (11, 'Kevin', 'Loosli', 'Bern', 'kevin.loosli@example.com', 'ACTIVE', '2024-01-10'), (12, 'Laura', 'Graf', 'Basel', 'laura.graf@example.com', 'ACTIVE', '2024-01-10')]


#### Batch 4 – Multi-field changes (2024-01-15)
 * Bob changes city + email
 * Grace moves to Bern 
 * Henry changes email

In [7]:
# Delta for batch 4: attribute changes on three existing persons.
batch_4_chg = [
    (2, "Bob",   "Keller", "Zurich", "bob.keller@corp.com", "ACTIVE"),  # city and email change
    (7, "Grace", "Huber",  "Bern",   "grace.huber@example.com", "ACTIVE"),  # city: Basel -> Bern
    (8, "Henry", "Meier",  "Zurich", "henry.meier@newmail.com", "ACTIVE"),  # new email
]
batch_4_load_date = "2024-01-15"
# Merge the delta into the batch 3 snapshot.
batch_4 = create_full_set(batch_3, batch_4_chg, batch_4_load_date)
print(batch_4)

[(1, 'Alice', 'Meyer', 'Bern', 'alice.meyer@example.com', 'ACTIVE', '2024-01-15'), (2, 'Bob', 'Keller', 'Zurich', 'bob.keller@corp.com', 'ACTIVE', '2024-01-15'), (3, 'Clara', 'Schmid', 'Basel', 'clara.schmid@newmail.com', 'ACTIVE', '2024-01-15'), (4, 'David', 'Fuchs', 'Zurich', 'david.fuchs@example.com', 'ACTIVE', '2024-01-15'), (5, 'Eva', 'Brunner', 'Lucerne', 'eva.brunner@example.com', 'ACTIVE', '2024-01-15'), (6, 'Frank', 'Weber', 'Zurich', 'frank.weber@example.com', 'ACTIVE', '2024-01-15'), (7, 'Grace', 'Huber', 'Bern', 'grace.huber@example.com', 'ACTIVE', '2024-01-15'), (8, 'Henry', 'Meier', 'Zurich', 'henry.meier@newmail.com', 'ACTIVE', '2024-01-15'), (9, 'Irene', 'Kunz', 'Bern', 'irene.kunz@example.com', 'ACTIVE', '2024-01-15'), (10, 'Jonas', 'Baum', 'StGallen', 'jonas.baum@example.com', 'ACTIVE', '2024-01-15'), (11, 'Kevin', 'Loosli', 'Bern', 'kevin.loosli@example.com', 'ACTIVE', '2024-01-15'), (12, 'Laura', 'Graf', 'Basel', 'laura.graf@example.com', 'ACTIVE', '2024-01-15')]


#### Batch 5 – Moves (2024-01-20)
 * David moves to Bern
 * Irene moves to Zurich

In [8]:
# Delta for batch 5: two city changes.
batch_5_chg = [
    (4, "David", "Fuchs", "Bern",   "david.fuchs@example.com", "ACTIVE"),  # city: Zurich -> Bern
    (9, "Irene", "Kunz",  "Zurich", "irene.kunz@example.com", "ACTIVE"),  # city: Bern -> Zurich
]
batch_5_load_date = "2024-01-20"
# Merge the delta into the batch 4 snapshot.
batch_5 = create_full_set(batch_4, batch_5_chg, batch_5_load_date)

#### Batch 6 – Move back (2024-01-25)
 * David moves back to Zurich
 * Irene moves back to Bern

In [9]:
# Delta for batch 6: both rows return to the values they had before batch 5.
batch_6_chg = [
    (4, "David", "Fuchs", "Zurich", "david.fuchs@example.com", "ACTIVE"),  # city: Bern -> Zurich
    (9, "Irene", "Kunz",  "Bern",   "irene.kunz@example.com", "ACTIVE"),  # city: Zurich -> Bern
]
batch_6_load_date = "2024-01-25"
# Merge the delta into the batch 5 snapshot.
batch_6 = create_full_set(batch_5, batch_6_chg, batch_6_load_date)

#### Batch 7 – Corrections & casing (2024-02-01)
 * Jonas: last-name correction (Baum → Bäum)
 * Kevin: email change
 * Laura: first_name changed to lowercase

In [10]:
# Delta for batch 7: small textual corrections on three existing persons.
batch_7_chg = [
    (10, "Jonas", "Bäum",  "StGallen", "jonas.baum@example.com", "ACTIVE"),  # last_name: Baum -> Bäum
    (11, "Kevin", "Loosli","Bern",     "kevin.loosli@company.com", "ACTIVE"),  # new email domain
    (12, "laura", "Graf",  "Basel",    "laura.graf@example.com", "ACTIVE"),  # first_name differs only in case
]
batch_7_load_date = "2024-02-01"
# Merge the delta into the batch 6 snapshot.
batch_7 = create_full_set(batch_6, batch_7_chg, batch_7_load_date)

#### Batch 8 – Disappearance from source (2024-02-10)
 * Grace deleted


In [18]:
# Delta for batch 8: a delete marker. create_full_set treats a row whose second
# element is "DEL" as "remove this person id"; the remaining fields of such a
# row are not used.
batch_8_chg = [
    (7,  "DEL",  "Huber",   "Bern", "grace.huber@example.com", "ACTIVE"),
]
batch_8_load_date = "2024-02-10"
# Merge the delta into the batch 7 snapshot; person 7 is absent from the result.
batch_8 = create_full_set(batch_7, batch_8_chg, batch_8_load_date)
print (batch_8)

[(1, 'Alice', 'Meyer', 'Bern', 'alice.meyer@example.com', 'ACTIVE', '2024-02-10'), (2, 'Bob', 'Keller', 'Zurich', 'bob.keller@corp.com', 'ACTIVE', '2024-02-10'), (3, 'Clara', 'Schmid', 'Basel', 'clara.schmid@newmail.com', 'ACTIVE', '2024-02-10'), (4, 'David', 'Fuchs', 'Zurich', 'david.fuchs@example.com', 'ACTIVE', '2024-02-10'), (5, 'Eva', 'Brunner', 'Lucerne', 'eva.brunner@example.com', 'ACTIVE', '2024-02-10'), (6, 'Frank', 'Weber', 'Zurich', 'frank.weber@example.com', 'ACTIVE', '2024-02-10'), (8, 'Henry', 'Meier', 'Zurich', 'henry.meier@newmail.com', 'ACTIVE', '2024-02-10'), (9, 'Irene', 'Kunz', 'Bern', 'irene.kunz@example.com', 'ACTIVE', '2024-02-10'), (10, 'Jonas', 'Bäum', 'StGallen', 'jonas.baum@example.com', 'ACTIVE', '2024-02-10'), (11, 'Kevin', 'Loosli', 'Bern', 'kevin.loosli@company.com', 'ACTIVE', '2024-02-10'), (12, 'laura', 'Graf', 'Basel', 'laura.graf@example.com', 'ACTIVE', '2024-02-10')]


#### Batch 9 – Reappearance + new hire (2024-02-20)
  * rehire Grace 
  * add Markus

In [12]:
# Delta for batch 9: person 7 comes back with the same values as in batch 4,
# and person 13 is new.
batch_9_chg = [
    (7,  "Grace",  "Huber",   "Bern",    "grace.huber@example.com", "ACTIVE"),  # re-added after the batch 8 delete
    (13, "Markus", "Steiner","Lucerne", "markus.steiner@example.com", "ACTIVE"),  # new person id
]
batch_9_load_date = "2024-02-20"
# Merge the delta into the batch 8 snapshot.
batch_9 = create_full_set(batch_8, batch_9_chg, batch_9_load_date)

In [13]:
from pyspark.sql.types import *

# Column layout of one person snapshot row, in tuple order. Every column is
# declared non-nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ("person_id", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("city", StringType()),
        ("email", StringType()),
        ("status", StringType()),
        ("load_date", DateType()),
    )
])


In [14]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

def replay(batch):
    """Turn one snapshot (list of row tuples) into a Spark DataFrame.

    Column types are inferred by ``spark.createDataFrame``; only the column
    NAMES are taken from the module-level ``schema``, positionally. The
    ``load_date`` column is then cast to ``date``. The data types and
    nullability declared in ``schema`` are not applied here.

    Args:
        batch: Rows with one element per field of ``schema``, in the same
            order.

    Returns:
        The DataFrame with renamed columns and ``load_date`` cast to date.
    """
    inferred = spark.createDataFrame(batch)
    column_names = [field.name for field in schema]
    renamed = inferred.toDF(*column_names)
    return renamed.withColumn("load_date", col("load_date").cast("date"))


In [15]:
def apply_scd2(df):
    """Append ``df`` to the ``raw_person`` table.

    Despite its name, this function contains no SCD2 logic: it only appends
    the given rows through ``df.writeTo("raw_person").append()``.

    Args:
        df: Object exposing ``writeTo(table_name)`` whose result exposes
            ``append()`` (a Spark DataFrame in this notebook).

    Returns:
        None.
    """
    table_writer = df.writeTo("raw_person")
    table_writer.append()


In [16]:
# Append every snapshot to raw_person, oldest first.
# NOTE(review): each run of this cell appends all nine snapshots again; unless
# raw_person is reset elsewhere, re-running it duplicates the rows.
for batch in [batch_1, batch_2, batch_3, batch_4, batch_5, batch_6, batch_7, batch_8, batch_9]:
    df = replay(batch)
    apply_scd2(df)   # despite the name, this only appends the snapshot to raw_person

In [17]:
def format_sql(load_date: str):
    """Build the SCD2 ``MERGE INTO dim_person`` statement for one load date.

    The statement reads the rows of ``raw_person`` for ``load_date`` and
    classifies each against the current version in ``dim_person``:

    * ``NEW``       - no current version exists for the person id
    * ``CHANGED``   - the row's load date is later than ``source_loaded_at``
    * ``UNCHANGED`` - anything else; these rows are dropped

    NEW/CHANGED rows are fed to the merge once with ``merge_key = person_id``
    (to close the matching current version) and, for CHANGED rows, a second
    time with a NULL ``merge_key`` (which cannot match, so it is inserted as
    the new version). Current active target rows without a source match are
    flagged ``is_active = false`` / ``change_type = 'DELETED'``.

    NOTE(review): classification compares dates only. ``record_hash`` is
    written but never compared, so a row whose attributes are identical still
    counts as CHANGED when its load date is newer.

    NOTE(review): if no NEW/CHANGED rows exist for ``load_date`` (for example
    when the same date is merged twice, or no rows exist for that date), the
    merge source is empty and the NOT MATCHED BY SOURCE branch would apply to
    every current active row. Confirm this is intended before re-running.

    Args:
        load_date: Load date as a ``YYYY-MM-DD`` string. ``datetime.date`` and
            ``datetime.datetime`` objects are accepted as well.

    Returns:
        The SQL text, with the load date embedded as ``CAST('<date>' as date)``.

    Raises:
        ValueError: If ``load_date`` is not a valid ``YYYY-MM-DD`` date. The
            value is interpolated into the SQL text, so anything that is not
            a plain date is rejected instead of reaching the engine, where a
            value that does not select any rows would leave the merge source
            empty (see the note above).
    """
    import datetime  # local import: this cell does not depend on another cell for it

    # Normalise to a canonical ISO date before it is embedded in the SQL text.
    if isinstance(load_date, datetime.datetime):
        iso_date = load_date.date().isoformat()
    elif isinstance(load_date, datetime.date):
        iso_date = load_date.isoformat()
    else:
        try:
            parsed = datetime.datetime.strptime(str(load_date).strip(), "%Y-%m-%d")
        except ValueError as exc:
            raise ValueError(
                f"load_date must be a YYYY-MM-DD date, got {load_date!r}"
            ) from exc
        iso_date = parsed.date().isoformat()

    stmt = f"""
    WITH changed_records AS (
        SELECT 
            src.*,
            CASE 
                WHEN tgt.person_id IS NULL THEN 'NEW'
                WHEN src.load_date > tgt.source_loaded_at THEN 'CHANGED'
                ELSE 'UNCHANGED'
            END AS change_classification
        FROM raw_person src
        LEFT JOIN (
            SELECT 
                person_id,
                source_loaded_at,
                effective_start_date
            FROM dim_person 
            WHERE is_current_version = true
        ) tgt ON src.person_id = tgt.person_id
        WHERE src.load_date = CAST('{iso_date}' as date)
    ),
    records_to_process AS (
        SELECT *
        FROM changed_records
        WHERE change_classification IN ('NEW', 'CHANGED')
    ),
    prepared_source AS (
        -- Original records for matching existing rows (updates)
        SELECT 
            person_id AS merge_key,  -- Used for matching
            person_id,
            first_name,
            last_name,
            city,
            email,
            load_date,
            status,
            'UPDATE_EXISTING' AS operation_type
        FROM records_to_process
        
        UNION ALL
        
        -- Duplicate records with NULL key for insertions
        SELECT 
            NULL AS merge_key,     -- NULL prevents matching, forces insert
            person_id,
            first_name,
            last_name,
            city,
            email,
            load_date,
            status,
            'INSERT_NEW_VERSION' AS operation_type
        FROM records_to_process
        WHERE change_classification = 'CHANGED'  -- Only for updates, not new records
    )
    
    MERGE INTO dim_person target
    USING prepared_source source
    ON target.person_id = source.merge_key 
       AND target.is_current_version = true
    -- Close existing current records for updated entities
    WHEN MATCHED 
        AND source.operation_type = 'UPDATE_EXISTING'
        AND source.load_date > target.source_loaded_at
    THEN UPDATE SET
        effective_end_date = source.load_date,
        is_current_version = false,
        change_type = 'SUPERSEDED',
        pipeline_processed_at = current_timestamp()
    -- Insert new records (both new entities and new versions)
    WHEN NOT MATCHED 
    THEN INSERT (
        person_id,
        first_name,
        last_name,
        city,
        email,
        effective_start_date,
        effective_end_date,
        is_current_version,
        is_active,
        source_loaded_at,
        pipeline_processed_at,
        change_type,
        record_hash
    ) VALUES (
        source.person_id,
        source.first_name,
        source.last_name,
        source.city,
        source.email,
        source.load_date,
        CAST('9999-12-31 23:59:59' AS TIMESTAMP),  -- Far future date
        true,
        CASE WHEN source.status = 'ACTIVE' THEN true ELSE false END,
        source.load_date,
        current_timestamp(),
        CASE 
            WHEN source.operation_type = 'UPDATE_EXISTING' THEN 'NEW'
            ELSE 'NEW_VERSION'
        END,
        sha2(concat_ws('|', 
            cast(source.person_id as string),
            source.first_name,
            source.last_name,
            source.city,
            source.email
        ), 256)
    )
    -- Handle soft deletes for records no longer in source
    WHEN NOT MATCHED BY SOURCE 
        AND target.is_current_version = true 
        AND target.is_active = true
    THEN UPDATE SET
        is_active = false,
        change_type = 'DELETED',
        pipeline_processed_at = current_timestamp();
    """
    return stmt

In [18]:
# Build the merge statement for the first load date, print it for inspection, and run it.
# NOTE(review): only batch_1_load_date is merged in the cells shown here;
# presumably the later load dates are merged elsewhere -- confirm.
stmt = format_sql(batch_1_load_date)
print(stmt)
spark.sql(stmt)


    WITH changed_records AS (
        SELECT 
            src.*,
            CASE 
                WHEN tgt.person_id IS NULL THEN 'NEW'
                WHEN src.load_date > tgt.source_loaded_at THEN 'CHANGED'
                ELSE 'UNCHANGED'
            END AS change_classification
        FROM raw_person src
        LEFT JOIN (
            SELECT 
                person_id,
                source_loaded_at,
                effective_start_date
            FROM dim_person 
            WHERE is_current_version = true
        ) tgt ON src.person_id = tgt.person_id
        WHERE src.load_date = CAST('2024-01-01' as date)
    ),
    records_to_process AS (
        SELECT *
        FROM changed_records
        WHERE change_classification IN ('NEW', 'CHANGED')
    ),
    prepared_source AS (
        -- Original records for matching existing rows (updates)
        SELECT 
            person_id AS merge_key,  -- Used for matching
            person_id,
            first_name,
            las

DataFrame[]