# 03_silver_transform
Purpose: Transform Bronze -> Silver using metadata-driven rules: type casting, basic cleaning, deduplication, nulling of negative values in positive-only columns, and optional date partitioning, then write Silver Delta tables.
Author: Janak
Date: 2025-11-26


In [0]:
import datetime
import json
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_date, trim, when
from pyspark.sql.types import *

# --- Notebook configuration ---------------------------------------------------
# Bronze is the input layer and Silver the output layer; every table is read
# from / written to a sub-directory named after its key in the metadata file.
metadata_path = "src/metadata/metadata_schema.json"
bronze_base, silver_base = "/tmp/delta/bronze", "/tmp/delta/silver"

# Set to True (and point the experiment path below at a real location) to log
# per-table row counts to MLflow.
USE_MLFLOW = False

# Reuse the active Spark session when one exists, otherwise create one.
spark = SparkSession.builder.getOrCreate()

if USE_MLFLOW:
    import mlflow

    mlflow.set_experiment(
        "/Users/your.email@example.com/intelligent-etl-metadata-platform"
    )


In [0]:
# Load the table metadata (one top-level key per table). The file is decoded
# explicitly as UTF-8 so the result does not depend on the platform's default
# text encoding.
with open(metadata_path, "r", encoding="utf-8") as f:
    metadata = json.load(f)

print("Tables in metadata:", list(metadata.keys()))


In [0]:
def cast_column_safe(df, col_name, dtype_str):
    """Return ``df`` with column ``col_name`` cast to the requested type.

    ``dtype_str`` is matched case-insensitively against the supported simple
    type names listed below; any other value falls back to ``"string"``.
    The conversion itself is a plain ``Column.cast`` with no extra guarding.
    """
    supported = (
        "string", "timestamp", "date", "double",
        "float", "int", "long", "boolean",
    )
    requested = dtype_str.lower()
    target_type = requested if requested in supported else "string"
    converted = col(col_name).cast(target_type)
    return df.withColumn(col_name, converted)
    
def apply_schema_casts(df, schema_dict):
    """Conform ``df`` to the column types declared in ``schema_dict``.

    Args:
        df: DataFrame to conform.
        schema_dict: Mapping of column name -> simple type name
            (e.g. ``"string"``, ``"timestamp"``, ``"int"``).

    Returns:
        A DataFrame where every declared column that exists has been cast
        with ``cast_column_safe`` and every declared column that is missing
        has been added as an all-NULL column of the declared type.
    """
    # Same type vocabulary (and same "string" fallback) as cast_column_safe,
    # matched case-insensitively so both branches treat a type name alike.
    supported = {
        "string", "timestamp", "date", "double",
        "float", "int", "long", "boolean",
    }
    for cname, dtype in schema_dict.items():
        if cname in df.columns:
            df = cast_column_safe(df, cname, dtype)
        else:
            requested = dtype.lower()
            target = requested if requested in supported else "string"
            # lit(None) builds a NULL literal column; col() would instead
            # try to look up an existing column by name.
            df = df.withColumn(cname, lit(None).cast(target))
    return df


In [0]:
def basic_cleaning(df, schema_dict):
    """Trim surrounding whitespace from the declared string columns of ``df``.

    Only columns that are present in ``df`` and whose declared type in
    ``schema_dict`` is ``"string"`` (case-insensitive) are touched; no other
    normalisation (such as lowercasing) is applied.
    """
    string_cols = [
        name
        for name, declared in schema_dict.items()
        if name in df.columns and declared.lower() == "string"
    ]
    cleaned = df
    for name in string_cols:
        cleaned = cleaned.withColumn(name, trim(col(name)))
    return cleaned

def deduplicate(df, pk_candidates):
    """Drop duplicate rows from ``df``.

    ``pk_candidates`` is a list of column names to deduplicate on. Candidates
    that are not columns of ``df`` are ignored; when none remain (or the list
    is empty/None) whole rows are compared instead.
    """
    key_cols = [name for name in (pk_candidates or []) if name in df.columns]
    if not key_cols:
        return df.dropDuplicates()
    return df.dropDuplicates(key_cols)


In [0]:
run_summary = []  # one metrics dict per table that was written to Silver

for table, meta in metadata.items():
    print(f"\n=== Processing table: {table} ===")
    bronze_path = f"{bronze_base}/{table}"
    silver_path = f"{silver_base}/{table}"

    # Load Bronze. A table that cannot be read is reported and skipped so the
    # remaining tables are still processed.
    try:
        df = spark.read.format("delta").load(bronze_path)
    except Exception as e:
        print(f"Skip {table}: cannot load bronze at {bronze_path} -> {e}")
        continue

    before_count = df.count()
    print(f"Bronze rows: {before_count}, columns: {len(df.columns)}")

    # `or {}` / `or []` also cover keys that are present but set to JSON null,
    # which would otherwise fail on .items() / iteration below.
    schema_def = meta.get("schema") or {}
    quality_rules = meta.get("quality_rules") or {}

    # 1) Apply casts based on metadata.schema
    df_casted = apply_schema_casts(df, schema_def)

    # 2) Basic cleaning: trim string columns
    df_clean = basic_cleaning(df_casted, schema_def)

    # 3) Deduplicate: the not_null columns are used as a proxy for primary
    #    key candidates.
    pk_candidates = quality_rules.get("not_null") or []
    df_dedup = deduplicate(df_clean, pk_candidates)

    # 4) Replace negative values with NULL in columns declared positive-only
    pos_cols = quality_rules.get("positive_values") or []
    for pc in pos_cols:
        if pc in df_dedup.columns:
            df_dedup = df_dedup.withColumn(pc, when(col(pc) < 0, None).otherwise(col(pc)))

    # 5) Pick a partition column from well-known date column names.
    #    Column types are looked up once instead of rebuilding the dict for
    #    every branch.
    column_types = dict(df_dedup.dtypes)
    partition_col = None
    for cand in ["order_date", "stock_date", "signup_date", "payment_date"]:
        if cand not in column_types:
            continue
        cand_type = column_types[cand]
        if cand_type.startswith("timestamp"):
            # Partition on a derived date column rather than the raw timestamp.
            df_dedup = df_dedup.withColumn(cand + "_date", to_date(col(cand)))
            partition_col = cand + "_date"
        elif cand_type.startswith("date"):
            partition_col = cand
        # NOTE(review): only the first candidate that exists is considered; if
        # it is neither a date nor a timestamp the table is written without
        # partitioning even when a later candidate would qualify — confirm
        # this is intended.
        break

    # 6) Write Silver Delta
    write_mode = "overwrite"
    print(f"Writing Silver to {silver_path} (partition: {partition_col})")
    writer = df_dedup.write.format("delta").mode(write_mode)
    if partition_col:
        writer = writer.partitionBy(partition_col)
    writer.save(silver_path)

    # Read the written table back once and reuse it for both the row count
    # and the temp view.
    silver_df = spark.read.format("delta").load(silver_path)
    after_count = silver_df.count()
    print(f"Silver rows: {after_count}")

    # register temp view for downstream notebooks
    silver_df.createOrReplaceTempView(f"{table}_silver")
    print(f"Registered temp view: {table}_silver")

    # collect summary
    run_summary.append({
        "table": table,
        "bronze_rows": before_count,
        "silver_rows": after_count,
        "partition_col": partition_col
    })

    # Optional MLflow logging
    if USE_MLFLOW:
        mlflow.log_metric(f"{table}_bronze_rows", before_count)
        mlflow.log_metric(f"{table}_silver_rows", after_count)

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Explicit schema: partition_col is None for every table that was written
# without partitioning, and Spark cannot infer a column type from values that
# are all None, so schema inference would fail for such a run.
summary_schema = StructType([
    StructField("table", StringType(), True),
    StructField("bronze_rows", LongType(), True),
    StructField("silver_rows", LongType(), True),
    StructField("partition_col", StringType(), True),
])

# Row fields follow the key order of the run_summary dicts, which matches the
# field order of summary_schema.
summary_rows = [Row(**s) for s in run_summary]
if summary_rows:
    summary_df = spark.createDataFrame(summary_rows, schema=summary_schema)
    display(summary_df)
else:
    print("No tables processed (check bronze paths).")


In [0]:
# Quick Silver-layer checks driven by the metadata quality rules (nulls & enum mismatches)
def quick_qc_on_silver(table, meta):
    """Count quality-rule violations in the Silver table for ``table``.

    Loads the Delta table at ``{silver_base}/{table}`` and evaluates the rules
    found under ``meta["quality_rules"]``:

    * ``not_null``: for each listed column, the number of NULL rows.
    * ``accepted_values``: for each column -> allowed-values entry, the number
      of rows matched by the "not in allowed" filter.

    Columns named by a rule but absent from the table are skipped. The result
    maps rule name -> {column: count} and only contains the rules that are
    present in the metadata.
    """
    silver_df = spark.read.format("delta").load(f"{silver_base}/{table}")
    rules = meta.get("quality_rules", {})
    present = silver_df.columns
    report = {}

    if "not_null" in rules:
        null_counts = {}
        for name in rules["not_null"]:
            if name in present:
                null_counts[name] = silver_df.filter(silver_df[name].isNull()).count()
        report["not_null"] = null_counts

    if "accepted_values" in rules:
        # NOTE(review): rows where the column is NULL are presumably not
        # matched by this filter (NULL comparison) — confirm if they should be.
        report["accepted_values"] = {
            name: silver_df.filter(~col(name).isin(allowed)).count()
            for name, allowed in rules["accepted_values"].items()
            if name in present
        }

    return report

import pprint

# Run the quick checks for every table in the metadata. A failure for one
# table is recorded as an {"error": ...} entry instead of stopping the loop.
qc_results = {}
for table, meta in metadata.items():
    try:
        outcome = quick_qc_on_silver(table, meta)
    except Exception as exc:
        outcome = {"error": str(exc)}
    qc_results[table] = outcome

pprint.pprint(qc_results)


In [0]:
# Persist a small lineage/manifest record as JSON: the project name, a
# timestamp-based run id and the per-table summary collected during the run.
manifest_path = "/dbfs/tmp/intelligent_etl_manifest.json"
run_id = datetime.datetime.now().isoformat()
manifest = dict(
    project="intelligent-etl-metadata-platform",
    run_id=run_id,
    tables=run_summary,
)
with open(manifest_path, "w") as out:
    json.dump(manifest, out, indent=2)
print("Manifest written to:", manifest_path)
