In [0]:
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.types import StringType
from typing import Dict, Optional
from itertools import chain

# ==============================================================================
# 1. HELPER FUNCTIONS (cleaning and validation-flag utilities)
# ==============================================================================

def create_map_from_dict(mapping: Dict[str, str]):
    """Build a Spark map column whose entries mirror ``mapping``.

    Each dictionary entry is emitted as two literal columns (key first, then
    value), and the flat list is handed to ``F.create_map``.
    """
    literal_columns = []
    for key, value in mapping.items():
        literal_columns.append(F.lit(key))
        literal_columns.append(F.lit(value))
    return F.create_map(literal_columns)

def trim_all_string_columns(df: DataFrame) -> DataFrame:
    """Return ``df`` with ``F.trim`` applied to every string-typed column.

    All columns are produced by a single ``select`` in their original order;
    columns that are not ``StringType`` pass through unchanged.
    """
    # Names of the columns whose declared type is StringType.
    string_names = set()
    for field in df.schema.fields:
        if isinstance(field.dataType, StringType):
            string_names.add(field.name)

    projection = []
    for name in df.columns:
        column = F.col(name)
        if name in string_names:
            # Alias back to the original name so the schema keeps its names.
            column = F.trim(column).alias(name)
        projection.append(column)

    return df.select(*projection)

def standardize_key(df: DataFrame, col: str, new_col: str) -> DataFrame:
    """Add ``new_col``: ``col`` stripped of non-alphanumerics and upper-cased.

    Every character outside ``A-Z``, ``a-z`` and ``0-9`` is removed before the
    remaining text is upper-cased. ``col`` itself is not modified unless
    ``new_col`` names the same column.
    """
    alnum_only = F.regexp_replace(F.col(col), '[^A-Za-z0-9]', '')
    return df.withColumn(new_col, F.upper(alnum_only))

def normalize_product_line(df: DataFrame, col: str) -> DataFrame:
    """Expand single-letter product-line codes in ``col`` to full names.

    The value is upper-cased before the lookup, so codes match regardless of
    case. A value with no matching code is kept as it is, because the lookup
    result is coalesced with the original column.

    Args:
        df: Input DataFrame.
        col: Name of the column to rewrite in place.

    Returns:
        ``df`` with ``col`` replaced by the expanded product-line name.
    """
    # 'T' -> 'Touring' is included so this definition applies the same rules
    # as the other normalize_product_line definition in this file.
    line_rules = {'R': 'Road', 'S': 'Sport', 'M': 'Mountain', 'T': 'Touring'}
    map_col = create_map_from_dict(line_rules)
    # Coalesce falls back to the original value when the code is not mapped.
    return df.withColumn(col, F.coalesce(map_col[F.upper(F.col(col))], F.col(col)))

def flag_invalid_numeric(df: DataFrame, col: str, min_val: Optional[int] = None, max_val: Optional[int] = None) -> DataFrame:
    """Add a boolean ``is_valid_<col>`` column; no rows are removed.

    The flag is true when ``col`` cast to ``int`` is not null and, for each
    bound that is supplied, lies within it (both bounds are inclusive).

    Args:
        df: Input DataFrame.
        col: Name of the column to validate.
        min_val: Optional inclusive lower bound.
        max_val: Optional inclusive upper bound.

    Returns:
        ``df`` with the extra ``is_valid_<col>`` column.
    """
    # NOTE(review): the bounds are compared against the int-cast value, not
    # the raw value -- confirm that is intended for fractional inputs.
    as_int = F.col(col).cast('int')

    checks = [as_int.isNotNull()]
    if min_val is not None:
        checks.append(as_int >= min_val)
    if max_val is not None:
        checks.append(as_int <= max_val)

    # AND the conditions together left to right.
    flag = checks[0]
    for extra in checks[1:]:
        flag = flag & extra

    return df.withColumn(f"is_valid_{col}", flag)

def flag_invalid_date(df: DataFrame, col: str) -> DataFrame:
    """Add a boolean ``is_valid_<col>`` column; no rows are removed.

    The flag is true when ``col`` is not null. Only nullness is checked here;
    the value is not parsed or range-checked.
    """
    flag_name = f"is_valid_{col}"
    not_null = F.col(col).isNotNull()
    return df.withColumn(flag_name, not_null)

In [0]:
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.types import StringType
from typing import Dict
from itertools import chain

# ==============================================================================
# 1. HELPER FUNCTIONS
# ==============================================================================

def create_map_from_dict(mapping: Dict[str, str]):
    """
    Turn a Python dictionary into a Spark map column.

    The dictionary is flattened to key, value, key, value, ... order, each
    item is wrapped with ``F.lit``, and the result is passed to
    ``F.create_map``. Keeping the rules in a plain dict lets callers define
    them separately from the column expression that applies them.
    """
    flattened = chain.from_iterable(mapping.items())
    return F.create_map(list(map(F.lit, flattened)))

def trim_all_string_columns(df: DataFrame) -> DataFrame:
    """
    Apply ``F.trim`` to every string-typed column in a single ``select``.

    Column order and names are preserved; columns that are not ``StringType``
    are selected unchanged. The whole projection is built up front and issued
    as one ``select`` rather than one ``withColumn`` call per column.
    """
    string_names = frozenset(
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
    )

    def _project(name):
        # Trim string columns and keep their name; pass others through.
        if name in string_names:
            return F.trim(F.col(name)).alias(name)
        return F.col(name)

    return df.select(*map(_project, df.columns))

def standardize_key(df: DataFrame, col: str, new_col: str) -> DataFrame:
    """
    Write a cleaned copy of ``col`` into ``new_col``.

    Characters other than ASCII letters and digits are removed, then the
    result is upper-cased, e.g. "Bike-123" -> "BIKE123".
    """
    non_alnum = '[^A-Za-z0-9]'
    return df.withColumn(
        new_col,
        F.upper(F.regexp_replace(F.col(col), non_alnum, '')),
    )

def normalize_product_line(df: DataFrame, col: str) -> DataFrame:
    """
    Replace product-line codes in ``col`` with their full names.

    Mapping: R -> Road, S -> Sport, M -> Mountain, T -> Touring. The value is
    upper-cased before the lookup; values without a mapping are kept as-is.
    """
    code_to_name = {'R': 'Road', 'S': 'Sport', 'M': 'Mountain', 'T': 'Touring'}
    lookup = create_map_from_dict(code_to_name)
    original = F.col(col)
    expanded = lookup[F.upper(original)]
    # Fall back to the original value when the lookup yields null.
    return df.withColumn(col, F.coalesce(expanded, original))

# ==============================================================================
# 2. MAIN PIPELINE
# ==============================================================================

def process_crm_prd_info(bronze_table: str, silver_table: str):
    """Build the silver product table from the bronze CRM product table.

    Reads ``bronze_table``, trims and standardizes it, projects it onto the
    silver column names, keeps only rows whose ``std_prd_key`` is exactly four
    characters long, overwrites ``silver_table`` as a Delta table, and then
    displays the written table.

    Relies on ``spark`` and ``display`` being available as globals; neither is
    defined inside this function.

    Args:
        bronze_table: Name of the source table to read.
        silver_table: Name of the target table to overwrite.
    """
    print(f"Starting processing for {silver_table}...")

    # --------------------------------------------------------------------------
    # STEP 1: READ the raw bronze data
    # --------------------------------------------------------------------------
    raw = spark.table(bronze_table)

    # --------------------------------------------------------------------------
    # STEP 2: TRANSFORM, one named step per operation
    # --------------------------------------------------------------------------
    # A. Trim whitespace so " Road " and "Road" compare equal.
    trimmed = trim_all_string_columns(raw)

    # B. Derive the cleaned key, e.g. "BIKE-123" -> "BIKE123".
    keyed = standardize_key(trimmed, 'prd_key', 'std_prd_key')

    # C. Keep only the first 4 characters, e.g. "BIKE123" -> "BIKE".
    #    NOTE(review): presumably this matches the join key format of another
    #    system -- confirm against the downstream join.
    keyed = keyed.withColumn('std_prd_key', F.col('std_prd_key').substr(1, 4))

    # D. Expand product-line codes, e.g. 'R' -> 'Road'.
    cleaned = normalize_product_line(keyed, 'prd_line')

    # --------------------------------------------------------------------------
    # STEP 3: PROJECTION onto the silver schema
    # --------------------------------------------------------------------------
    # (source column, silver column) pairs, in output order. Columns not
    # listed here are dropped; the original 'prd_key' is kept as 'product_key'.
    silver_columns = [
        ("prd_id", "product_id"),
        ("prd_key", "product_key"),
        ("std_prd_key", "std_prd_key"),
        ("prd_nm", "product_name"),
        ("prd_cost", "product_cost"),
        ("prd_line", "product_line"),
        ("prd_start_dt", "start_date"),
        ("prd_end_dt", "end_date"),
    ]
    projected = cleaned.select(*[
        F.col(source) if source == target else F.col(source).alias(target)
        for source, target in silver_columns
    ])

    # --------------------------------------------------------------------------
    # STEP 4: FILTER out rows without a usable key
    # --------------------------------------------------------------------------
    # A row survives only when its standardized key is present and is exactly
    # 4 characters long.
    std_key = F.col("std_prd_key")
    has_four_char_key = std_key.isNotNull() & (F.length(std_key) == 4)
    kept = projected.filter(has_four_char_key)

    # --------------------------------------------------------------------------
    # STEP 5: WRITE, replacing the target table's contents
    # --------------------------------------------------------------------------
    writer = kept.write.format("delta").mode("overwrite")
    writer.saveAsTable(silver_table)

    print(f"Successfully wrote to {silver_table}")
    display(spark.table(silver_table))

# ==============================================================================
# Execution
# ==============================================================================
# Run the bronze -> silver load for the CRM product table.
process_crm_prd_info(
    bronze_table='workspace.bronze.crm_prd_info',
    silver_table='workspace.silver.crm_prd_info',
)