# 🥈 RetailNova - Notebook 02: Silver Transformation & SCD Type 2

This notebook covers:
1. Silver transformation logic (cleaning, standardisation)
2. SCD Type 2 for customer address changes
3. Data Quality checks against Silver tables
4. PII masking demonstration


In [None]:
# ── Setup: make the project importable and start Spark ─────────────────────
# The project root defaults to the Jupyter Docker home directory; set the
# RETAILNOVA_HOME environment variable to run the notebook from elsewhere.
import os
import sys

PROJECT_ROOT = os.environ.get('RETAILNOVA_HOME', '/home/jovyan')
# Keep the project root first on sys.path (as before), but do not prepend a
# duplicate entry every time this cell is re-run.
if sys.path[:1] != [PROJECT_ROOT]:
    sys.path.insert(0, PROJECT_ROOT)

from pipelines.spark_session import build_spark_session
spark = build_spark_session('RetailNova-Notebook-Silver')
print('✓ Spark ready')

In [None]:
# ── Run Silver Transformation ──────────────────────────────────────────────
# Builds the Silver layer via the project pipeline (per this notebook's
# overview: cleaning, standardisation and the SCD2 customer merge).
from pipelines.silver_transformation import (
    run_silver_transformation,
)

run_silver_transformation(spark)

In [None]:
# ── Inspect Silver Customers (SCD2 columns) ────────────────────────────────
# Load the Silver customers Delta table, report its row count and preview the
# SCD2-related columns that are actually present in its schema.
# NOTE(review): if SCD2 history is retained, the count presumably includes
# superseded (is_current = FALSE) versions too — confirm.
# NOTE(review): the preview includes `email`; confirm it is masked before
# sharing rendered outputs.
from pipelines.config import storage_config

silver_customers_path = storage_config.layer_path('silver', 'customers')
df_silver_cust = spark.read.format('delta').load(silver_customers_path)

print(f'Silver Customers count: {df_silver_cust.count()}')

# Preview only the SCD2 columns that exist, keeping the order listed here.
scd2_cols = [
    'customer_id', 'email', 'city', 'loyalty_tier',
    'effective_start_date', 'effective_end_date', 'is_current',
]
existing_cols = set(df_silver_cust.columns)
available = [col_name for col_name in scd2_cols if col_name in existing_cols]
df_silver_cust.select(available).show(10, truncate=False)

In [None]:
# ── Simulate an SCD2 Change: Customer changes address ──────────────────────
# In production this happens automatically via CDC from SQL Server.
# This cell only prints the SQL and the expected pipeline behaviour; it does
# not connect to any database.
from contextlib import suppress

from pipelines.config import source_config  # noqa: F401 — not used in this cell

# The database drivers are not needed to print the walkthrough below. Import
# them best-effort so a kernel without psycopg2/pyodbc installed can still
# Restart & Run All (previously a missing driver raised ImportError here).
with suppress(ImportError):
    import psycopg2  # noqa: F401
with suppress(ImportError):
    import pyodbc  # noqa: F401

print('In production, a customer address update would look like:')
print('''
UPDATE dbo.customers
SET address_line1 = 'Syntagma 99',
    city          = 'Piraeus',
    last_modified  = GETDATE()
WHERE customer_id = 1;
''')
print('When the pipeline re-runs:')
print('  1. Bronze: new record appended (watermark > previous run)')
print('  2. Silver: SCD2 MERGE detects address_line1 change')
print('  3. Old row: effective_end_date = NOW(), is_current = FALSE')
print('  4. New row: inserted with is_current = TRUE')

In [None]:
# ── Silver Products: check margin calculation ──────────────────────────────
# Preview the ten Silver products with the highest margin_pct as a quick
# sanity check of the margin calculation.
from pyspark.sql import functions as F

silver_products_path = storage_config.layer_path('silver', 'products')
df_products = spark.read.format('delta').load(silver_products_path)

print('Product margin analysis:')
margin_cols = ['product_name', 'category', 'unit_price', 'cost_price', 'margin_pct']
(
    df_products
    .select(*margin_cols)
    .orderBy(F.desc('margin_pct'))
    .show(10, truncate=False)
)

In [None]:
# ── Silver Orders: check suspicious flag ───────────────────────────────────
# Count all Silver orders and, when the is_suspicious column exists, report
# and preview the flagged orders.
silver_orders_path = storage_config.layer_path('silver', 'sales_orders')
df_orders = spark.read.format('delta').load(silver_orders_path)

print(f'Total orders in Silver: {df_orders.count()}')

has_suspicious_flag = 'is_suspicious' in df_orders.columns
if has_suspicious_flag:
    # Explicit comparison with True; rows whose flag is null are not kept.
    suspicious = df_orders.where(F.col('is_suspicious') == F.lit(True))
    print(f'Suspicious orders detected: {suspicious.count()}')
    preview_cols = ['order_id', 'order_number', 'status', 'total_amount']
    suspicious.select(*preview_cols).show()

In [None]:
# ── Run Data Quality Checks ────────────────────────────────────────────────
# Run the Silver-layer rules with raise_on_critical=False so the notebook can
# print a PASS / WARNING / FAIL summary instead of stopping.
from quality_framework.dq_engine import run_quality_checks


def _count_status(rule_results, status):
    """Return how many rule results have `.status` equal to `status`."""
    return sum(1 for rule_result in rule_results if rule_result.status == status)


results = run_quality_checks(
    spark,
    layer='silver',
    pipeline_name='notebook_quality_check',
    raise_on_critical=False,
)

print(f'\nTotal rules checked: {len(results)}')
print(f'PASS:    {_count_status(results, "PASS")}')
print(f'WARNING: {_count_status(results, "WARNING")}')
print(f'FAIL:    {_count_status(results, "FAIL")}')

In [None]:
# ── Inject corrupt data and re-run quality (negative test demo) ────────────
# Injects corrupt records for the 'customers' table, re-runs the Silver
# quality rules and checks that at least one rule now reports FAIL.
# NOTE(review): this cell does not remove the injected records; presumably
# they remain in the table until the Silver transformation is re-run —
# confirm before re-running the clean quality-check cell.
from quality_framework.dq_engine import inject_corrupt_data, run_quality_checks

print('Injecting 3 corrupt customer records...')
inject_corrupt_data(spark, 'customers', 'notebook-test-001')

print('Re-running quality checks (expect failures)...')
results_after = run_quality_checks(
    spark,
    layer='silver',
    pipeline_name='notebook_corrupt_test',
    raise_on_critical=False,
)

failures = [r for r in results_after if r.status == 'FAIL']
print(f'\nCritical failures detected: {len(failures)}')
for failed_rule in failures:
    print(f'  ✗ {failed_rule.rule_name}: {failed_rule.pass_rate_pct:.1f}%')

# The negative test is only meaningful if the corruption was caught; fail
# loudly instead of silently "passing" when no rule reported FAIL.
assert failures, 'Expected at least one FAIL after injecting corrupt data, got none'