In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from tqdm import tqdm

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

In [2]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/01 12:00:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Build Bronze Tables

**Determine partitions** 

Here we decide to partition by dates

In [3]:
# set up config
snapshot_date_str = "2023-01-01"

start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

In [4]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    # Convert the date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    # List to store the first of month dates
    first_of_month_dates = []

    # Start from the first of the month of the start_date
    current_date = datetime(start_date.year, start_date.month, 1)

    while current_date <= end_date:
        # Append the date in yyyy-mm-dd format
        first_of_month_dates.append(current_date.strftime("%Y-%m-%d"))
        
        # Move to the first of the next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    return first_of_month_dates

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
dates_str_lst

['2023-01-01',
 '2023-02-01',
 '2023-03-01',
 '2023-04-01',
 '2023-05-01',
 '2023-06-01',
 '2023-07-01',
 '2023-08-01',
 '2023-09-01',
 '2023-10-01',
 '2023-11-01',
 '2023-12-01',
 '2024-01-01',
 '2024-02-01',
 '2024-03-01',
 '2024-04-01',
 '2024-05-01',
 '2024-06-01',
 '2024-07-01',
 '2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01']

**Create data lake for bronze tables**

In [5]:
# create bronze datalake
bronze_directory = "datamart/bronze"

if not os.path.exists(bronze_directory):
    os.makedirs(bronze_directory)

In [6]:
def process_bronze_table(table_name, source, db, snapshot_date_str, spark):
    # create bronze table 
    bronze_table = os.path.join(db, table_name)
    if not os.path.exists(bronze_table):
        os.makedirs(bronze_table)

    # prepare arguments
    snapshot_date = datetime.strptime(snapshot_date_str, "%Y-%m-%d")

    # load data - IRL ingest from back end source system
    df = spark.read.csv(source, header=True, inferSchema=True).filter(col('snapshot_date') == snapshot_date)

    # save bronze table to datamart - IRL connect to database to write
    partition_name = snapshot_date_str.replace('-','_') + '.csv'
    filepath = os.path.join(bronze_table, partition_name)
    df.toPandas().to_csv(filepath, index=False)

    return df

Create bronze tables

In [7]:
for date_str in tqdm(dates_str_lst, total=len(dates_str_lst), desc="Processing clickstream"):
    process_bronze_table('clickstream', 'data/feature_clickstream.csv', bronze_directory, date_str, spark)

for date_str in tqdm(dates_str_lst, total=len(dates_str_lst), desc="Processing attributes"):
    process_bronze_table('attributes', 'data/features_attributes.csv', bronze_directory, date_str, spark)

for date_str in tqdm(dates_str_lst, total=len(dates_str_lst), desc="Processing financials"):
    process_bronze_table('financials', 'data/features_financials.csv', bronze_directory, date_str, spark)

for date_str in tqdm(dates_str_lst, total=len(dates_str_lst), desc="Processing lms"):
    process_bronze_table('lms', 'data/lms_loan_daily.csv', bronze_directory, date_str, spark)

Processing clickstream:   0%|          | 0/24 [00:00<?, ?it/s]

Processing clickstream: 100%|██████████| 24/24 [00:30<00:00,  1.26s/it]         
Processing attributes: 100%|██████████| 24/24 [00:07<00:00,  3.39it/s]
Processing financials: 100%|██████████| 24/24 [00:10<00:00,  2.37it/s]
Processing lms: 100%|██████████| 24/24 [00:16<00:00,  1.47it/s]


# Build Silver Tables

## EDA

Take a peek at the bronze tables to find out what sort of preprocessing is needed.

In [55]:
df_clickstream = pd.read_csv(f"{bronze_directory}/clickstream/2023_01_01.csv")
df_clickstream.head()

Unnamed: 0,fe_1,fe_2,fe_3,fe_4,fe_5,fe_6,fe_7,fe_8,fe_9,fe_10,...,fe_13,fe_14,fe_15,fe_16,fe_17,fe_18,fe_19,fe_20,Customer_ID,snapshot_date
0,63,118,80,121,55,193,111,112,-101,83,...,-16,-81,-126,114,35,85,-73,76,CUS_0x1037,2023-01-01
1,-108,182,123,4,-56,27,25,-6,284,222,...,-14,-96,200,35,130,94,111,75,CUS_0x1069,2023-01-01
2,-13,8,87,166,214,-98,215,152,129,139,...,26,86,171,125,-130,354,17,302,CUS_0x114a,2023-01-01
3,-85,45,200,89,128,54,76,51,61,139,...,172,96,174,163,37,207,180,118,CUS_0x1184,2023-01-01
4,55,120,226,-86,253,97,107,68,103,126,...,76,43,183,159,-26,104,118,184,CUS_0x1297,2023-01-01


In [53]:
df_attributes = pd.read_csv(f"{bronze_directory}/attributes/2023_01_01.csv")
df_attributes.head()

Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
0,CUS_0x1037,Matthewm,45,230-22-9583,Accountant,2023-01-01
1,CUS_0x1069,Andreas Cremero,32,761-27-5143,Accountant,2023-01-01
2,CUS_0x114a,Valetkevitchu,43,133-89-5234,Developer,2023-01-01
3,CUS_0x1184,Cohenq,49,963-76-2464,Lawyer,2023-01-01
4,CUS_0x1297,Edwardsz,46,#F%$D@*&8,Manager,2023-01-01


In [None]:
df_financials = pd.read_csv(f"{bronze_directory}/financials/2023_01_01.csv")
df_financials.head()

Unnamed: 0,Customer_ID,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Type_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Credit_Mix,Outstanding_Debt,Credit_Utilization_Ratio,Credit_History_Age,Payment_of_Min_Amount,Total_EMI_per_month,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,snapshot_date
0,CUS_0x1037,15989.085,1086.42375,5,4,2,4,"Credit-Builder Loan, Auto Loan, Auto Loan, and...",13,15,...,Good,665.82,40.697699,19 Years and 9 Months,No,33.797021,80.46523951443457,Low_spent_Small_value_payments,284.380115,2023-01-01
1,CUS_0x1069,58637.34,4799.445,4,6,10,119,"Personal Loan, Auto Loan, and Not Specified",9,17,...,Standard,208.8,25.233144,30 Years and 8 Months,Yes,139.885013,165.21061289035896,High_spent_Small_value_payments,434.848874,2023-01-01
2,CUS_0x114a,15305.46,1230.455,0,7,2,2,"Student Loan, and Home Equity Loan",14,2,...,Good,642.42_,27.525113,15 Years and 9 Months,No,20.301654,64.77848007633177,Low_spent_Small_value_payments,327.965366,2023-01-01
3,CUS_0x1184,19867.475,1396.622917,3,5,11,3,"Student Loan, Mortgage Loan, and Payday Loan",10,9,...,Good,707.29,26.68979,32 Years and 8 Months,No,42.606882,23.460944042729498,!@9#%8,313.594466,2023-01-01
4,CUS_0x1297,57738.06_,4881.505,9,8,30,9,"Payday Loan, Personal Loan, Payday Loan, Perso...",61,24,...,Bad,3916.47,25.742143,13 Years and 8 Months,Yes,296.284136,53.82117764831425,High_spent_Medium_value_payments,388.045187,2023-01-01


In [None]:
df_lms = pd.read_csv(f"{bronze_directory}/lms/2023_01_01.csv")
df_lms.head()

Unnamed: 0,loan_id,Customer_ID,loan_start_date,tenure,installment_num,loan_amt,due_amt,paid_amt,overdue_amt,balance,snapshot_date
0,CUS_0x1037_2023_01_01,CUS_0x1037,2023-01-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-01-01
1,CUS_0x1069_2023_01_01,CUS_0x1069,2023-01-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-01-01
2,CUS_0x114a_2023_01_01,CUS_0x114a,2023-01-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-01-01
3,CUS_0x1184_2023_01_01,CUS_0x1184,2023-01-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-01-01
4,CUS_0x1297_2023_01_01,CUS_0x1297,2023-01-01,10,0,10000,0.0,0.0,0.0,10000.0,2023-01-01


### Check data types for each table

#### Clickstream

In [20]:
df_clickstream.dtypes

fe_1              int64
fe_2              int64
fe_3              int64
fe_4              int64
fe_5              int64
fe_6              int64
fe_7              int64
fe_8              int64
fe_9              int64
fe_10             int64
fe_11             int64
fe_12             int64
fe_13             int64
fe_14             int64
fe_15             int64
fe_16             int64
fe_17             int64
fe_18             int64
fe_19             int64
fe_20             int64
Customer_ID      object
snapshot_date    object
dtype: object

Observations:
- snapshot_date should be datetime

#### Attributes

In [21]:
df_attributes.dtypes

Customer_ID      object
Name             object
Age              object
SSN              object
Occupation       object
snapshot_date    object
dtype: object

Observations:
- Age should be numeric
- snapshot_date should be datetime

#### Financials

In [22]:
df_financials.dtypes

Customer_ID                  object
Annual_Income                object
Monthly_Inhand_Salary       float64
Num_Bank_Accounts             int64
Num_Credit_Card               int64
Interest_Rate                 int64
Num_of_Loan                  object
Type_of_Loan                 object
Delay_from_due_date           int64
Num_of_Delayed_Payment       object
Changed_Credit_Limit         object
Num_Credit_Inquiries        float64
Credit_Mix                   object
Outstanding_Debt             object
Credit_Utilization_Ratio    float64
Credit_History_Age           object
Payment_of_Min_Amount        object
Total_EMI_per_month         float64
Amount_invested_monthly      object
Payment_Behaviour            object
Monthly_Balance             float64
snapshot_date                object
dtype: object

Observations:
- Annual_Income, Num_of_Loan, Num_of_Delayed_Payment, Changed_Credit_Limit, Outstanding_Debt, Amount_invested_monthly should be numeric
- snapshot_date should be datetime

#### LMS

In [24]:
df_lms.dtypes

loan_id             object
Customer_ID         object
loan_start_date     object
tenure               int64
installment_num      int64
loan_amt             int64
due_amt            float64
paid_amt           float64
overdue_amt        float64
balance            float64
snapshot_date       object
dtype: object

Observations:
- snapshot_date should be datetime

Check data distribution

In [59]:
df_attributes['Age'] = pd.to_numeric(df_attributes['Age'], errors='coerce')

In [61]:
df_attributes.describe()

Unnamed: 0,Age
count,508.0
mean,114.40748
std,785.781128
min,-500.0
25%,25.0
50%,34.0
75%,43.0
max,8547.0


In [56]:
df_attributes[df_attributes.duplicated(subset=['Name'])]

Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
316,CUS_0x7fe2,Swethau,36,517-05-0352,Developer,2023-01-01


In [39]:
df_attributes[df_attributes.duplicated(subset=['SSN'])]

Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
21,CUS_0x16f4,Forgionez,37,#F%$D@*&8,Media_Manager,2023-01-01
41,CUS_0x1c9c,Dougq,28,#F%$D@*&8,Musician,2023-01-01
50,CUS_0x2297,Sergio Goncalvesc,43,#F%$D@*&8,Teacher,2023-01-01
74,CUS_0x2cae,Dunaiu,28,#F%$D@*&8,Journalist,2023-01-01
94,CUS_0x34fa,Carolined,35,#F%$D@*&8,Mechanic,2023-01-01
106,CUS_0x3873,McCrankk,6283,#F%$D@*&8,Media_Manager,2023-01-01
115,CUS_0x3adc,Sakari Suoninenp,46,#F%$D@*&8,Journalist,2023-01-01
119,CUS_0x3b2a,Carolinez,23,#F%$D@*&8,Scientist,2023-01-01
123,CUS_0x3bd0,Katyax,15,#F%$D@*&8,Entrepreneur,2023-01-01
146,CUS_0x41c,Moonb,50,#F%$D@*&8,Developer,2023-01-01


In [32]:
df_attributes['Occupation'].unique()

array(['Accountant', 'Developer', 'Lawyer', 'Manager', 'Doctor',
       'Mechanic', 'Journalist', 'Media_Manager', 'Teacher',
       'Entrepreneur', 'Writer', 'Musician', 'Engineer', '_______',
       'Scientist', 'Architect'], dtype=object)

**Observations**

## Building the silver tables

Create data lake for silver tables

In [51]:
# create silver datalake
silver_directory = "datamart/silver"

if not os.path.exists(silver_directory):
    os.makedirs(silver_directory)

Define data types for columns

In [52]:
clickstream = {
    **{f'fe_{i}': IntegerType() for i in range(1, 21)},
    'Customer_ID': StringType(),
    'snapshot_date': DateType()
}

In [53]:
attributes = {
    'Customer_ID': StringType(),
    'Name': StringType(),
    'Age': IntegerType(),
    'SSN': StringType(),
    'Occupation': StringType(),
    'snapshot_date': DateType()
}

In [None]:
attributes = {
    'Customer_ID': StringType(),
    'Name': StringType(),
    'Age': IntegerType(),
    'SSN': StringType(),
    'Occupation': StringType(),
    'snapshot_date': DateType()
}