# DLT pipeline for training data

This Delta Live Tables (DLT) definition is executed by the pipeline defined in resources/customer_segmentation_dlt.yml. It contains the DLT steps that create validated training data.

In [0]:
import dlt
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, current_timestamp

from dlt_utils import load_schema_from_yaml


In [0]:
# Retrieve settings from the Spark configuration (set in the YAML)
# Each key is read once at module level, with no default supplied, and the
# resulting names are reused by the schema loading and bronze table cells.
schema_path = spark.conf.get("bronze_schema_path")  # passed to load_schema_from_yaml
bronze_schema_name = spark.conf.get("bronze_schema_name")  # passed to load_schema_from_yaml
input_path = spark.conf.get("cloudFiles.inputPath")  # location handed to .load()
file_format = spark.conf.get("cloudFiles.format")  # forwarded as the "cloudFiles.format" option
max_files_per_trigger = spark.conf.get("cloudFiles.maxFilesPerTrigger")  # forwarded as "cloudFiles.maxFilesPerTrigger"
header_option = spark.conf.get("cloudFiles.header")  # forwarded as "cloudFiles.header"

In [0]:
# Build the schema for the raw input with the project helper, using the
# schema file path and schema name read from the Spark configuration.
# The result is applied to the Auto Loader reader via .schema(...).
raw_data_schema = load_schema_from_yaml(schema_path, bronze_schema_name)

In [0]:

# Bronze layer: raw data is streamed in with Auto Loader, unmodified apart
# from an added ingestion timestamp.

@dlt.table
def bronze_training_customer_data():
    # Auto Loader settings, all taken from the Spark configuration values
    # read at module level.
    auto_loader_options = {
        "cloudFiles.format": file_format,
        "cloudFiles.header": header_option,
        "cloudFiles.maxFilesPerTrigger": max_files_per_trigger,
    }

    # "cloudFiles" is the Auto Loader source; the explicit schema is the one
    # loaded from the YAML schema file.
    auto_loader = spark.readStream.format("cloudFiles").options(**auto_loader_options)
    raw_stream = auto_loader.schema(raw_data_schema).load(input_path)

    # Record when each row was ingested.
    return raw_stream.withColumn("ingest_timestamp", current_timestamp())




In [0]:
# Silver Table: Cleaned and transformed data
@dlt.table(
    comment="silver table with valid customer data for training"
)
@dlt.expect_all_or_drop({
    "valid_id": "id IS NOT NULL",  # Ensure ID is not null
    # Ensure Segmentation (target) is not null and is one of the known classes
    "valid_segmentation": "segmentation IS NOT NULL AND segmentation IN ('A', 'B', 'C', 'D')",
    "non_negative_age": "age >= 0",
    "valid_age": "age <= 120",
    "valid_family_size": "family_size >= 0 AND family_size <= 15",
    "valid_work_experience": "work_experience >= 0 AND work_experience <= 50",
})
def silver_training_customer_data():
    """Return the bronze data with normalized column names and numeric types.

    Column names are lower-cased and spaces are replaced by underscores, then
    ``age`` and ``family_size`` are cast to int and ``work_experience`` to
    float. The expectations declared on this table are written against these
    normalized column names.

    NOTE(review): a NULL in age, family_size or work_experience makes the
    corresponding range expectation evaluate to NULL rather than true or
    false -- confirm how the pipeline treats such rows, because the gold
    feature table imputes missing values in exactly these columns.

    Returns:
        DataFrame read from ``bronze_training_customer_data`` with renamed
        and re-typed columns.
    """
    bronze_df = dlt.read("bronze_training_customer_data")

    # Standardize column names (lowercase, underscores instead of spaces).
    # The loop variable is deliberately not called `col`, which is the
    # imported pyspark function used just below.
    normalized_names = [name.lower().replace(" ", "_") for name in bronze_df.columns]
    df = bronze_df.toDF(*normalized_names)

    # Convert Age and Family Size to integers, Work Experience to float
    df = (
        df.withColumn("age", col("age").cast("int"))
        .withColumn("family_size", col("family_size").cast("int"))
        .withColumn("work_experience", col("work_experience").cast("float"))
    )
    return df



In [0]:
# Categorical columns for mode imputation (every categorical feature).
# This list has its own name so that it is not overwritten by
# `categorical_columns_custom`, which only lists the custom-encoded columns.
categorical_columns = [
    "ever_married",
    "graduated",
    "gender",
    "spending_score",
    "profession",
    "var_1",
]

# Numerical columns for median imputation
numerical_columns = ["age", "family_size", "work_experience"]

# Columns grouped by the encoding they receive
categorical_columns_onehot = ["profession", "var_1"]
categorical_columns_ordinal = ["spending_score"]
categorical_columns_custom = ["ever_married", "graduated", "gender"]

# Custom mapping for categorical columns: each entry pairs a
# {string value: numeric code} dict with the columns it applies to.
custom_mapping = [
    ({"Yes": 1, "No": 0}, ["ever_married", "graduated"]),
    ({"Male": 1, "Female": 0}, ["gender"]),
    ({"Low": 0, "Average": 1, "High": 2}, ["spending_score"]),
]

# Numeric code for each target class
custom_mapping_target = {"A": 1, "B": 2, "C": 3, "D": 4}

In [0]:
# Function to generate a DataFrame for each mapping
def create_lookup_df(mapping, column_name):
    """Build a lookup DataFrame for one value mapping applied to one column.

    Args:
        mapping: dict whose keys are the original values and whose values
            are the codes they are replaced with.
        column_name: name of the column the mapping applies to.

    Returns:
        DataFrame with one row per mapping entry and the columns
        ``string_value``, ``numeric_value`` and ``column``, where ``column``
        holds ``column_name`` on every row.
    """
    lookup_rows = list(mapping.items())
    lookup_df = spark.createDataFrame(lookup_rows, schema=["string_value", "numeric_value"])
    return lookup_df.withColumn("column", F.lit(column_name))

@dlt.table
def gold_mapping_lookup():
    # One lookup DataFrame per (mapping, column) pair, in the order the
    # pairs appear in custom_mapping.
    per_column_frames = [
        create_lookup_df(value_map, target_column)
        for value_map, target_columns in custom_mapping
        for target_column in target_columns
    ]

    # Stack the frames into a single table; the result stays None when
    # custom_mapping yields no pairs.
    combined = None
    for frame in per_column_frames:
        combined = frame if combined is None else combined.union(frame)

    return combined

In [0]:
# Gold Table: Business-ready or aggregated data
@dlt.table
def gold_customer_features():
    """Build the encoded, imputed feature table from the silver data.

    Steps, in order:
      1. Drop the identifier, the target and the ingestion timestamp.
      2. Fill missing ``profession`` with "Other" and missing
         ``ever_married`` with "No".
      3. Replace the values of every column listed in ``custom_mapping``
         with their numeric codes; values outside the mapping become null.
      4. Fill remaining nulls in the categorical columns with the column's
         most frequent non-null value.
      5. Fill remaining nulls in the numerical columns with the approximate
         median.

    NOTE(review): ``first()`` and ``approxQuantile`` are eager Spark actions
    that run while this table is being defined -- confirm that this is
    acceptable for the pipeline.

    Returns:
        DataFrame with the feature columns only.
    """
    silver_df = dlt.read("silver_training_customer_data")

    # The bronze table names its timestamp "ingest_timestamp". drop() ignores
    # names that are not present, so "inserted_at" is kept for safety.
    gold_df = silver_df.drop("id", "segmentation", "inserted_at", "ingest_timestamp")

    # custom imputation for missing values of profession and ever_married
    # (fillna returns a new DataFrame, so the result has to be re-assigned)
    gold_df = gold_df.fillna("Other", subset=["profession"]).fillna("No", subset=["ever_married"])

    # encoding of categorical variables
    # Custom encoding for Ever_Married, Graduated, Gender, and Spending_Score
    for mapping, columns in custom_mapping:
        # create_map expects alternating key, value columns.
        mapping_expr = F.create_map(
            *[F.lit(item) for pair in mapping.items() for item in pair]
        )
        # isin() only unpacks list/set arguments, so pass a real list.
        valid_values = list(mapping)
        for column in columns:
            gold_df = gold_df.withColumn(
                column,
                F.when(F.col(column).isin(valid_values), mapping_expr[F.col(column)])
                .otherwise(F.lit(None)),  # Handle invalid values by setting to None
            )

    # Mode Imputation for all categorical columns. The list is assembled from
    # the three encoding groups so that it covers every categorical feature.
    mode_columns = (
        categorical_columns_custom
        + categorical_columns_ordinal
        + categorical_columns_onehot
    )
    for column in mode_columns:
        mode_row = (
            gold_df.where(F.col(column).isNotNull())  # null must never be the mode
            .groupBy(column)
            .count()
            .orderBy(F.desc("count"), F.col(column))  # second key breaks ties deterministically
            .first()
        )
        if mode_row is None:
            continue  # no non-null value to impute with
        gold_df = gold_df.withColumn(
            column, F.coalesce(F.col(column), F.lit(mode_row[0]))
        )

    # Median Imputation for all numerical columns
    for column in numerical_columns:
        quantiles = gold_df.approxQuantile(column, [0.5], 0.01)
        if not quantiles:
            continue  # column holds no non-null values
        gold_df = gold_df.withColumn(
            column, F.coalesce(F.col(column), F.lit(quantiles[0]))
        )

    return gold_df

@dlt.table
def gold_customer_ml_target():
    """Return the target column of the silver data encoded as numeric codes.

    ``segmentation`` is replaced by the matching code from
    ``custom_mapping_target``. Any value other than "A", "B" or "C" receives
    the code for "D"; the silver table's ``valid_segmentation`` expectation
    restricts the column to those four classes.

    Returns:
        DataFrame with the single column ``segmentation``.
    """
    segmentation = F.col("segmentation")
    encoded_segmentation = (
        F.when(segmentation == "A", custom_mapping_target["A"])
        .when(segmentation == "B", custom_mapping_target["B"])
        .when(segmentation == "C", custom_mapping_target["C"])
        .otherwise(custom_mapping_target["D"])
    )

    gold_df_target = dlt.read("silver_training_customer_data").select("segmentation")
    return gold_df_target.withColumn("segmentation", encoded_segmentation)
    