In [18]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType
import sys
from pathlib import Path

from pathlib import Path
import sys

sys.path.append(str(Path.cwd().parent / "utils"))
import data_processing_gold_table

# Initialize SparkSession (local dev)
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Reduce Spark logs to errors
spark.sparkContext.setLogLevel("ERROR")

# === Gold layer: feature store ===
gold_feature_store_directory = "/app/datamart/gold/feature_store/"
silver_attr_directory = "/app/datamart/silver/attributes/"
silver_fin_directory = "/app/datamart/silver/fin/"
bronze_clickstream_directory = "/app/datamart/bronze/clickstream/"

# NOTE: gold_label_feature_store_directory is referenced but not defined above.
# Ensure dir exists (as written; may raise if var undefined at runtime)
if not os.path.exists(gold_feature_store_directory):
    os.makedirs(gold_feature_store_directory)


In [19]:
df = spark.read.csv(bronze_clickstream_directory, header=True, inferSchema=True)
df.show()


+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|fe_1|fe_2|fe_3|fe_4|fe_5|fe_6|fe_7|fe_8|fe_9|fe_10|fe_11|fe_12|fe_13|fe_14|fe_15|fe_16|fe_17|fe_18|fe_19|fe_20|Customer_ID|snapshot_date|
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|  71| 243|  89| 149| 202|  56| 149| 207| 232|  111|  179|  340|  -17|  100|  115|   68|   39|   87|  -74|  240| CUS_0x1037|   2024-03-01|
| -60|  65|  30| -56|  25| 206|  68| 157| -36|   81|   77|   66|  169|   98|   75|  268|   95|   19|  151|  234| CUS_0x1069|   2024-03-01|
|  50|  76| 115| 114| -57| 136|  33|  69| 175|   92|  218|  151|   30|  204|  268|  181|  333|   70|  190|   23| CUS_0x114a|   2024-03-01|
|   1| 245|  72| 247| 212|  80| 144| 138|   7|  -86|  284|   35|  -46|  159|   81|   58|  340|   38|   66|  -36| CUS_0x1184|   2024-03-01|
|   8| 138| 164| 147| 184| 

In [20]:
from pyspark.sql import functions as F

# 2. Get the first user in the dataset
first_user = df.select("Customer_ID").first()[0]
print("First user:", first_user)

df.filter(F.col("Customer_ID") == first_user) \
  .orderBy(F.col("snapshot_date")) \
  .show(30)


First user: CUS_0x1037
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|fe_1|fe_2|fe_3|fe_4|fe_5|fe_6|fe_7|fe_8|fe_9|fe_10|fe_11|fe_12|fe_13|fe_14|fe_15|fe_16|fe_17|fe_18|fe_19|fe_20|Customer_ID|snapshot_date|
+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-------------+
|  63| 118|  80| 121|  55| 193| 111| 112|-101|   83|  164|  105|  -16|  -81| -126|  114|   35|   85|  -73|   76| CUS_0x1037|   2023-01-01|
|  55|  96|  74| 171| -69|-103|  86| 178|  42|  113|  100|  183|   39|  179|  161|   71|  208|   -7|  129|  206| CUS_0x1037|   2023-02-01|
| 232| 203| -71|  86| 243|  12|  79| 122| 103|  178|  273|   96|  201|   37|   96|   63|   98|   69|  257|    3| CUS_0x1037|   2023-03-01|
|   9| -29|  -8| 116| 198|  68|  79|  61| 113|   91|   36|  179|   17|  219|   89|  136|  -25|  180|  109|  118| CUS_0x1037|   2023-04-01|
| 20

In [21]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, add_months

parquet_files = glob.glob(os.path.join(silver_attr_directory, "*.parquet"))
df_attr = spark.read.parquet(*parquet_files)
df_attr = df_attr.drop("Age", "Name", "SSN")

parquet_files = glob.glob(os.path.join(silver_fin_directory, "*.parquet"))
df_fin = spark.read.parquet(*parquet_files)
df_fin = df_fin.drop("Type_of_Loan")
# Collect null percentages into a dict
nulls = df_fin.select([
    (F.count(F.when(F.col(c).isNull(), c)) / rows * 100).alias(c)
    for c in df_fin.columns
])
nulls_dict = nulls.collect()[0].asDict()
cols_to_drop = [col for col, pct in nulls_dict.items() if pct > 5]
df_fin = df_fin.drop(*cols_to_drop)
print("Dropped columns:", cols_to_drop)

# Drop the rest of the NAs
df_fin = df_fin.dropna()    
# 1️⃣ Join finance + attribute (both share the same snapshot_date)
df_fin_attr = df_fin.join(df_attr, ["Customer_ID", "snapshot_date"], "inner")


# load and ensure date type
df_click = spark.read.csv(bronze_clickstream_directory, header=True, inferSchema=True) \
    .withColumn("snapshot_date", to_date(col("snapshot_date")))

fe = [f"fe_{i}" for i in range(1, 21)]
c, p = df_click.alias("c"), df_click.alias("p")

cond = (
    (col("c.Customer_ID") == col("p.Customer_ID")) &
    (col("p.snapshot_date") >= add_months(col("c.snapshot_date"), -4)) &
    (col("p.snapshot_date") <= col("c.snapshot_date"))
)

j = c.join(p, cond)

avg_expr = [F.avg(col(f"p.{f}")).alias(f"{f}_5m_avg") for f in fe]

click_5m = (
    j.groupBy(col("c.Customer_ID").alias("Customer_ID"), col("c.snapshot_date").alias("snapshot_date"))
     .agg(*avg_expr)
)

# Merge with df_fin_attr keeping clickstream snapshot_date
merged_df = df_fin_attr.join(click_5m, ["Customer_ID", "snapshot_date"], "inner")
merged_df.show(5, truncate=False)


Dropped columns: ['Outstanding_Debt', 'Payment_of_Min_Amount', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Monthly_Balance']
+-----------+-------------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+------------------------+-------------------------------+------------------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|Customer_ID|snapshot_date|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Credit_Utilization_Ratio|Payment_Behaviour              |Credit_History_Age|Occupation|fe_1

In [None]:
rows = df_fin.count()

nulls = df_fin.select([
    (F.count(F.when(F.col(c).isNull(), c)) / rows * 100).alias(c)
    for c in df_fin.columns
])

# Collect null percentages into a dict
nulls_dict = nulls.collect()[0].asDict()

# Get columns to drop
cols_to_drop = [col for col, pct in nulls_dict.items() if pct > 5]

# Drop columns from DataFrame
df_fin = df_fin.drop(*cols_to_drop)
print("Dropped columns:", cols_to_drop)

# Drop the rest of the NAs
df_fin = df_fin.dropna()    

df_click = spark.read.csv(bronze_clickstream_directory, header=True, inferSchema=True)

# 1️⃣ Join finance + attribute (both share the same snapshot_date)
df_fin_attr = df_fin.join(df_attr, ["Customer_ID", "snapshot_dates"], "inner")
df_fin_attr.show()

Dropped columns: []
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+------------------------+--------------------+-------------+------------------+------------+-------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Credit_Utilization_Ratio|   Payment_Behaviour|snapshot_date|Credit_History_Age|  Occupation|snapshot_date|
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+------------------------+--------------------+-------------+------------------+------------+-------------+
| CUS_0x10ac|    16718.645|            1270.

In [35]:
df_click.count()

215376

In [None]:

# 2️⃣ Drop their snapshot_date — we’ll use the one from clickstream
# df_fin_attr = df_fin_attr.drop("snapshot_date")

fe = [f"fe_{i}" for i in range(1, 21)]
c, p = df_click.alias("c"), df_click.alias("p")

cond = (
    (col("c.Customer_ID") == col("p.Customer_ID")) &
    (col("p.snapshot_date") >= add_months(col("c.snapshot_date"), -4)) &
    (col("p.snapshot_date") <= col("c.snapshot_date"))
)

j = c.join(p, cond)

avg_expr = [F.avg(col(f"p.{f}")).alias(f"{f}_5m_avg") for f in fe]

click_5m = (
    j.groupBy(col("c.Customer_ID").alias("Customer_ID"), col("c.snapshot_date").alias("snapshot_date"))
    .agg(*avg_expr)
)
click_5m.show()

                                                                                

215376

In [None]:

# 3️⃣ Join with clickstream (keeping all customers from df_click)
merged_df = df_fin_attr.join(click_5m, ["Customer_ID"], "right")
merged_df.show()




                                                                                

+-----------+-------------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+------------------------+-----------------+------------------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|Customer_ID|snapshot_date|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Credit_Utilization_Ratio|Payment_Behaviour|Credit_History_Age|Occupation|fe_1_5m_avg|fe_2_5m_avg|fe_3_5m_avg|fe_4_5m_avg|fe_5_5m_avg|fe_6_5m_avg|fe_7_5m_avg|fe_8_5m_avg|fe_9_5m_avg|fe_10_5m_avg|fe_11_5m_avg|fe_12_5m_avg|fe_13_5m_avg|fe_1

In [None]:
merged_df.count()

9784

In [27]:
merged_df.printSchema()

root
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- Annual_Income: float (nullable = true)
 |-- Monthly_Inhand_Salary: float (nullable = true)
 |-- Num_Bank_Accounts: float (nullable = true)
 |-- Num_Credit_Card: float (nullable = true)
 |-- Interest_Rate: float (nullable = true)
 |-- Num_of_Loan: float (nullable = true)
 |-- Delay_from_due_date: float (nullable = true)
 |-- Num_of_Delayed_Payment: float (nullable = true)
 |-- Changed_Credit_Limit: float (nullable = true)
 |-- Num_Credit_Inquiries: float (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Credit_Utilization_Ratio: float (nullable = true)
 |-- Payment_Behaviour: string (nullable = true)
 |-- Credit_History_Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- fe_1_5m_avg: double (nullable = true)
 |-- fe_2_5m_avg: double (nullable = true)
 |-- fe_3_5m_avg: double (nullable = true)
 |-- fe_4_5m_avg: double (nullable = true)
 |-- fe_5_5m_avg:

In [28]:


# write partitioned by snapshot_date (one write, stable schema)
merged_df.repartition("snapshot_date") \
        .write.mode("overwrite") \
        .partitionBy("snapshot_date") \
        .parquet(gold_feature_store_directory)

                                                                                

In [29]:
gold_feature_directory = "/app/datamart/gold/feature_store/"

# Read all CSV files into a single DataFrame

check_df = spark.read.option("header", "true").parquet(gold_feature_directory)
check_df.show()

+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+------------------------+--------------------+------------------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Credit_Utilization_Ratio|   Payment_Behaviour|Credit_History_Age|   Occupation|fe_1_5m_avg|fe_2_5m_avg|fe_3_5m_avg|fe_4_5m_avg|fe_5_5m_avg|fe_6_5m_avg|fe_7_5m_avg|fe_8_5m_avg|fe_9_5m_avg|fe_10_5m_avg|fe_11_5m_avg|fe_12_5m_avg|fe_13_5m_avg|fe_14_