In [1]:
import os
from pathlib import Path
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col, sum as spark_sum, isnan, when
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

In [2]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/20 20:39:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Set Up

In [3]:
bronze_dir = Path("datamart/bronze")
silver_dir = Path("datamart/silver")
gold_dir = Path("datamart/gold")

In [4]:
parquet_files = [
    bronze_dir / "features_credit_history.parquet",
    bronze_dir / "features_demographic.parquet",
    bronze_dir / "features_financial.parquet",
    bronze_dir / "features_loan_terms.parquet"
]

In [5]:
# Correcting timestamp resolution to make it PySpark compatible
for file in parquet_files:
    df = pd.read_parquet(file)
    df.to_parquet(file, coerce_timestamps="us")

In [6]:
df['snapshot_date'].info()

<class 'pandas.core.series.Series'>
RangeIndex: 2260668 entries, 0 to 2260667
Series name: snapshot_date
Non-Null Count    Dtype         
--------------    -----         
2260668 non-null  datetime64[us]
dtypes: datetime64[us](1)
memory usage: 17.2 MB


In [7]:
# Converting parquet files to PySpark format
for file in parquet_files:
    df = spark.read.parquet(str(file))
    output_dir = bronze_dir / file.stem  
    df.write.mode("overwrite").parquet(str(output_dir))

                                                                                

In [5]:
parquet_folders = [
    bronze_dir / "features_credit_history",
    bronze_dir / "features_demographic",
    bronze_dir / "features_financial",
    bronze_dir / "features_loan_terms"
]

In [9]:
for dir_path in parquet_folders:
    print(f"\n=== {dir_path.name} ===")
    df = spark.read.parquet(str(dir_path))
    
    if "snapshot_date" in df.columns:
        df.select("snapshot_date").printSchema()  # see data type
        df.select("snapshot_date").show(5, truncate=False)  # preview first 5 values
    else:
        print("⚠️ 'snapshot_date' column not found.")


=== features_credit_history ===
root
 |-- snapshot_date: timestamp_ntz (nullable = true)

+-------------------+
|snapshot_date      |
+-------------------+
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
+-------------------+
only showing top 5 rows


=== features_demographic ===
root
 |-- snapshot_date: timestamp_ntz (nullable = true)

+-------------------+
|snapshot_date      |
+-------------------+
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
+-------------------+
only showing top 5 rows


=== features_financial ===
root
 |-- snapshot_date: timestamp_ntz (nullable = true)

+-------------------+
|snapshot_date      |
+-------------------+
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
|2016-01-01 00:00:00|
+-------------------+
only showing top 5 rows


=== features_loan_terms ===
root
 |-- snapshot_date: timestamp_nt

In [10]:
for dir_path in parquet_folders:
    df = spark.read.parquet(str(dir_path))
    print(f"Schema: {dir_path.name}")
    df.printSchema()

Schema: features_credit_history
root
 |-- member_id: string (nullable = true)
 |-- snapshot_date: timestamp_ntz (nullable = true)
 |-- mort_acc: double (nullable = true)
 |-- num_tl_op_past_12m: double (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- inq_last_12m: double (nullable = true)
 |-- inq_fi: double (nullable = true)
 |-- mths_since_last_delinq: double (nullable = true)
 |-- mths_since_last_record: double (nullable = true)
 |-- mths_since_last_major_derog: double (nullable = true)
 |-- mths_since_recent_bc_dlq: double (nullable = true)
 |-- mths_since_recent_inq: double (nullable = true)
 |-- mths_since_recent_revol_delinq: double (nullable = true)
 |-- mths_since_rcnt_il: double (nullable = true)
 |-- mths_since_recent_bc: double (nullable = true)
 |-- acc_now_delinq: double (nullable = true)
 |-- delinq_2yrs: double (nullable = true)
 |-- pub_rec: double (nul

## Config

In [4]:
snapshot_date_str = "2024-01-01"

start_date_str = "2024-01-01"
end_date_str = "2024-12-31"

In [5]:
def generate_date_list(start_date_str, end_date_str):
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()

    current_date = start_date
    date_list = []

    while current_date <= end_date:
        date_list.append(current_date)
        current_date += timedelta(days=1)

    return date_list

In [6]:
date_list = generate_date_list(start_date_str, end_date_str)

## Silver Level Processing

In [14]:
from utils.silver_processing import silver_processing

# silver_processing.py automatically splits the data by month and backfills the data accordingly
# data_window refers to the snapshot dates used
silver_processing(bronze_dir, silver_dir, data_window=date_list)

                                                                                

Silver-level processing complete.


In [18]:
sample_parquets = [
    silver_dir / "credit_history/credit_history_2023-01-01",
    silver_dir / "demographic/demographic_2023-01-01",
    silver_dir / "financial/financial_2023-01-01",
    silver_dir / "loan_terms/loan_terms_2023-01-01"
]

In [25]:
for file in sample_parquets:
    df = spark.read.parquet(str(file))
    print(f"Schema: {file.name}")
    df.printSchema()
    df.select("member_id").show(5, truncate=False)

Schema: credit_history_2023-01-01
root
 |-- member_id: string (nullable = true)
 |-- snapshot_date: timestamp_ntz (nullable = true)
 |-- mort_acc: double (nullable = true)
 |-- num_tl_op_past_12m: double (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- inq_last_12m: double (nullable = true)
 |-- inq_fi: double (nullable = true)
 |-- mths_since_last_delinq: double (nullable = true)
 |-- mths_since_recent_inq: double (nullable = true)
 |-- mths_since_rcnt_il: double (nullable = true)
 |-- mths_since_recent_bc: double (nullable = true)
 |-- acc_now_delinq: double (nullable = true)
 |-- delinq_2yrs: double (nullable = true)
 |-- pub_rec: double (nullable = true)
 |-- collections_12_mths_ex_med: double (nullable = true)
 |-- chargeoff_within_12_mths: double (nullable = true)
 |-- tax_liens: double (nullable = true)
 |-- pub_rec_bankruptcies: double (nullable = true)
 |-- num_tl_120dpd_2m: double (nullable = true)
 |-- num_t

## Gold Level Processing

### Label Store

In [7]:
silver_loan_terms_dir = Path("datamart/silver/loan_terms")

In [8]:
from utils.gold_label_store import create_gold_label_store

create_gold_label_store(silver_loan_terms_dir, gold_dir, data_window=date_list)

                                                                                

Gold label store written to: datamart/gold


In [11]:
folder = Path("datamart/gold/label_store/weeks")
parquet_files = list(folder.rglob("*.parquet"))
sample_file = random.choice(parquet_files)
print(f"📂 Sampled file: {sample_file}")
df = spark.read.parquet(str(sample_file))
df.printSchema()
df.show(5, truncate=False)

📂 Sampled file: datamart/gold/label_store/weeks/label_store_partition=label_store_week_14_2024-04-01/part-00002-bdaba0c7-627c-44e0-b0d9-8c99b67d1093.c000.snappy.parquet
root
 |-- member_id: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- grade: string (nullable = true)
 |-- start_of_week: date (nullable = true)
 |-- snapshot_week: integer (nullable = true)
 |-- snapshot_month: integer (nullable = true)

+-----------+-------------+-----+-------------+-------------+--------------+
|member_id  |snapshot_date|grade|start_of_week|snapshot_week|snapshot_month|
+-----------+-------------+-----+-------------+-------------+--------------+
|MEM_1000049|2024-04-04   |B    |2024-04-01   |14           |4             |
|MEM_1000317|2024-04-04   |G    |2024-04-01   |14           |4             |
|MEM_1000438|2024-04-04   |B    |2024-04-01   |14           |4             |
|MEM_1001   |2024-04-04   |B    |2024-04-01   |14           |4             |
|MEM_1007503|2024-04-04   |B 

### Feature Store