In [25]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col, row_number, to_date, udf, count, when
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, ArrayType, DoubleType
from pyspark.sql.window import Window
from pyspark.sql.functions import regexp_extract, regexp_replace, when, trim, initcap
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import utils.features_attributes_bronze_table
import utils.features_attributes_silver_table
import utils.features_attributes_gold_table

In [30]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

In [31]:
# set up config
snapshot_date_str = "2023-01-01"

start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

In [32]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    # Convert the date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    # List to store the first of month dates
    first_of_month_dates = []

    # Start from the first of the month of the start_date
    current_date = datetime(start_date.year, start_date.month, 1)

    while current_date <= end_date:
        # Append the date in yyyy-mm-dd format
        first_of_month_dates.append(current_date.strftime("%Y-%m-%d"))
        
        # Move to the first of the next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    return first_of_month_dates

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
dates_str_lst

['2023-01-01',
 '2023-02-01',
 '2023-03-01',
 '2023-04-01',
 '2023-05-01',
 '2023-06-01',
 '2023-07-01',
 '2023-08-01',
 '2023-09-01',
 '2023-10-01',
 '2023-11-01',
 '2023-12-01',
 '2024-01-01',
 '2024-02-01',
 '2024-03-01',
 '2024-04-01',
 '2024-05-01',
 '2024-06-01',
 '2024-07-01',
 '2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01']

In [33]:
# connect to source back end - IRL connect to back end source system
csv_file_path = "data/features_attributes.csv"

# load data - IRL ingest from back end source system
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

In [34]:
df.show()

+-----------+-----------------+---+-----------+-------------+-------------+
|Customer_ID|             Name|Age|        SSN|   Occupation|snapshot_date|
+-----------+-----------------+---+-----------+-------------+-------------+
| CUS_0x1000|   Alistair Barrf| 18|913-74-1218|       Lawyer|   2023-05-01|
| CUS_0x1009|           Arunah| 26|063-67-6938|     Mechanic|   2025-01-01|
| CUS_0x100b|         Shirboni| 19|  #F%$D@*&8|Media_Manager|   2024-03-01|
| CUS_0x1011|        Schneyerh| 44|793-05-8223|       Doctor|   2023-11-01|
| CUS_0x1013|         Cameront| 44|930-49-9615|     Mechanic|   2023-12-01|
| CUS_0x1015|          Holtono| 27|810-97-7024|   Journalist|   2023-08-01|
| CUS_0x1018|      Felsenthalq| 15|731-19-8119|   Accountant|   2023-11-01|
| CUS_0x1026|          Josephv| 52|500-62-9044|      Manager|   2023-10-01|
| CUS_0x102d| Neil Chatterjeex| 31|692-71-7552| Entrepreneur|   2024-01-01|
| CUS_0x102e|            Rhysn| 26|  #F%$D@*&8|    Scientist|   2024-04-01|
| CUS_0x1032

### Clean up age, SSN, Occupation for silver table

In [19]:
# 1. Clean Age: Extract digits and cast to Integer
df_cleaned = df.withColumn(
    "Age",
    regexp_extract(col("Age"), r"(\d+)", 1).cast(IntegerType())
)

# 2. Clean SSN: Match valid SSNs, else null
valid_ssn_pattern = r"^\d{3}-\d{2}-\d{4}$"
df_cleaned = df_cleaned.withColumn(
    "SSN",
    when(col("SSN").rlike(valid_ssn_pattern), col("SSN")).otherwise(None)
)

# 3. Clean Occupation: Remove placeholder, null if empty or invalid
df_cleaned = df_cleaned.withColumn(
    "Occupation",
    when((trim(col("Occupation")) == "") | (col("Occupation") == "_______"), None)
    .otherwise(initcap(regexp_replace(col("Occupation"), "_", " ")))  # Optional: beautify names
)

# Show result
df_cleaned.show(truncate=False)

+-----------+-----------------+---+-----------+-------------+-------------+
|Customer_ID|Name             |Age|SSN        |Occupation   |snapshot_date|
+-----------+-----------------+---+-----------+-------------+-------------+
|CUS_0x1000 |Alistair Barrf   |18 |913-74-1218|Lawyer       |2023-05-01   |
|CUS_0x1009 |Arunah           |26 |063-67-6938|Mechanic     |2025-01-01   |
|CUS_0x100b |Shirboni         |19 |NULL       |Media Manager|2024-03-01   |
|CUS_0x1011 |Schneyerh        |44 |793-05-8223|Doctor       |2023-11-01   |
|CUS_0x1013 |Cameront         |44 |930-49-9615|Mechanic     |2023-12-01   |
|CUS_0x1015 |Holtono          |27 |810-97-7024|Journalist   |2023-08-01   |
|CUS_0x1018 |Felsenthalq      |15 |731-19-8119|Accountant   |2023-11-01   |
|CUS_0x1026 |Josephv          |52 |500-62-9044|Manager      |2023-10-01   |
|CUS_0x102d |Neil Chatterjeex |31 |692-71-7552|Entrepreneur |2024-01-01   |
|CUS_0x102e |Rhysn            |26 |NULL       |Scientist    |2024-04-01   |
|CUS_0x1032 

In [20]:
# clean data: enforce schema / data type
# Dictionary specifying columns and their desired datatypes
column_type_map = {
    "Customer_ID": StringType(),
    "Name": StringType(),
    "Age": IntegerType(),
    "SSN": StringType(),
    "Occupation": StringType(),
    "snapshot_date": DateType(),
}

#change to new dtype
for column, new_type in column_type_map.items():
    df_cleaned = df_cleaned.withColumn(column, col(column).cast(new_type))

### Gold

### One hot encode & keep only latest result for gold table

In [22]:
# Impute occupation with most freq value and age with median val

# Occupation
# Find the most frequent (mode) value
mode_value = (
    df_cleaned.filter(col("Occupation").isNotNull())
      .groupBy("Occupation")
      .count()
      .orderBy(col("count").desc())
      .first()[0]
)

#  Fill nulls in the column with the mode value
df_cleaned = df_cleaned.fillna({ "Occupation": mode_value })

# Age
# Filter out nulls and select the column as a list of values
non_null_values = df_cleaned.select("Age").where(col("Age").isNotNull())

# Compute the approximate median using approxQuantile
# [0.5] is for the median (50th percentile), and 0.01 is the relative error
median_value = non_null_values.approxQuantile("Age", [0.5], 0.01)[0]

# Fill nulls with the median
df_cleaned = df_cleaned.fillna({ "Age": median_value })

In [23]:
# Index and encode
indexer = StringIndexer(inputCol="Occupation", outputCol="Occupation_Indexed", handleInvalid="keep")
encoder = OneHotEncoder(inputCol="Occupation_Indexed", outputCol="Occupation_OHE")  # dropLast=True by default

pipeline = Pipeline(stages=[indexer, encoder])
model = pipeline.fit(df_cleaned)
df_encoded = model.transform(df_cleaned)

# Get occupation labels
occupation_labels = model.stages[0].labels  # StringIndexer labels

#  Convert OHE vector to array
def vector_to_array(v):
    return v.toArray().tolist()

vec_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))
df_encoded = df_encoded.withColumn("Occupation_OHE_Array", vec_to_array_udf(col("Occupation_OHE")))

# Create individual binary columns, but DROP the first category (index 0)
for i, label in enumerate(occupation_labels[1:], start=1):  # Skip first label
    col_name = f"is_{label.replace(' ', '_')}"
    df_encoded = df_encoded.withColumn(col_name, col("Occupation_OHE_Array")[i - 1])  # Index i-1 due to dropped one

# Clean up intermediate columns
df_encoded = df_encoded.drop("Occupation_Indexed", "Occupation_OHE", "Occupation_OHE_Array", "Occupation")

In [24]:
df_encoded.show()

+-----------+-----------------+---+-----------+-------------+------------+-----------+-------------+------------+----------+------------+-----------+----------------+---------------+-------------+---------+-----------+----------+---------+
|Customer_ID|             Name|Age|        SSN|snapshot_date|is_Architect|is_Engineer|is_Accountant|is_Scientist|is_Teacher|is_Developer|is_Mechanic|is_Media_Manager|is_Entrepreneur|is_Journalist|is_Doctor|is_Musician|is_Manager|is_Writer|
+-----------+-----------------+---+-----------+-------------+------------+-----------+-------------+------------+----------+------------+-----------+----------------+---------------+-------------+---------+-----------+----------+---------+
| CUS_0x1000|   Alistair Barrf| 18|913-74-1218|   2023-05-01|         1.0|        0.0|          0.0|         0.0|       0.0|         0.0|        0.0|             0.0|            0.0|          0.0|      0.0|        0.0|       0.0|      0.0|
| CUS_0x1009|           Arunah| 26|063-6

                                                                                

In [55]:
df_cleaned = df_encoded

# Ensure snapshot_date is in date format
df_cleaned = df_cleaned.withColumn("snapshot_date", to_date("snapshot_date", "yyyy-MM-dd"))

# Define a window partitioned by Customer_ID, ordered by snapshot_date descending
window_spec = Window.partitionBy("Customer_ID").orderBy(col("snapshot_date").desc())

# Add row_number to rank records
ranked_df = df_cleaned.withColumn("rn", row_number().over(window_spec))

# Filter only latest records (row_number == 1)
latest_df = ranked_df.filter(col("rn") == 1).drop("rn")

# Show final result
latest_df.show(truncate=False)

[Stage 492:>                                                        (0 + 1) / 1]

+-----------+-----------------+---+-----------+-------------+-------------+------------+-----------+-------------+------------+----------+------------+-----------+----------------+---------------+-------------+---------+-----------+----------+---------+
|Customer_ID|Name             |Age|SSN        |Occupation   |snapshot_date|is_Architect|is_Engineer|is_Accountant|is_Scientist|is_Teacher|is_Developer|is_Mechanic|is_Media_Manager|is_Entrepreneur|is_Journalist|is_Doctor|is_Musician|is_Manager|is_Writer|
+-----------+-----------------+---+-----------+-------------+-------------+------------+-----------+-------------+------------+----------+------------+-----------+----------------+---------------+-------------+---------+-----------+----------+---------+
|CUS_0x1000 |Alistair Barrf   |18 |913-74-1218|Lawyer       |2023-05-01   |1.0         |0.0        |0.0          |0.0         |0.0       |0.0         |0.0        |0.0             |0.0            |0.0          |0.0      |0.0        |0.0   

                                                                                

### Construct pipeline to build bronze, silver and gold tables

### Bronze

In [26]:
# create bronze datalake
bronze_features_attributes_directory = "datamart/bronze/features_attributes/"

if not os.path.exists(bronze_features_attributes_directory):
    os.makedirs(bronze_features_attributes_directory)

In [24]:
# run bronze backfill
for date_str in dates_str_lst:
    utils.features_attributes_bronze_table.features_attributes_bronze_table(date_str, bronze_features_attributes_directory, spark)

2023-01-01row count: 530
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_01_01.csv
2023-02-01row count: 501
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_02_01.csv
2023-03-01row count: 506
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_03_01.csv
2023-04-01row count: 510
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_04_01.csv
2023-05-01row count: 521
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_05_01.csv
2023-06-01row count: 517
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_06_01.csv
2023-07-01row count: 471
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_07_01.csv
2023-08-01row count: 481
saved to: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_08_01.csv
2023-09-01row count: 454
saved to: datamart/bron

### Silver

In [27]:
# create silver datalake
silver_features_attributes_directory = "datamart/silver/features_attributes/"

if not os.path.exists(silver_features_attributes_directory):
    os.makedirs(silver_features_attributes_directory)

In [27]:
# run silver backfill
for date_str in dates_str_lst:
    utils.features_attributes_silver_table.features_attributes_silver_table(date_str, bronze_features_attributes_directory, silver_features_attributes_directory, spark)

loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_01_01.csv row count: 530


                                                                                

saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_01_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_02_01.csv row count: 501
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_02_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_03_01.csv row count: 506
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_03_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_04_01.csv row count: 510
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_04_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_05_01.csv row count: 521
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_05_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_featur

                                                                                

saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_07_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_08_01.csv row count: 481
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_08_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_09_01.csv row count: 454
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_09_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_10_01.csv row count: 487
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_10_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2023_11_01.csv row count: 491
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2023_11_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_featur

                                                                                

saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_02_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_03_01.csv row count: 511
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_03_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_04_01.csv row count: 513
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_04_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_05_01.csv row count: 491
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_05_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_06_01.csv row count: 498
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_06_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_featur

                                                                                

saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_11_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_12_01.csv row count: 515
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_12_01.parquet


In [31]:
utils.features_attributes_silver_table.features_attributes_silver_table(date_str, bronze_features_attributes_directory, silver_features_attributes_directory, spark).toPandas()

loaded from: datamart/bronze/features_attributes/bronze_features_attributes_daily_2024_12_01.csv row count: 515
saved to: datamart/silver/features_attributes/silver_features_attributes_daily_2024_12_01.parquet


Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
0,CUS_0x103e,Tim Kellyf,40,155-72-8070,Scientist,2024-12-01
1,CUS_0x1195,Alexk,31,822-48-3629,Manager,2024-12-01
2,CUS_0x1197,Nayako,28,799-23-8283,,2024-12-01
3,CUS_0x11e2,Valetkevitchr,34,809-04-1419,Musician,2024-12-01
4,CUS_0x11ec,William Schombergh,34,417-74-2163,Journalist,2024-12-01
...,...,...,...,...,...,...
510,CUS_0xe6c,Doris Frankelh,26,172-24-1577,Entrepreneur,2024-12-01
511,CUS_0xe99,Moone,48,164-90-3178,Mechanic,2024-12-01
512,CUS_0xf55,Tarmo Virkip,39,025-54-8593,Entrepreneur,2024-12-01
513,CUS_0xfd1,Frewy,32,389-55-6408,Architect,2024-12-01


### Gold

In [35]:
# create gold datalake
gold_features_attributes_directory = "datamart/gold/features_attributes/"

if not os.path.exists(gold_features_attributes_directory):
    os.makedirs(gold_features_attributes_directory)

In [37]:
# run gold backfill
for date_str in dates_str_lst:
    utils.features_attributes_gold_table.features_attributes_gold_table(date_str, silver_features_attributes_directory, gold_features_attributes_directory, spark)

loaded from: datamart/silver/features_attributes/silver_features_attributes_daily_2023_01_01.parquet row count: 530
saved to: datamart/gold/features_attributes/gold_features_attributes_daily_2023_01_01.parquet
loaded from: datamart/silver/features_attributes/silver_features_attributes_daily_2023_02_01.parquet row count: 501
saved to: datamart/gold/features_attributes/gold_features_attributes_daily_2023_02_01.parquet
loaded from: datamart/silver/features_attributes/silver_features_attributes_daily_2023_03_01.parquet row count: 506
saved to: datamart/gold/features_attributes/gold_features_attributes_daily_2023_03_01.parquet
loaded from: datamart/silver/features_attributes/silver_features_attributes_daily_2023_04_01.parquet row count: 510
saved to: datamart/gold/features_attributes/gold_features_attributes_daily_2023_04_01.parquet
loaded from: datamart/silver/features_attributes/silver_features_attributes_daily_2023_05_01.parquet row count: 521
saved to: datamart/gold/features_attributes/