In [None]:
import pandas as pd
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

import utils.data_processing_bronze_table
import utils.data_processing_silver_table
import utils.data_processing_gold_table


In [None]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

<H2>Read files

In [3]:
clickstream_df = spark.read.csv("data/feature_clickstream.csv", header=True, inferSchema=True)
attributes_df = spark.read.csv("data/features_attributes.csv", header=True, inferSchema=True)
financials_df = spark.read.csv("data/features_financials.csv", header=True, inferSchema=True)
loans_df = spark.read.csv("data/lms_loan_daily.csv", header=True, inferSchema=True)

<H2> BUILD Bronze DB</H2>

In [4]:
bronze_directory = "datamart/bronze"
if not os.path.exists(bronze_directory):
    os.makedirs(bronze_directory)

utils.data_processing_bronze_table.process_bronze_table(bronze_directory, clickstream_df, attributes_df, financials_df, loans_df)

(DataFrame[fe_1: int, fe_2: int, fe_3: int, fe_4: int, fe_5: int, fe_6: int, fe_7: int, fe_8: int, fe_9: int, fe_10: int, fe_11: int, fe_12: int, fe_13: int, fe_14: int, fe_15: int, fe_16: int, fe_17: int, fe_18: int, fe_19: int, fe_20: int, Customer_ID: string, snapshot_date: date, year: int, month: int],
 DataFrame[Customer_ID: string, Name: string, Age: string, SSN: string, Occupation: string, snapshot_date: date, year: int, month: int],
 DataFrame[Customer_ID: string, Annual_Income: string, Monthly_Inhand_Salary: double, Num_Bank_Accounts: int, Num_Credit_Card: int, Interest_Rate: int, Num_of_Loan: string, Type_of_Loan: string, Delay_from_due_date: int, Num_of_Delayed_Payment: string, Changed_Credit_Limit: string, Num_Credit_Inquiries: double, Credit_Mix: string, Outstanding_Debt: string, Credit_Utilization_Ratio: double, Credit_History_Age: string, Payment_of_Min_Amount: string, Total_EMI_per_month: double, Amount_invested_monthly: string, Payment_Behaviour: string, Monthly_Balanc

<h2>Build Silver DB</h2>

In [5]:
silver_directory = f"datamart/silver"
if not os.path.exists(silver_directory):
    os.makedirs(silver_directory)
    
utils.data_processing_silver_table.process_silver_table(silver_directory, bronze_directory, spark)

Starting Silver Layer Processing...
Cleaning clickstream data...
Cleaning attributes data...
Cleaning financial data...
Cleaning loans data...
Creating loan customer monthly aggregations...
Silver Layer Processing Completed


(DataFrame[fe_1: int, fe_2: int, fe_3: int, fe_4: int, fe_5: int, fe_6: int, fe_7: int, fe_8: int, fe_9: int, fe_10: int, fe_11: int, fe_12: int, fe_13: int, fe_14: int, fe_15: int, fe_16: int, fe_17: int, fe_18: int, fe_19: int, fe_20: int, Customer_ID: string, snapshot_date: date, year: int, month: int, total_activity: int, avg_activity: double, positive_features: int, negative_features: int],
 DataFrame[Customer_ID: string, Name: string, Age: int, SSN: string, Occupation: string, snapshot_date: date, year: int, month: int],
 DataFrame[Customer_ID: string, Annual_Income: double, Monthly_Inhand_Salary: double, Num_Bank_Accounts: int, Num_Credit_Card: int, Interest_Rate: int, Num_of_Loan: double, Type_of_Loan: string, Delay_from_due_date: int, Num_of_Delayed_Payment: double, Changed_Credit_Limit: string, Num_Credit_Inquiries: double, Credit_Mix: string, Outstanding_Debt: double, Credit_Utilization_Ratio: double, Credit_History_Age: string, Payment_of_Min_Amount: string, Total_EMI_per_m

<h2> Build Gold DB</h2>

In [6]:
gold_directory = f"datamart/gold"
if not os.path.exists(gold_directory):
    os.makedirs(gold_directory)
    
utils.data_processing_gold_table.process_gold_table(gold_directory, silver_directory, spark)

Starting Gold Layer Processing...
Creating Customer 360 view...
Creating Loan Portfolio Summary...


                                                                                

Gold Layer Processing Completed
