# Silver Layer - SCD2 Data Transformation

This notebook transforms bronze layer data into SCD2-compliant silver layer data.

## Features:
- **SCD2 for all dimensions**: Surrogate keys, effective start/end dates, and an `is_current` flag
- **Config-driven**: Uses `config/ingestion_config.json`
- **Control tables**: Audit log, watermark, error records
- **Idempotent, production-ready**

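## SCD2 Implementation:
- Business keys (`business_key` in the config) identify each dimension member and are used for change detection
- A change in any configured `scd2_fields` column expires the current row and inserts a new version
- Effective start/end dates (the current version's end date is `9999-12-31`)
- `is_current` flag marking the active version of each business key
- Surrogate keys (`<dimension>_sk`) for dimensions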

In [None]:
# Imports
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os, json, uuid, datetime

# Build (or reuse, via getOrCreate) the Spark session for this notebook,
# registering Delta Lake's SQL extension and its catalog implementation.
spark = (
    SparkSession.builder
    .appName("Silver Layer - SCD2")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

def now():
    """Return the current local time (naive, no timezone) as an ISO-8601 string.

    Used to timestamp the rows appended to the control tables.
    """
    current_moment = datetime.datetime.now()
    return current_moment.isoformat()

In [None]:
# Load config
# JSON is UTF-8 by specification; be explicit instead of relying on the
# platform's locale default encoding.
# NOTE(review): the path is relative, so it resolves against the current
# working directory (presumably the notebook's folder) — confirm.
with open('../config/ingestion_config.json', encoding='utf-8') as f:
    config = json.load(f)
silver_cfg = config['silver_layer']
gold_cfg = config['gold_layer']  # the dimension definitions iterated by the SCD2 loop live here
control_cfg = config['control_tables']

BRONZE_DATA_PATH = silver_cfg['bronze_data_path']
SILVER_DATA_PATH = silver_cfg['silver_data_path']
# os.makedirs only makes sense for a local filesystem path.
# NOTE(review): confirm silver_data_path is never a remote URI.
os.makedirs(SILVER_DATA_PATH, exist_ok=True)

def append_csv_row(path, fields):
    """Append one row to the CSV control file at *path*.

    Every value is converted with ``str()`` first, so ``None`` is written as
    ``None``, just as the earlier f-string formatting did. Values containing
    commas, double quotes or newlines (typical of Spark error messages and JSON
    payloads) are quoted instead of breaking the row into extra columns/lines.
    Ordinary values are written unquoted.

    Raises:
        OSError: if the file cannot be opened or written.
    """
    import csv  # local import keeps this cell self-contained

    # newline='' lets the csv module control line endings itself.
    with open(path, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f, lineterminator='\n').writerow([str(v) for v in fields])


def log_audit(table, layer, status, row_count, error=None):
    """Append one run record to the audit log and return its new run id.

    Row layout: run_id, table, layer, timestamp from now(), an empty column
    (NOTE(review): presumably an end-time placeholder — confirm against the
    audit-log schema), status, row_count, error message ('' when None).

    Returns:
        The generated run id, or None when the audit log could not be written.
        Failures are printed, never raised, so logging cannot break the run.
    """
    run_id = str(uuid.uuid4())
    try:
        append_csv_row(
            control_cfg['audit_log'],
            [run_id, table, layer, now(), '', status, row_count, error or ''],
        )
    except Exception as e:
        print(f"Warning: Could not log audit: {e}")
        return None
    return run_id


def log_error(run_id, table, layer, error_type, error_message, record):
    """Append one row to the error-records table (best effort, never raises).

    *record* is serialized with json.dumps; the row is timestamped with now().
    """
    try:
        append_csv_row(
            control_cfg['error_records'],
            [run_id, table, layer, error_type, error_message, json.dumps(record), now()],
        )
    except Exception as e:
        print(f"Warning: Could not log error: {e}")


def update_watermark(table, layer, key, date):
    """Append a (table, layer, key, date) watermark row (best effort, never raises)."""
    try:
        append_csv_row(control_cfg['watermark'], [table, layer, key, date])
    except Exception as e:
        print(f"Warning: Could not update watermark: {e}")

In [None]:
# SCD2 Upsert Function
def _load_existing_dimension(dim_path, schema):
    """Load the silver dimension stored at *dim_path*, if there is one.

    Returns:
        ``(df, row_count)``. When Spark reports an analysis error for the path
        (e.g. nothing, or no Delta table, there yet), an empty DataFrame with
        *schema* and a count of 0 are returned so the caller does a first load.
    """
    try:
        from pyspark.errors import AnalysisException  # PySpark >= 3.4
    except ImportError:
        from pyspark.sql.utils import AnalysisException  # older PySpark

    try:
        df_existing = spark.read.format('delta').load(dim_path)
        existing_rows = df_existing.count()
    except AnalysisException:
        # Anything other than an analysis error (I/O failure, interrupt, ...)
        # must propagate: treating it as "empty" would make the overwrite in
        # scd2_upsert wipe the real table.
        print("📝 Creating new dimension")
        return spark.createDataFrame([], schema), 0
    print(f"📖 Loaded existing dimension: {existing_rows} rows")
    return df_existing, existing_rows


def _with_scd2_columns(df):
    """Return *df* with SCD2 metadata marking every row as an open, current version."""
    from pyspark.sql import functions as F

    return (
        df.withColumn('effective_start_date', F.current_date())
        # A real DATE (not a string), matching the type of effective_start_date.
        .withColumn('effective_end_date', F.to_date(F.lit('9999-12-31')))
        .withColumn('is_current', F.lit(True))
    )


def _merge_scd2(df_existing, df_staged, key_cols, compare_fields, surrogate_key_col):
    """Return the complete dimension after merging *df_staged* into *df_existing*.

    * Non-current (historical) rows are carried over untouched.
    * Current rows whose business key is absent from *df_staged*, or whose
      tracked fields are unchanged, stay current.
    * Current rows whose tracked fields changed are expired (end date = today,
      is_current False) and the staged row is added as a new version.
    * Business keys not present yet are inserted as new current versions.
    """
    from pyspark.sql import functions as F

    is_current = F.col('is_current').eqNullSafe(True)
    current = df_existing.filter(is_current)
    history = df_existing.filter(~is_current)

    # Keep the stored type of effective_end_date (tables written by earlier
    # versions of this notebook may hold it as a string) so the overwrite does
    # not change the table schema.
    end_type = df_existing.schema['effective_end_date'].dataType

    # Aliases make both sides addressable in the comparisons below.
    joined = df_staged.alias('n').join(
        current.alias('c'),
        [F.col(f'n.{k}') == F.col(f'c.{k}') for k in key_cols],
        'left_outer',
    )
    # With an equality join, a non-null key on the existing side means "matched".
    matched = F.col(f'c.{key_cols[0]}').isNotNull()

    differs = F.lit(False)
    for field in compare_fields:
        # Null-safe: a change to or from NULL is still a change.
        differs = differs | ~F.col(f'n.{field}').eqNullSafe(F.col(f'c.{field}'))

    changed_keys = joined.filter(matched & differs).select(
        [F.col(f'c.{k}').alias(k) for k in key_cols]
    )

    unchanged = current.join(changed_keys, key_cols, 'left_anti')
    expired = (
        current.join(changed_keys, key_cols, 'left_semi')
        .withColumn('effective_end_date', F.current_date().cast(end_type))
        .withColumn('is_current', F.lit(False))
    )

    # Offset new surrogate keys past the current maximum so they can never
    # collide with keys issued by earlier runs.
    max_sk = df_existing.agg(F.max(surrogate_key_col)).first()[0]
    next_sk = 0 if max_sk is None else max_sk + 1
    new_records = (
        joined.filter(~matched | differs)   # inserts + new versions of changed keys
        .select('n.*')                      # staged columns only
        .withColumn('effective_end_date', F.col('effective_end_date').cast(end_type))
        .withColumn(surrogate_key_col, F.monotonically_increasing_id() + F.lit(next_sk))
    )

    return (
        unchanged.unionByName(history)
        .unionByName(expired)
        .unionByName(new_records)
    )


def scd2_upsert(df_new, dim_name, dim_cfg):
    """Perform an SCD Type 2 upsert of *df_new* into the silver dimension *dim_name*.

    Args:
        df_new: Incoming dimension rows from bronze. Assumed to hold at most
            one row per business key. NOTE(review): confirm bronze is
            deduplicated; duplicates would yield several current versions.
        dim_name: Dimension name. The table lives at
            ``{SILVER_DATA_PATH}/{dim_name}``; its surrogate key column is
            ``{dim_name}_sk``.
        dim_cfg: Dict with ``business_key`` (a column name, or a list of names
            for a composite key) and ``scd2_fields`` (columns whose changes
            create a new version; names missing on either side are ignored).

    Returns:
        The dimension as committed, re-read from the Delta table.
    """
    from pyspark.sql import functions as F

    print(f"🔄 Processing SCD2 for {dim_name}...")

    dim_path = f'{SILVER_DATA_PATH}/{dim_name}'
    df_existing, existing_rows = _load_existing_dimension(dim_path, df_new.schema)

    # SCD2 configuration
    business_key = dim_cfg['business_key']
    key_cols = [business_key] if isinstance(business_key, str) else list(business_key)
    scd2_fields = dim_cfg['scd2_fields']
    surrogate_key_col = f'{dim_name}_sk'

    df_staged = _with_scd2_columns(df_new)

    if existing_rows == 0:
        # First time load: every incoming row becomes a current version.
        result = df_staged.withColumn(surrogate_key_col, F.monotonically_increasing_id())
    else:
        compare_fields = [
            f for f in scd2_fields if f in df_new.columns and f in df_existing.columns
        ]
        result = _merge_scd2(df_existing, df_staged, key_cols, compare_fields, surrogate_key_col)

    result.write.mode('overwrite').format('delta').save(dim_path)
    # Re-read the committed table: re-evaluating `result` after the overwrite
    # would recompute it against the new table version (and regenerate
    # monotonically_increasing_id values).
    written = spark.read.format('delta').load(dim_path)
    print(f"💾 Written to silver: {dim_path} ({written.count()} rows)")
    return written

In [None]:
# Process all SCD2 dimensions
import traceback

# Each dimension is processed best-effort: a failure is logged and the loop
# moves on, but the failure is remembered so the final summary is accurate.
failed_dims = []
for dim_name, dim_cfg in gold_cfg['dimensions'].items():
    print(f"\n🔄 Processing {dim_name} (SCD2)...")
    try:
        # Read the dimension's source table from bronze
        bronze_table = dim_cfg['source_table']
        df_bronze = spark.read.format('delta').load(f'{BRONZE_DATA_PATH}/{bronze_table}')

        # Apply SCD2 transformation
        df_silver = scd2_upsert(df_bronze, dim_name, dim_cfg)

        # Log success
        log_audit(dim_name, 'silver', 'SUCCESS', df_silver.count())
        update_watermark(dim_name, 'silver', None, now())

        print(f"✅ {dim_name} processed successfully")

    except Exception as e:
        error_msg = str(e)
        print(f"❌ Error processing {dim_name}: {error_msg}")
        traceback.print_exc()  # keep the stack trace visible in the run output
        failed_dims.append(dim_name)

        # Log error
        run_id = log_audit(dim_name, 'silver', 'FAIL', 0, error_msg)
        log_error(run_id, dim_name, 'silver', 'SCD2', error_msg, {})

    print("-" * 50)

if failed_dims:
    print(f"\n⚠️ Silver layer SCD2 processing finished with {len(failed_dims)} "
          f"failed dimension(s): {', '.join(failed_dims)}")
else:
    print("\n🎉 Silver layer SCD2 processing completed!")