# 02 Transform – Silver Stage

This notebook performs normalization and type casting on the ingested bronze data.

* Names are cleaned (trimmed, internal whitespace collapsed, and converted to title case).
* Dates of birth are parsed into date objects.
* Phone numbers are stripped of non-digit characters.
* Social Security Numbers (SSN) and Driver License Numbers (DLNum) are sanitized.
* Records failing validation rules are routed to a rejects dataset.
* Valid records are written to the Silver Parquet file for further processing.

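The transformation rules are intended to be customisable via the `schema_overrides` section of the YAML configuration. Note that the code cell below reads only the `paths` section and uses fixed column names, so adjust those names to match your data.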

In [None]:
import re
from datetime import date
from pathlib import Path

import polars as pl
import yaml

# Load the pipeline configuration. The path is relative, so it is resolved
# against the current working directory of the notebook kernel.
config_path = 'pipeline_config.yaml'
with open(config_path, 'r') as config_file:
    config = yaml.safe_load(config_file)

# Pull the three locations this stage needs out of the `paths` section:
# the bronze input plus the silver and rejects outputs.
bronze_path, silver_output_path, rejects_path = (
    config['paths'][key]
    for key in ('bronze_output_path', 'silver_output_path', 'rejects_path')
)

# Helper functions for normalization
def clean_name(name: str) -> str:
    """Normalize a name value.

    Leading and trailing whitespace is removed, each internal run of
    whitespace is reduced to a single space, and the result is converted
    with ``str.title``.

    Args:
        name: Raw name, or ``None`` when the value is missing.

    Returns:
        The normalized name, or ``None`` if ``name`` is ``None``.
    """
    if name is None:
        return None
    # split() without a separator discards edge whitespace and splits on
    # whitespace runs, so re-joining with one space trims and collapses.
    return ' '.join(name.split()).title()

# Remove non-digit characters from phone numbers
def strip_non_digits(s):
    """Return ``s`` with every non-digit character removed.

    ``None`` is passed through unchanged so missing values stay missing.

    NOTE: for ``str`` patterns ``\\D`` is Unicode-aware, so non-ASCII
    decimal digits are kept as well as ``0-9``.
    """
    if s is None:
        return None
    return re.sub(r'\D', '', s)

# Sanitize SSN: keep only digits
def clean_ssn(s):
    """Return the SSN reduced to its digits, or ``None`` when ``s`` is ``None``.

    Delegates to ``strip_non_digits``; no length check is performed here.
    """
    if s is None:
        return None
    return strip_non_digits(s)

# Sanitize driver licence number: remove whitespace and hyphens
def clean_dl(s):
    """Return ``s`` without hyphens or whitespace; ``None`` stays ``None``.

    All other characters (letters, digits, punctuation) are kept as-is.
    """
    if s is None:
        return None
    return re.sub(r'[-\s]', '', s)

# Build a lazy query over the bronze Parquet data; nothing is materialized here.
bronze_lazy = pl.scan_parquet(bronze_path)

# Per-column normalization expressions. Each one overwrites the column it
# reads from, so the set of columns is unchanged.
# Example column names; adjust based on your actual data
normalization_exprs = [
    # Python helpers are applied element by element to the text columns.
    pl.col('Name').apply(clean_name).alias('Name'),
    # The date format is inferred (fmt=None) and parsing is non-strict.
    pl.col('DateOfBirth').str.strptime(pl.Date, fmt=None, strict=False).alias('DateOfBirth'),
    pl.col('Phone').apply(strip_non_digits).alias('Phone'),
    pl.col('SSN').apply(clean_ssn).alias('SSN'),
    pl.col('DLNum').apply(clean_dl).alias('DLNum'),
]

# Apply normalization
normalized = bronze_lazy.with_columns(normalization_exprs)

# Validation rules
# Each rule is a boolean polars expression evaluated per row; a null
# (missing) value is accepted by both rules.

# Valid SSNs are exactly 9 digits when present
valid_ssn = pl.col('SSN').is_null() | (pl.col('SSN').str.lengths() == 9)

# Valid dates of birth fall between 1900-01-01 and today, both inclusive.
# `pl.date` is a function that builds a date expression from year/month/day
# and has no `.today()` attribute, so the bounds are taken from the standard
# library and wrapped as literals. "Today" is captured once, when this cell
# runs.
earliest_dob = date(1900, 1, 1)
latest_dob = date.today()

# For unknown DOBs allow null
# NOTE(review): if values that fail date parsing are nulled upstream, they are
# indistinguishable from genuinely missing DOBs here and will pass this rule.
valid_dob = pl.col('DateOfBirth').is_null() | (
    (pl.col('DateOfBirth') >= pl.lit(earliest_dob))
    & (pl.col('DateOfBirth') <= pl.lit(latest_dob))
)

# A row is valid only when every rule holds.
valid_rows = valid_ssn & valid_dob

# Partition the normalized data with the combined validation mask: rows that
# fail at least one rule become rejects, the remainder is the silver set.
reject_df = normalized.filter(~valid_rows)
silver_df = normalized.filter(valid_rows)

print(
    f"Writing {silver_output_path} (valid records) "
    f"and {rejects_path} (invalid records)"
)

# Persist results
# Valid records are written first, then the rejects; each call writes one
# Parquet output.
silver_df.sink_parquet(silver_output_path)
reject_df.sink_parquet(rejects_path)
