# Feature Engineering Pipeline - Medallion Architecture

This notebook implements a comprehensive feature engineering pipeline following the Medallion Architecture:
- **Bronze Layer**: Raw data ingestion with metadata
- **Silver Layer**: Data cleaning and standardization
- **Gold Layer**: ML-ready feature store creation

The pipeline processes three data sources:
1. Clickstream features (behavioral data)
2. Customer attributes (demographic data)
3. Financial features (financial health data)

In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

# Import our feature processing modules
import sys
sys.path.append('..')
import utils.feature_processing_bronze_table
import utils.feature_processing_silver_table
import utils.feature_processing_gold_table

In [2]:
import os
import sys

# Ensure we're working from project root
current_dir = os.getcwd()
if current_dir.endswith('notebooks'):
    os.chdir('..')
    print("Adjusted working directory to project root")

Adjusted working directory to project root


## Setup PySpark Session

In [3]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("feature_engineering") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

print("Spark session initialized successfully")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 08:02:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized successfully


## Configuration Setup

In [4]:
# Set up configuration - using same date range as label engineering for consistency
start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

# Feature engineering specific parameters
lookback_months = 6  # How many months of history to use for time-aware features

print(f"Feature engineering date range: {start_date_str} to {end_date_str}")
print(f"Lookback period: {lookback_months} months")

Feature engineering date range: 2023-01-01 to 2024-12-01
Lookback period: 6 months


In [5]:
# Generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """Generate first-of-month dates for processing"""
    # Convert the date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    # List to store the first of month dates
    first_of_month_dates = []

    # Start from the first of the month of the start_date
    current_date = datetime(start_date.year, start_date.month, 1)

    while current_date <= end_date:
        # Append the date in yyyy-mm-dd format
        first_of_month_dates.append(current_date.strftime("%Y-%m-%d"))
        
        # Move to the first of the next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    return first_of_month_dates

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
print(f"Processing {len(dates_str_lst)} monthly snapshots")
print(f"Sample dates: {dates_str_lst[:5]}...{dates_str_lst[-2:]}")

Processing 24 monthly snapshots
Sample dates: ['2023-01-01', '2023-02-01', '2023-03-01', '2023-04-01', '2023-05-01']...['2024-11-01', '2024-12-01']


## Bronze Layer - Raw Data Ingestion

The Bronze layer preserves raw data in its original format while adding metadata for lineage tracking.

In [6]:
# Create bronze layer directories
bronze_clickstream_directory = "datamart/bronze/clickstream/"
bronze_attributes_directory = "datamart/bronze/attributes/"
bronze_financials_directory = "datamart/bronze/financials/"

# Create directories if they don't exist
for directory in [bronze_clickstream_directory, bronze_attributes_directory, bronze_financials_directory]:
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Created directory: {directory}")

In [7]:
# Process Bronze Layer - Clickstream Features
print("=== Processing Bronze Layer - Clickstream ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing clickstream bronze for {date_str}...")
    
    utils.feature_processing_bronze_table.process_clickstream_bronze_table(
        date_str, bronze_clickstream_directory, spark)

print(f"Clickstream bronze processing completed for {len(dates_str_lst)} snapshots")

=== Processing Bronze Layer - Clickstream ===
Processing clickstream bronze for 2023-01-01...


                                                                                

Clickstream 2023-01-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_01_01.csv
Clickstream 2023-02-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_02_01.csv
Clickstream 2023-03-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_03_01.csv
Clickstream 2023-04-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_04_01.csv
Clickstream 2023-05-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_05_01.csv
Clickstream 2023-06-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_06_01.csv
Processing clickstream bronze for 2023-07-01...
Clickstream 2023-07-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2023_07_01.csv
Clickstream 2023-08-01 row count: 8974
Cli

                                                                                

Clickstream 2024-06-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_06_01.csv
Processing clickstream bronze for 2024-07-01...
Clickstream 2024-07-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_07_01.csv
Clickstream 2024-08-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_08_01.csv
Clickstream 2024-09-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_09_01.csv
Clickstream 2024-10-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_10_01.csv
Clickstream 2024-11-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_11_01.csv
Clickstream 2024-12-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_12_01.csv
Clickstream bronze processing completed fo

In [8]:
# Process Bronze Layer - Customer Attributes
print("=== Processing Bronze Layer - Attributes ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing attributes bronze for {date_str}...")
    
    utils.feature_processing_bronze_table.process_attributes_bronze_table(
        date_str, bronze_attributes_directory, spark)

print(f"Attributes bronze processing completed for {len(dates_str_lst)} snapshots")

=== Processing Bronze Layer - Attributes ===
Processing attributes bronze for 2023-01-01...


                                                                                

Attributes 2023-01-01 row count: 530
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_01_01.csv
Attributes 2023-02-01 row count: 501
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_02_01.csv
Attributes 2023-03-01 row count: 506
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_03_01.csv
Attributes 2023-04-01 row count: 510
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_04_01.csv
Attributes 2023-05-01 row count: 521
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_05_01.csv
Attributes 2023-06-01 row count: 517
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_06_01.csv
Processing attributes bronze for 2023-07-01...
Attributes 2023-07-01 row count: 471
Attributes bronze saved to: datamart/bronze/attributes/bronze_attributes_2023_07_01.csv
Attributes 2023-08-01 row count: 481
Attributes bronze saved to: datamart/bron

In [9]:
# Process Bronze Layer - Financial Features
print("=== Processing Bronze Layer - Financials ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing financials bronze for {date_str}...")
    
    utils.feature_processing_bronze_table.process_financials_bronze_table(
        date_str, bronze_financials_directory, spark)

print(f"Financials bronze processing completed for {len(dates_str_lst)} snapshots")

=== Processing Bronze Layer - Financials ===
Processing financials bronze for 2023-01-01...
Financials 2023-01-01 row count: 530
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_01_01.csv
Financials 2023-02-01 row count: 501
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_02_01.csv
Financials 2023-03-01 row count: 506
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_03_01.csv
Financials 2023-04-01 row count: 510
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_04_01.csv
Financials 2023-05-01 row count: 521
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_05_01.csv
Financials 2023-06-01 row count: 517
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2023_06_01.csv
Processing financials bronze for 2023-07-01...
Financials 2023-07-01 row count: 471
Financials bronze saved to: datamart/bronze/financials/bronze_financials_2

In [10]:
# Inspect bronze layer outputs
sample_date = dates_str_lst[-1]  # Use the latest date for inspection

print("=== Bronze Layer Sample Inspection ===")
print(f"Inspecting data for {sample_date}")

# Sample clickstream bronze
clickstream_sample = utils.feature_processing_bronze_table.process_clickstream_bronze_table(
    sample_date, bronze_clickstream_directory, spark)
print(f"\nClickstream bronze shape: {clickstream_sample.count()} rows, {len(clickstream_sample.columns)} columns")
clickstream_sample.show(5)

=== Bronze Layer Sample Inspection ===
Inspecting data for 2024-12-01
Clickstream 2024-12-01 row count: 8974
Clickstream bronze saved to: datamart/bronze/clickstream/bronze_clickstream_2024_12_01.csv

Clickstream bronze shape: 8974 rows, 25 columns
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+--------------------+-------------------+---------------+
|fe_1|fe_2|fe_3|fe_4|fe_5|fe_6|fe_7|fe_8|fe_9|fe_10|fe_11|fe_12|fe_13|fe_14|fe_15|fe_16|fe_17|fe_18|fe_19|fe_20|Customer_ID|snapshot_date| ingestion_timestamp|        data_source|processing_date|
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+--------------------+-------------------+---------------+
| 145| 189| 109| 134| 196| -37| 101|  82| 111|   24|  -26|  -17|   65|  249|  200|  185|  -83|  -18|  -76|   30| CUS_0x1037|   2024-12-01|2025-05-22 08:02:...|feat

## Silver Layer - Data Cleaning and Standardization

The Silver layer applies data quality improvements and standardization while maintaining time-awareness.

In [11]:
# Create silver layer directories
silver_clickstream_directory = "../datamart/silver/clickstream/"
silver_attributes_directory = "../datamart/silver/attributes/"
silver_financials_directory = "../datamart/silver/financials/"

# Create directories if they don't exist
for directory in [silver_clickstream_directory, silver_attributes_directory, silver_financials_directory]:
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Created directory: {directory}")

Created directory: ../datamart/silver/clickstream/
Created directory: ../datamart/silver/attributes/
Created directory: ../datamart/silver/financials/


In [12]:
# Process Silver Layer - Clickstream Features
print("=== Processing Silver Layer - Clickstream ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing clickstream silver for {date_str}...")
    
    utils.feature_processing_silver_table.process_clickstream_silver_table(
        date_str, bronze_clickstream_directory, silver_clickstream_directory, spark)

print(f"Clickstream silver processing completed for {len(dates_str_lst)} snapshots")

=== Processing Silver Layer - Clickstream ===
Processing clickstream silver for 2023-01-01...
Clickstream silver loaded from: datamart/bronze/clickstream/bronze_clickstream_2023_01_01.csv, row count: 8974
Clickstream silver saved to: ../datamart/silver/clickstream/silver_clickstream_2023_01_01.parquet
Clickstream silver loaded from: datamart/bronze/clickstream/bronze_clickstream_2023_02_01.csv, row count: 8974
Clickstream silver saved to: ../datamart/silver/clickstream/silver_clickstream_2023_02_01.parquet
Clickstream silver loaded from: datamart/bronze/clickstream/bronze_clickstream_2023_03_01.csv, row count: 8974
Clickstream silver saved to: ../datamart/silver/clickstream/silver_clickstream_2023_03_01.parquet
Clickstream silver loaded from: datamart/bronze/clickstream/bronze_clickstream_2023_04_01.csv, row count: 8974
Clickstream silver saved to: ../datamart/silver/clickstream/silver_clickstream_2023_04_01.parquet
Clickstream silver loaded from: datamart/bronze/clickstream/bronze_cli

In [13]:
# Process Silver Layer - Customer Attributes
print("=== Processing Silver Layer - Attributes ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing attributes silver for {date_str}...")
    
    utils.feature_processing_silver_table.process_attributes_silver_table(
        date_str, bronze_attributes_directory, silver_attributes_directory, spark)

print(f"Attributes silver processing completed for {len(dates_str_lst)} snapshots")

=== Processing Silver Layer - Attributes ===
Processing attributes silver for 2023-01-01...
Attributes silver loaded from: datamart/bronze/attributes/bronze_attributes_2023_01_01.csv, row count: 530
Attributes silver saved to: ../datamart/silver/attributes/silver_attributes_2023_01_01.parquet
Attributes silver loaded from: datamart/bronze/attributes/bronze_attributes_2023_02_01.csv, row count: 501
Attributes silver saved to: ../datamart/silver/attributes/silver_attributes_2023_02_01.parquet
Attributes silver loaded from: datamart/bronze/attributes/bronze_attributes_2023_03_01.csv, row count: 506
Attributes silver saved to: ../datamart/silver/attributes/silver_attributes_2023_03_01.parquet
Attributes silver loaded from: datamart/bronze/attributes/bronze_attributes_2023_04_01.csv, row count: 510
Attributes silver saved to: ../datamart/silver/attributes/silver_attributes_2023_04_01.parquet
Attributes silver loaded from: datamart/bronze/attributes/bronze_attributes_2023_05_01.csv, row coun

In [14]:
# Process Silver Layer - Financial Features
print("=== Processing Silver Layer - Financials ===")
for i, date_str in enumerate(dates_str_lst):
    if i % 6 == 0:  # Print progress every 6 months
        print(f"Processing financials silver for {date_str}...")
    
    utils.feature_processing_silver_table.process_financials_silver_table(
        date_str, bronze_financials_directory, silver_financials_directory, spark)

print(f"Financials silver processing completed for {len(dates_str_lst)} snapshots")

=== Processing Silver Layer - Financials ===
Processing financials silver for 2023-01-01...
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2023_01_01.csv, row count: 530
Financials silver saved to: ../datamart/silver/financials/silver_financials_2023_01_01.parquet
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2023_02_01.csv, row count: 501
Financials silver saved to: ../datamart/silver/financials/silver_financials_2023_02_01.parquet
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2023_03_01.csv, row count: 506
Financials silver saved to: ../datamart/silver/financials/silver_financials_2023_03_01.parquet
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2023_04_01.csv, row count: 510
Financials silver saved to: ../datamart/silver/financials/silver_financials_2023_04_01.parquet
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2023_05_01.csv, row coun

In [15]:
# Inspect silver layer outputs
print("=== Silver Layer Sample Inspection ===")
print(f"Inspecting cleaned data for {sample_date}")

# Sample financial silver with cleaned features
financial_sample = utils.feature_processing_silver_table.process_financials_silver_table(
    sample_date, bronze_financials_directory, silver_financials_directory, spark)
print(f"\nFinancial silver shape: {financial_sample.count()} rows, {len(financial_sample.columns)} columns")
print("\nSample of cleaned financial features:")
financial_sample.select("Customer_ID", "Annual_Income_clean", "Credit_Mix_clean", 
                       "debt_to_income_ratio", "emi_to_income_ratio").show(5)

=== Silver Layer Sample Inspection ===
Inspecting cleaned data for 2024-12-01
Financials silver loaded from: datamart/bronze/financials/bronze_financials_2024_12_01.csv, row count: 515
Financials silver saved to: ../datamart/silver/financials/silver_financials_2024_12_01.parquet

Financial silver shape: 515 rows, 34 columns

Sample of cleaned financial features:
+-----------+-------------------+----------------+--------------------+--------------------+
|Customer_ID|Annual_Income_clean|Credit_Mix_clean|debt_to_income_ratio| emi_to_income_ratio|
+-----------+-------------------+----------------+--------------------+--------------------+
| CUS_0x103e|            98690.8|            Good|0.007163383459838501|0.006657328996024...|
| CUS_0x1195|           30429.91|        Standard|0.011911964548205489|0.010650028382346001|
| CUS_0x1197|           92300.01|         Unknown|0.008181689263171277|   6.619817318977127|
| CUS_0x11e2|           44986.55|            Good|0.016743004495614444|0.0063

## Gold Layer - ML-Ready Feature Store

The Gold layer creates machine learning-ready features by combining all data sources with time-aware aggregations and interaction features.

In [16]:
# Create gold layer directory
gold_feature_store_directory = "../datamart/gold/feature_store/"

if not os.path.exists(gold_feature_store_directory):
    os.makedirs(gold_feature_store_directory)
    print(f"Created directory: {gold_feature_store_directory}")

Created directory: ../datamart/gold/feature_store/


In [17]:
# Process Gold Layer - Integrated Feature Store
print("=== Processing Gold Layer - Feature Store ===")
print(f"Creating ML-ready features with {lookback_months}-month lookback period")

for i, date_str in enumerate(dates_str_lst):
    # Only process dates that have sufficient lookback history
    current_date = datetime.strptime(date_str, "%Y-%m-%d")
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    
    # Check if we have enough history for this date
    months_since_start = (current_date.year - start_date.year) * 12 + current_date.month - start_date.month
    
    if months_since_start >= lookback_months:
        if i % 6 == 0:  # Print progress every 6 months
            print(f"Processing feature store for {date_str}...")
        
        utils.feature_processing_gold_table.process_features_gold_table(
            date_str, 
            silver_clickstream_directory, 
            silver_attributes_directory, 
            silver_financials_directory, 
            gold_feature_store_directory, 
            spark, 
            lookback_months=lookback_months
        )
    else:
        if i < 5:  # Only print for first few skipped dates
            print(f"Skipping {date_str} - insufficient lookback history")

print(f"Gold feature store processing completed")

=== Processing Gold Layer - Feature Store ===
Creating ML-ready features with 6-month lookback period
Skipping 2023-01-01 - insufficient lookback history
Skipping 2023-02-01 - insufficient lookback history
Skipping 2023-03-01 - insufficient lookback history
Skipping 2023-04-01 - insufficient lookback history
Skipping 2023-05-01 - insufficient lookback history
Processing feature store for 2023-07-01...
Processing features for 2023-07-01 with lookback to 2023-01-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 3556 customers
Financial features processed for 3556 customers


                                                                                

Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_07_01.parquet
Processing features for 2023-08-01 with lookback to 2023-02-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 4037 customers
Financial features processed for 4037 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_08_01.parquet
Processing features for 2023-09-01 with lookback to 2023-03-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 4491 customers
Financial features processed for 4491 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_09_01.parquet
Processing features for 2023-10-01 with lookback to 2023-04-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 4978 customers
Financial features processed for 4978 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_10_01.parquet
Processing features for 2023-11-01 with lookback to 2023-05-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 5469 customers
Financial features processed for 5469 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_11_01.parquet
Processing features for 2023-12-01 with lookback to 2023-06-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 5958 customers
Financial features processed for 5958 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2023_12_01.parquet
Processing feature store for 2024-01-01...
Processing features for 2024-01-01 with lookback to 2023-07-01


                                                                                

Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 6443 customers
Financial features processed for 6443 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_01_01.parquet
Processing features for 2024-02-01 with lookback to 2023-08-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 6961 customers
Financial features processed for 6961 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_02_01.parquet
Processing features for 2024-03-01 with lookback to 2023-09-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 7472 customers
Financial features processed for 7472 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_03_01.parquet
Processing features for 2024-04-01 with lookback to 2023-10-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 7985 customers
Financial features processed for 7985 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_04_01.parquet
Processing features for 2024-05-01 with lookback to 2023-11-01


                                                                                

Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 8476 customers
Financial features processed for 8476 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_05_01.parquet
Processing features for 2024-06-01 with lookback to 2023-12-01
Found 8974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 8974 customers
Financial features processed for 8974 customers
Combined features for 8974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_06_01.parquet
Processing feature store for 2024-07-01...
Processing features for 2024-07-01 with lookback to 2024-01-01
Found 9479 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 9479 customers
Financial features processed for 9479 customers
Combined features for 9479 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_07_01.parquet
Processing features for 2024-08-01 with lookback to 2024-02-01
Found 10022 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 10022 customers
Financial features processed for 10022 customers
Combined features for 10022 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_08_01.parquet
Processing features for 2024-09-01 with lookback to 2024-03-01
Found 10515 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 10515 customers
Financial features processed for 10515 customers
Combined features for 10515 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_09_01.parquet
Processing features for 2024-10-01 with lookback to 2024-04-01
Found 10971 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 10971 customers
Financial features processed for 10971 customers
Combined features for 10971 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_10_01.parquet
Processing features for 2024-11-01 with lookback to 2024-05-01
Found 11459 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 11459 customers
Financial features processed for 11459 customers


                                                                                

Combined features for 11459 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_11_01.parquet
Processing features for 2024-12-01 with lookback to 2024-06-01
Found 11974 unique customers across all data sources
Clickstream features processed for 8974 customers
Attributes features processed for 11974 customers
Financial features processed for 11974 customers
Combined features for 11974 customers


[Stage 5204:>                                                       (0 + 8) / 8]

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_12_01.parquet
Gold feature store processing completed


                                                                                

In [18]:
# Inspect gold layer feature store
print("=== Gold Layer Feature Store Inspection ===")

# Use a date with sufficient history
inspection_date = dates_str_lst[-1]  # Latest date
print(f"Inspecting feature store for {inspection_date}")

# Load and inspect the gold feature store
feature_sample = utils.feature_processing_gold_table.process_features_gold_table(
    inspection_date, 
    silver_clickstream_directory, 
    silver_attributes_directory, 
    silver_financials_directory, 
    gold_feature_store_directory, 
    spark, 
    lookback_months=lookback_months
)

print(f"\nFeature store shape: {feature_sample.count()} rows, {len(feature_sample.columns)} columns")
print(f"\nFeature store schema:")
feature_sample.printSchema()

=== Gold Layer Feature Store Inspection ===
Inspecting feature store for 2024-12-01
Processing features for 2024-12-01 with lookback to 2024-06-01
Found 11974 unique customers across all data sources
Clickstream features processed for 8974 customers


                                                                                

Attributes features processed for 11974 customers
Financial features processed for 11974 customers
Combined features for 11974 customers


                                                                                

Gold feature store saved to: ../datamart/gold/feature_store/gold_feature_store_2024_12_01.parquet


                                                                                


Feature store shape: 11974 rows, 137 columns

Feature store schema:
root
 |-- Customer_ID: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- monthly_salary: float (nullable = true)
 |-- credit_mix: string (nullable = true)
 |-- payment_of_min_amount: string (nullable = true)
 |-- credit_history_months: integer (nullable = true)
 |-- credit_utilization_ratio: float (nullable = true)
 |-- delay_from_due_date: integer (nullable = true)
 |-- num_loans: float (nullable = true)
 |-- num_delayed_payments: float (nullable = true)
 |-- outstanding_debt: float (nullable = true)
 |-- total_emi_monthly: float (nullable = true)
 |-- amount_invested_monthly: float (nullable = true)
 |-- monthly_balance: float (nullable = true)
 |-- debt_to_income_ratio: double (nullable = true)
 |-- emi_to_income_ratio: double (nullable = true)
 |-- high_credit_utilization: integer (nullable = true)
 |-- high_debt_burden: integer (nullable = true)
 |-- payment_issues: integer (nullable = tru

In [19]:
# Display sample of key features
print("=== Sample of Key Features ===")

# Select key features for display
key_features = [
    "Customer_ID", "annual_income", "age", "occupation_category", 
    "credit_utilization_ratio", "financial_health_score", 
    "has_clickstream_data", "has_attributes_data", "has_financial_data",
    "data_completeness_score", "feature_snapshot_date"
]

# Filter to show only columns that exist
existing_key_features = [col for col in key_features if col in feature_sample.columns]

feature_sample.select(*existing_key_features).show(10, truncate=False)

=== Sample of Key Features ===


                                                                                

+-----------+-------------+---+-------------------+------------------------+----------------------+--------------------+-------------------+------------------+-----------------------+---------------------+
|Customer_ID|annual_income|age|occupation_category|credit_utilization_ratio|financial_health_score|has_clickstream_data|has_attributes_data|has_financial_data|data_completeness_score|feature_snapshot_date|
+-----------+-------------+---+-------------------+------------------------+----------------------+--------------------+-------------------+------------------+-----------------------+---------------------+
|CUS_0x3383 |148990.16    |31 |Unknown            |33.24051                |0.4                   |1                   |1                  |1                 |1.0                    |2024-12-01           |
|CUS_0xb8de |17538.28     |30 |Professional       |36.147934               |0.4                   |1                   |1                  |1                 |1.0              

In [20]:
# Feature store statistics and summary
print("=== Feature Store Statistics ===")

# Check data availability distribution
print("Data Availability Summary:")
availability_stats = feature_sample.select(
    F.avg("has_clickstream_data").alias("clickstream_coverage"),
    F.avg("has_attributes_data").alias("attributes_coverage"),
    F.avg("has_financial_data").alias("financial_coverage"),
    F.avg("data_completeness_score").alias("avg_completeness_score")
).collect()[0]

print(f"Clickstream Data Coverage: {availability_stats['clickstream_coverage']:.2%}")
print(f"Attributes Data Coverage: {availability_stats['attributes_coverage']:.2%}")
print(f"Financial Data Coverage: {availability_stats['financial_coverage']:.2%}")
print(f"Average Completeness Score: {availability_stats['avg_completeness_score']:.3f}")

=== Feature Store Statistics ===
Data Availability Summary:
Clickstream Data Coverage: 74.95%
Attributes Data Coverage: 100.00%
Financial Data Coverage: 100.00%
Average Completeness Score: 0.916


In [21]:
# Load and inspect the complete feature store across all time periods
print("=== Complete Feature Store Analysis ===")

# Load all feature store files
folder_path = gold_feature_store_directory
files_list = [folder_path + os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]

if files_list:
    complete_feature_store = spark.read.option("header", "true").parquet(*files_list)
    print(f"\nComplete feature store shape: {complete_feature_store.count()} rows, {len(complete_feature_store.columns)} columns")
    
    # Show time distribution
    print("\nFeature store time distribution:")
    time_dist = complete_feature_store.groupBy("feature_snapshot_date").count().orderBy("feature_snapshot_date")
    time_dist.show()
    
    print("\nFeature engineering pipeline completed successfully!")
    print(f"Total records processed: {complete_feature_store.count()}")
    print(f"Time periods covered: {time_dist.count()}")
else:
    print("No feature store files found. Please check the processing steps above.")

=== Complete Feature Store Analysis ===

Complete feature store shape: 172108 rows, 137 columns

Feature store time distribution:
+---------------------+-----+
|feature_snapshot_date|count|
+---------------------+-----+
|           2023-07-01| 8974|
|           2023-08-01| 8974|
|           2023-09-01| 8974|
|           2023-10-01| 8974|
|           2023-11-01| 8974|
|           2023-12-01| 8974|
|           2024-01-01| 8974|
|           2024-02-01| 8974|
|           2024-03-01| 8974|
|           2024-04-01| 8974|
|           2024-05-01| 8974|
|           2024-06-01| 8974|
|           2024-07-01| 9479|
|           2024-08-01|10022|
|           2024-09-01|10515|
|           2024-10-01|10971|
|           2024-11-01|11459|
|           2024-12-01|11974|
+---------------------+-----+


Feature engineering pipeline completed successfully!
Total records processed: 172108
Time periods covered: 18


## Feature Engineering Summary

This pipeline has successfully created a comprehensive feature store with customer-centric data integration and time-aware processing:

### Data Processing Results:
- **Total Records Processed**: 172,108 feature records across 18 time periods
- **Customer Coverage**: 11,974 unique customers with complete feature profiles
- **Feature Dimensions**: 137 engineered features per customer per time period
- **Temporal Coverage**: July 2023 to December 2024 with 6-month lookback periods

### Data Processing Layers:
- **Bronze Layer**: Raw data preservation with metadata tracking across 3 data sources (clickstream, attributes, financials)
- **Silver Layer**: Data quality improvements including underscore removal, format standardization, and categorical cleaning
- **Gold Layer**: Customer-centric ML-ready feature aggregations using "most recent available" data strategy

### Key Features:

1. **Behavioral Features (80 features)**:
   - Time-aware clickstream aggregations (fe_1 through fe_20: mean, std, min, max)
   - Behavioral stability metrics (coefficient of variation for each feature)
   - Activity metrics (record count, time span, engagement patterns)

2. **Demographic Features (7 features)**:
   - Cleaned age groups (18-29, 30-39, 40-49, 50-59, 60+)
   - Occupation categories (Professional, Education_Research, Technology_Media, Technical_Services, Other)
   - Data quality indicators (age validity, SSN validity, overall quality score)

3. **Financial Features (20 features)**:
   - Core financial metrics (income, credit utilization, payment history)
   - Derived risk indicators (debt-to-income ratio, EMI-to-income ratio)
   - Financial health score (composite 0-1 scale based on 5 risk factors)
   - Binary risk flags (high credit utilization, high debt burden, payment issues)

4. **Interaction Features (3 features)**:
   - Age-income interaction (normalized)
   - Relative credit history (credit months/age ratio)
   - Age-adjusted debt ratio (debt ratio weighted by age)

5. **Data Quality Features (4 features)**:
   - Source availability flags (has_clickstream_data, has_attributes_data, has_financial_data)
   - Overall completeness score (0-1 scale)