# BUILDING CUSTOMER DIMENSION TABLE

# Import libraries 

In [0]:
import pyspark.sql.functions as F
from delta.tables import DeltaTable
from pyspark.sql.functions import trim, col
from pyspark.sql.types import StringType, IntegerType, DateType
from pyspark.sql.window import Window
import logging

# Logging configuration


In [0]:
import logging

# Root logging: INFO and above, one timestamped line per record.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Named logger used by every transformation/validation step in this notebook.
logger = logging.getLogger("silver_crm_cust_info")
logger.setLevel(logging.INFO)

# Allowed-prefix configuration

Planned later change — support several prefixes, each mapped to its expected key length:

PREFIX_RULES = {
    "AW000": 10
}

In [0]:
# Business-key format rules: a valid key starts with this literal prefix and
# has this total length (prefix included); the remainder must be numeric.
ALLOWED_PREFIX: str = "AW000"
EXPECTED_LENGTH: int = 10

# Mapping/rules 

In [0]:
# Rename mapping

# Bronze CRM column name -> Silver column name (insertion order is the rename order).
# rename_columns skips any source name that is not present in the DataFrame.
RENAME_MAP = dict(
    cst_id="customer_id",
    cst_key="customer_key",
    cst_firstname="first_name",
    cst_lastname="last_name",
    cst_marital_status="marital_status",
    cst_gndr="gender",
    cst_create_date="create_date",
)

# Transformations (all columns arrive as strings)

## Trimming columns

In [0]:
def trim_string_columns(df):
    """Trim leading/trailing whitespace from every string-typed column.

    Non-string columns pass through untouched; column names and order are
    preserved.

    All columns are projected in a single ``select`` rather than one
    ``withColumn`` call per column, because every ``withColumn`` adds another
    projection to the logical plan.

    Args:
        df: Input Spark DataFrame.

    Returns:
        A DataFrame whose string columns are trimmed.
    """
    logger.info("Running trim_string_columns")

    fields = df.schema.fields
    if not any(isinstance(f.dataType, StringType) for f in fields):
        # Nothing to trim: return the input as-is (no extra projection).
        return df

    def _ref(name):
        # Backtick-quote so names containing dots are not read as struct access.
        return col("`" + name.replace("`", "``") + "`")

    projection = [
        trim(_ref(f.name)).alias(f.name)
        if isinstance(f.dataType, StringType)
        else _ref(f.name)
        for f in fields
    ]
    return df.select(*projection)

## Renaming columns

In [0]:
def rename_columns(df):
    """Rename Bronze CRM columns to their Silver names according to RENAME_MAP.

    Entries are applied in RENAME_MAP order. An entry is applied only if its
    source column exists in the DataFrame at that point; otherwise it is
    skipped silently. The column list is logged before and after.
    """
    logger.info("Running rename_columns")
    logger.info(f"Columns before rename: {df.columns}")

    renamed = df
    for source_name, target_name in RENAME_MAP.items():
        if source_name not in renamed.columns:
            continue
        renamed = renamed.withColumnRenamed(source_name, target_name)

    logger.info(f"Columns after rename: {renamed.columns}")
    return renamed

## Casting columns

In [0]:
def cast_columns(df):
    """Cast ``customer_id`` to IntegerType and ``create_date`` to DateType.

    After casting, the resulting schema is printed to stdout via printSchema().
    NOTE(review): how unparseable strings behave (NULL vs. error) depends on
    the session's ANSI mode setting — confirm which is expected.
    """
    logger.info("Running cast_columns")

    # Columns arrive from Bronze as strings; only these two get real types here.
    type_plan = (
        ("customer_id", IntegerType()),
        ("create_date", DateType()),
    )
    for column_name, spark_type in type_plan:
        df = df.withColumn(column_name, col(column_name).cast(spark_type))

    logger.info("Schema after casting:")
    df.printSchema()

    return df

## Standardization/Normalization
## for columns: 'marital_status' AND 'gender' 

In [0]:
def standardize_categorical(df):
    """Map raw ``marital_status`` and ``gender`` codes to readable labels.

    Matching is case-insensitive:
      * marital_status: S / SINGLE -> "Single", M / MARRIED -> "Married"
      * gender: M / MALE / 1 -> "Male", F / FEMALE / 2 -> "Female"
    Any other value, including NULL, becomes NULL.
    """
    logger.info("Running standardize_categorical")

    def _label(column_name, rules):
        # One WHEN per (raw codes, label) rule, in order, then default to NULL.
        upper_value = F.upper(F.col(column_name))
        mapped = None
        for raw_codes, label in rules:
            test = upper_value.isin(*raw_codes)
            mapped = F.when(test, label) if mapped is None else mapped.when(test, label)
        return mapped.otherwise(None)

    marital_rules = [(("S", "SINGLE"), "Single"), (("M", "MARRIED"), "Married")]
    gender_rules = [(("M", "MALE", "1"), "Male"), (("F", "FEMALE", "2"), "Female")]

    with_marital = df.withColumn("marital_status", _label("marital_status", marital_rules))
    return with_marital.withColumn("gender", _label("gender", gender_rules))

Business-key rules: check the business key for NULLs and duplicates. Rows whose key is NULL or empty are rejected. When a key occurs more than once, keep the most complete row (the one with the most non-NULL attributes) and, among equally complete rows, the latest one by date; the remaining rows are rejected.

# Business Key Enforcement


# Validate null function

What this function does:

- Validates a business key column.
- Treats a row as invalid when the key is NULL or an empty string `""`.
- Splits the DataFrame into:
  - `df_invalid` → rows that fail validation
  - `df_valid` → rows that pass validation
- Writes the invalid rows (tagged with rejection reason, stage and timestamp) to the quarantine table.
- Returns only the valid rows for further processing.

In [0]:
def validate_not_null_business_key(df, key_col, quarantine_table):
    """Remove rows whose business key is NULL or blank and quarantine them.

    A key is invalid when it is NULL or an empty string after trimming.
    Trimming here keeps the check correct even if trim_string_columns was not
    applied first.

    Invalid rows are tagged with ``rejection_reason``, ``rejection_stage`` and
    ``processed_timestamp`` and inserted into ``quarantine_table`` through an
    insert-only Delta MERGE.

    A NULL/blank key cannot identify a record, so the MERGE matches on every
    data column (null-safe ``<=>``) plus the rejection stage. Re-running on the
    same input inserts nothing, while genuinely new bad rows are still captured.
    Matching on the key alone would treat all NULL-key (or all empty-key) rows
    as one record and silently drop every later one.

    Args:
        df: Input DataFrame.
        key_col: Name of the business-key column.
        quarantine_table: Fully-qualified Delta table name. Presumably its
            schema holds all columns of ``df`` plus the three rejection
            columns (required by insertAll) — confirm against the table DDL.

    Returns:
        The rows whose business key is non-NULL and non-blank.
    """
    logger.info("Running validate_not_null_business_key")

    key = F.col(key_col)
    # isNull() keeps the whole condition TRUE (never NULL) for NULL keys, so
    # each row lands in exactly one of the two splits.
    invalid_condition = key.isNull() | (F.trim(key) == "")

    df_invalid = df.filter(invalid_condition)
    df_valid = df.filter(~invalid_condition)

    # limit(1) avoids counting every invalid row just to test for existence.
    if df_invalid.limit(1).count() == 0:
        logger.info(f"No NULL/empty values found in {key_col}")
        return df_valid

    # Data columns captured before the rejection metadata columns are added.
    data_cols = list(df.columns)

    df_invalid = (
        df_invalid
        .withColumn("rejection_reason", F.lit("null_or_empty_business_key"))
        .withColumn("rejection_stage", F.lit("silver_null_validation"))
        .withColumn("processed_timestamp", F.current_timestamp())
    )

    def _quote(name):
        return "`" + name.replace("`", "``") + "`"

    match_condition = " AND ".join(
        [f"target.{_quote(c)} <=> source.{_quote(c)}" for c in data_cols]
        + ["target.rejection_stage = source.rejection_stage"]
    )

    logger.info(f"Quarantining rows with NULL/empty {key_col} into {quarantine_table}")
    (
        DeltaTable.forName(spark, quarantine_table)
        .alias("target")
        .merge(df_invalid.alias("source"), match_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )

    return df_valid

   




# Validate Format


### Configuration & Function Design (README Style)

**Principle:**
Logic stays inside the function.
Business rules stay outside the function.

---

### Why?

* Business rules may change.
* Function should not contain hardcoded values.
* Reusable functions must receive rules as parameters.
* Cleaner and easier to maintain.

---

### Current Business Rule

* Prefix = `AW000`
* Total length = `10`
* Suffix = numeric

---

### Configuration (Outside Function)

Define rules once:

* `ALLOWED_PREFIX = "AW000"`
* `EXPECTED_LENGTH = 10`

---

### Function Responsibility

The function should:

1. Validate prefix
2. Validate length
3. Validate numeric suffix
4. Send invalid rows to quarantine
5. Return only valid rows

---

### If Business Rule Changes

Example: length becomes 12.

You only update:

* `EXPECTED_LENGTH = 12`

No change inside validation logic.

---

### Engineering Principle

* Do not hardcode rules inside function.
* Keep validation strict.
* Keep configuration flexible.
* Refactor only when requirements change.


In [0]:

def validate_business_key_format(
    df,
    key_col,
    allowed_prefix,
    expected_length,
    quarantine_table
):
    """Keep rows whose business key is ``allowed_prefix`` followed by digits.

    A key is valid when all three checks hold:
      1. it starts with ``allowed_prefix``;
      2. its total length equals ``expected_length``;
      3. every character after the prefix is an ASCII digit 0-9.
    The rules come in as parameters, so no business values are hard-coded here.

    For a NULL key every check is NULL. The combined condition is therefore
    coalesced to FALSE so such rows go to quarantine. Without this they would
    vanish: ``filter`` drops rows whose predicate is NULL in both the valid
    and the invalid split.

    Invalid rows are tagged with ``rejection_reason``, ``rejection_stage`` and
    ``processed_timestamp`` and inserted into ``quarantine_table`` through an
    insert-only Delta MERGE. The MERGE matches on the key (null-safe) and the
    rejection stage, so re-runs do not duplicate quarantine entries.

    Args:
        df: Input DataFrame.
        key_col: Name of the business-key column.
        allowed_prefix: Literal prefix every valid key must start with.
        expected_length: Total key length, prefix included.
        quarantine_table: Fully-qualified Delta table name for rejected rows.

    Returns:
        The rows with a well-formed business key.

    Raises:
        ValueError: if ``expected_length`` leaves no room for at least one digit
            after the prefix (such a config would reject every key).
    """
    logger.info("Running validate_business_key_format")

    prefix_length = len(allowed_prefix)
    suffix_length = expected_length - prefix_length
    if suffix_length < 1:
        raise ValueError(
            f"expected_length ({expected_length}) must be greater than the length "
            f"of allowed_prefix {allowed_prefix!r} ({prefix_length})"
        )

    key = F.col(key_col)
    prefix_match = key.startswith(allowed_prefix)
    length_match = F.length(key) == expected_length
    # substr is 1-based: the numeric suffix starts right after the prefix.
    numeric_match = key.substr(prefix_length + 1, suffix_length).rlike("^[0-9]+$")

    valid_condition = F.coalesce(
        prefix_match & length_match & numeric_match, F.lit(False)
    )

    df_valid = df.filter(valid_condition)
    df_invalid = df.filter(~valid_condition)

    # limit(1) avoids counting every invalid row just to test for existence.
    if df_invalid.limit(1).count() == 0:
        return df_valid

    df_invalid = (
        df_invalid
        .withColumn("rejection_reason", F.lit("invalid_business_key_format"))
        .withColumn("rejection_stage", F.lit("silver_format_validation"))
        .withColumn("processed_timestamp", F.current_timestamp())
    )

    quoted_key = "`" + key_col.replace("`", "``") + "`"
    logger.info(f"Quarantining rows with malformed {key_col} into {quarantine_table}")
    (
        DeltaTable.forName(spark, quarantine_table)
        .alias("target")
        .merge(
            df_invalid.alias("source"),
            f"target.{quoted_key} <=> source.{quoted_key} "
            "AND target.rejection_stage = source.rejection_stage",
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

    return df_valid

# Duplicate Resolution

## Responsibility of this function

- Detect duplicate business keys
- Rank duplicates (by completeness, then by ingestion timestamp)
- Keep the best-ranked record
- Quarantine the others
- Return the clean DataFrame

In [0]:
def resolve_duplicate_business_keys(
    df,
    key_col,
    important_cols,
    quarantine_table,
    tiebreak_cols=("ingest_timestamp", "customer_id"),
):
    """Keep one row per business key and quarantine the other duplicates.

    Rows sharing ``key_col`` are ranked by:
      1. ``completeness_score`` DESC: the number of non-NULL values among
         ``important_cols``;
      2. each column of ``tiebreak_cols`` DESC with NULLs last. The default is
         ``ingest_timestamp`` then ``customer_id``, i.e. the latest-ingested
         row wins, then the highest id.
    The rank-1 row is kept. All other rows for the key are tagged with
    ``rejection_reason``, ``rejection_stage`` and ``processed_timestamp`` and
    inserted into ``quarantine_table`` through an insert-only Delta MERGE.
    The MERGE matches on the key (null-safe) and the rejection stage.

    NOTE(review): the stated rule is "latest on date". If that means the
    business ``create_date`` rather than ingestion time, pass e.g.
    ``tiebreak_cols=("create_date", "ingest_timestamp", "customer_id")``.

    Args:
        df: Input DataFrame.
        key_col: Business-key column to deduplicate on.
        important_cols: Columns whose non-NULL count scores completeness
            (may be empty, in which case every row scores 0).
        quarantine_table: Fully-qualified Delta table name for rejected rows.
        tiebreak_cols: Columns ordered DESC (NULLs last) after completeness.

    Returns:
        One row per ``key_col`` value, without the helper ranking columns.
    """
    logger.info("Running resolve_duplicate_business_keys")

    # Start from a literal-0 Column: with an empty important_cols the builtin
    # sum would otherwise return the int 0, which withColumn rejects.
    completeness = sum(
        (F.col(c).isNotNull().cast("int") for c in important_cols),
        F.lit(0),
    )
    scored = df.withColumn("completeness_score", completeness)

    ranking = Window.partitionBy(key_col).orderBy(
        F.col("completeness_score").desc(),
        *[F.col(c).desc_nulls_last() for c in tiebreak_cols],
    )
    ranked = scored.withColumn("rn", F.row_number().over(ranking))

    helper_cols = ("completeness_score", "rn")
    df_winners = ranked.filter(F.col("rn") == 1).drop(*helper_cols)
    df_duplicates = ranked.filter(F.col("rn") > 1).drop(*helper_cols)

    # limit(1) avoids counting every duplicate just to test for existence.
    if df_duplicates.limit(1).count() > 0:
        df_duplicates = (
            df_duplicates
            .withColumn("rejection_reason", F.lit("duplicate_business_key"))
            .withColumn("rejection_stage", F.lit("silver_duplicate_resolution"))
            .withColumn("processed_timestamp", F.current_timestamp())
        )

        quoted_key = "`" + key_col.replace("`", "``") + "`"
        logger.info(f"Quarantining duplicate {key_col} rows into {quarantine_table}")
        (
            DeltaTable.forName(spark, quarantine_table)
            .alias("target")
            .merge(
                df_duplicates.alias("source"),
                f"target.{quoted_key} <=> source.{quoted_key} "
                "AND target.rejection_stage = source.rejection_stage",
            )
            .whenNotMatchedInsertAll()
            .execute()
        )

    logger.info("Duplicate resolution completed")

    return df_winners



# Drop metadata columns

In [0]:
def drop_metadata_columns(df, metadata_cols):
    """Return ``df`` without the given technical/lineage columns.

    Args:
        df: Input DataFrame.
        metadata_cols: Column names to remove (the main flow passes
            source, source_file and ingest_timestamp).

    Returns:
        A new DataFrame with those columns dropped.
    """
    logger.info("Dropping technical metadata columns")
    without_metadata = df.drop(*metadata_cols)
    return without_metadata

# Write Silver Table


In [0]:
def write_to_silver(df, target_table):
    """Save ``df`` as the Delta table ``target_table``, replacing its contents.

    Uses save mode "overwrite", i.e. a full refresh of the Silver table on
    every run ("append" would be the incremental alternative).
    NOTE(review): with Delta, an overwrite whose schema differs from the
    existing table is rejected unless the ``overwriteSchema`` option is set —
    confirm this is the intended behavior if the schema evolves.
    """
    logger.info(f"Writing data to Silver table: {target_table}")

    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(target_table)

    logger.info("Write to Silver completed")

# MAIN FLOW


In [0]:
# Main transformation flow: Bronze -> cleaned/validated -> Silver.

BRONZE_TABLE = "workspace.bronze.crm_cust_info"
QUARANTINE_TABLE = "workspace.quarantine.crm_cust_info"
SILVER_TABLE = "workspace.silver.crm_customers"
KEY_COL = "customer_key"

logger.info(f"Reading Bronze table: {BRONZE_TABLE}")

df = spark.table(BRONZE_TABLE)

df = trim_string_columns(df)
df = rename_columns(df)
df = cast_columns(df)
df = standardize_categorical(df)


df = validate_not_null_business_key(
    df,
    key_col=KEY_COL,
    quarantine_table=QUARANTINE_TABLE,
)
logger.info("Validation nulls completed successfully")


# Format rules come from the config cell so a rule change (e.g. a new
# EXPECTED_LENGTH) is made in exactly one place.
df = validate_business_key_format(
    df,
    key_col=KEY_COL,
    allowed_prefix=ALLOWED_PREFIX,
    expected_length=EXPECTED_LENGTH,
    quarantine_table=QUARANTINE_TABLE,
)
logger.info("Validation business_key_format completed successfully")


df = resolve_duplicate_business_keys(
    df,
    key_col=KEY_COL,
    important_cols=[
        "first_name",
        "last_name",
        "marital_status",
        "gender",
    ],
    quarantine_table=QUARANTINE_TABLE,
)


# Final guard before writing: the business key must now be non-NULL and unique.
# Done inline because final_business_key_validation is not defined in this
# notebook (calling it raised NameError).
key_violations = (
    df.groupBy(KEY_COL)
    .count()
    .filter(F.col(KEY_COL).isNull() | (F.col("count") > 1))
    .limit(1)
    .count()
)
if key_violations > 0:
    raise ValueError(
        f"{KEY_COL} is NULL or duplicated after validation; aborting Silver write"
    )
logger.info("Final business key validation completed successfully")


df = drop_metadata_columns(
    df,
    metadata_cols=[
        "source",
        "source_file",
        "ingest_timestamp",
    ],
)


write_to_silver(
    df,
    target_table=SILVER_TABLE,
)




In [0]:
%sql
-- All rows rejected by the Silver validations, with their rejection metadata.
SELECT * FROM workspace.quarantine.crm_cust_info;

In [0]:
%sql
-- Rejected-row counts per validation stage and reason.
SELECT rejection_stage, rejection_reason, COUNT(*) AS rejected_rows
FROM workspace.quarantine.crm_cust_info
GROUP BY rejection_stage, rejection_reason
ORDER BY rejection_stage, rejection_reason;


In [0]:
%sql
-- Final Silver customer table.
SELECT * FROM workspace.silver.crm_customers

In [0]:
%sql
-- Sanity check: customer_key must be non-NULL and unique in Silver (expect no rows).
SELECT customer_key, COUNT(*) AS row_count
FROM workspace.silver.crm_customers
GROUP BY customer_key
HAVING customer_key IS NULL OR COUNT(*) > 1;