In [2]:
import os
import shutil
import argparse
from datetime import datetime

import pyspark
from pyspark.sql import SparkSession

from utils.data_preprocessing_bronze_table import *
from utils.data_preprocessing_silver_table import *
from utils.helper import *


current_directory = os.path.dirname(os.path.abspath(os.path.join(os.getcwd(), "cs611-assignment-1")))
csv_dir = os.path.join(current_directory, "data")
print(current_directory)

# Other parts
data_mart_dir = os.path.join(current_directory, "datamart")
bronze_dir = os.path.join(data_mart_dir, "bronze")
silver_dir = os.path.join(data_mart_dir, "silver")
gold_dir = os.path.join(data_mart_dir, "gold")



c:\Users\Admin\Desktop\SMU\CS611\cs611-assignment-1


In [3]:
# Run this to refresh the dataset
# Refresh current directory
if os.path.exists(data_mart_dir):
    shutil.rmtree(data_mart_dir)

os.mkdir(data_mart_dir)
os.mkdir(bronze_dir)
os.mkdir(silver_dir)
os.mkdir(gold_dir)

In [4]:
def data_prep_bronze(start_date, end_date, spark : SparkSession):
    print('\n\n---starting Bronze Table job---\n\n')

    # Get all the datetimes 
    dates_str_list = generate_first_of_month_dates(start_date, end_date)

    # We can build the bronze table
    # Get csvs
    csv_files = os.listdir(csv_dir)

    for csv_file in csv_files:
        csv_full_dir = os.path.join(csv_dir, csv_file)
        for date_str in dates_str_list:
            print("Preparing bronze table {}".format(csv_file))
            prepare_bronze_table_daily(csv_full_dir, bronze_dir, spark, date_str)

    # Now we can prepare the silver
    # for csv_file in csv_files:
    #     csv_full_dir = os.path.join(csv_dir, csv_file)
    #     csv_type = csv_file.rstrip(".csv")

    #     if csv_type == "lms_loan_daily":
    #         # Get all the lms_loan_daily files
    #         for date_str in dates_str_list:
    #             expected_lms_loan_daily_file_name = "bronze_" + csv_type + "_" + date_str + ".csv"
    #             expected_full_dir = os.path.join(bronze_dir, expected_lms_loan_daily_file_name)

    #             process_silver_table_loan_daily(expected_full_dir,
    #                                             silver_dir,
    #                                             date_str,
    #                                             spark)
                
    #     elif csv_type == "feature_finanicals":
    #         # Get all feature_financials files
    #         for date_str in dates_str_list:
    #             expected_feature_financials_file_name = "bronze_" + csv_type + "_" + date_str + ".csv"
    #             expected_full_dir = os.path.join(bronze_dir, expected_feature_financials_file_name)

    #             process_silver_table_feature_financials(expected_full_dir,
    #                                                     silver_dir,
    #                                                     date_str,
    #                                                     spark)

In [5]:
def data_prep_silver(start_date, end_date, spark : SparkSession):
    print('\n\n---starting Silver table job---\n\n')

    # Get all the datetimes 
    dates_str_list = generate_first_of_month_dates(start_date, end_date)

    # We can build the silver table
    for date_str in dates_str_list:
        # Build the silver table for each csv
        expected_lms_loan_daily_file_name = "bronze_lms_loan_daily_" + date_str + ".csv"
        expected_loan_full_dir = os.path.join(bronze_dir, expected_lms_loan_daily_file_name)

        process_silver_table_loan_daily(expected_loan_full_dir,
                                        silver_dir,
                                        date_str,
                                        spark)
        
        expected_feature_financials_file_name = "bronze_features_financial_" + date_str + ".csv"
        expected_financial_full_dir = os.path.join(bronze_dir, expected_feature_financials_file_name)

        process_silver_table_feature_financials(expected_financial_full_dir,
                                                silver_dir,
                                                date_str,
                                                spark)

In [6]:
spark = pyspark.sql.SparkSession.builder \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .appName("dev") \
        .master("local[*]") \
        .getOrCreate()
    
# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

In [7]:
data_prep_bronze("2023-01-01", "2024-12-01", spark)



---starting Bronze Table job---


Preparing bronze table features_attributes.csv
Row Count for Date 2023-01-01 00:00:00 : 530
Bronze features_attribute Daily Date 2023-01-01 00:00:00 saved to : c:\Users\Admin\Desktop\SMU\CS611\cs611-assignment-1\datamart\bronze\bronze_features_attribute_2023-01-01.csv
Preparing bronze table features_attributes.csv
Row Count for Date 2023-02-01 00:00:00 : 501
Bronze features_attribute Daily Date 2023-02-01 00:00:00 saved to : c:\Users\Admin\Desktop\SMU\CS611\cs611-assignment-1\datamart\bronze\bronze_features_attribute_2023-02-01.csv
Preparing bronze table features_attributes.csv
Row Count for Date 2023-03-01 00:00:00 : 506
Bronze features_attribute Daily Date 2023-03-01 00:00:00 saved to : c:\Users\Admin\Desktop\SMU\CS611\cs611-assignment-1\datamart\bronze\bronze_features_attribute_2023-03-01.csv
Preparing bronze table features_attributes.csv
Row Count for Date 2023-04-01 00:00:00 : 510
Bronze features_attribute Daily Date 2023-04-01 00:00:00 saved to

In [12]:
def process_silver_table_feature_financials(bronze_feature_financials : str, silver_table_dir : str, date : str, spark : SparkSession):
    """
    Process Feature finanicals
    """
    df = spark.read.csv(bronze_feature_financials, header=True, inferSchema=True)
    print(f"Loaded {bronze_feature_financials}, row count {df.count()}")

    column_type_map = {
        "Customer_ID": StringType(),
        "Annual_Income": FloatType(),
        "Monthly_Inhand_Salary": FloatType(),
        "Num_Bank_Accounts": IntegerType(),
        "Num_Credit_Card": IntegerType(),
        "Interest_Rate": IntegerType(),
        "Num_of_Loan": IntegerType(), # Has weird values, need to handle 
        "Type_of_Loan": StringType(),  # List-like string, can parse later
        "Delay_from_due_date": IntegerType(),
        "Num_of_Delayed_Payment": IntegerType(),
        "Changed_Credit_Limit": FloatType(),
        "Num_Credit_Inquiries": FloatType(),
        "Credit_Mix": StringType(),  # e.g. "Bad", "Good", etc.
        "Outstanding_Debt": FloatType(),
        "Credit_Utilization_Ratio": FloatType(),
        "Credit_History_Age": FloatType(),  # Date is in Year and months, need to parse
        "Payment_of_Min_Amount": StringType(),  # "Yes"/"No"
        "Total_EMI_per_month": FloatType(),
        "Amount_invested_monthly": FloatType(),
        "Payment_Behaviour": StringType(),  # Categorical, need more information to determine whether to remain categorical or ordinal
        "Monthly_Balance": FloatType(),
        "snapshot_date": DateType()
    }

    for column, new_type in column_type_map.items():
        # if column == "Type_of_Loan":
        #     df = df.withColumn(column, parse_type_of_loan_udf(col(column)).cast(new_type))
        if column == "Credit_Mix":
            df = df.withColumn(column, parse_credit_mix_udf(col(column)).cast(new_type))
        elif column == "Credit_History_Age":
            df = df.withColumn(column, parse_credit_history_age_udf(col(column)).cast(new_type))
        elif column == "Payment_of_Min_Amount":
            df = df.withColumn(column, parse_payment_min_amount_udf(col(column)).cast(new_type))
        elif column == "Changed_Credit_Limit":
            df = df.withColumn(column, parse_changed_credit_limit_udf(col(column)).cast(new_type))
        # elif column == "Payment_Behaviour":
        #     df = df.withColumn(column, parse_payment_behaviour_udf(col(column)).cast(new_type))
        # # Need to fix the values
        elif new_type == IntegerType():
            df = df.withColumn(column, parse_int_udf(col(column)).cast(new_type))
        # elif new_type == FloatType():
        #     df = df.withColumn(column, parse_float_udf(col(column)))
        else:
            df = df.withColumn(column, col(column).cast(new_type))

    # We can do some repair on the number of loans by checking the values from the loan types
    # and replacing the value inside depending on the number of loans
    # df = df.withColumn("Num_of_Loan", get_true_number_of_loan_udf(col("Type_of_Loan")).cast(IntegerType()))

    # We need to remove any anomalous values within the dataset
    # Preseeded some None values so we can remove that first
    df = df.dropna()

    # Remove anomalous data based on certain values
    # e.g. > 50 credit cards, more than 50 loans, CUS_0x1140 where monthly salary 914 but yearly salary is 14 millions
    # df = df.filter(col("Outstanding_Debt") >= 0)
    print("---------Filtering Anomalous Data------------")
    df = df.filter(col("Annual_Income") >= 0)
    df = df.filter(col("Monthly_Inhand_Salary") >= 0)
    df = df.filter((col("Num_Bank_Accounts") <= 20) & (col("Num_Bank_Accounts") > 0))
    df = df.filter((col("Num_Credit_Card") <= 20) & (col("Num_Credit_Card") >= 0))
    df = df.filter((col("Num_of_Loan") <= 20) & (col("Num_of_Loan") >= 0))
    # df = df.filter(col("Payment_Behaviour").contains("payment"))

    # ! My computer sucks I cant run this in time
    # df = df.filter((col("Interest_Rate") <= 600) & (col("Interest_Rate") >= 0)) # According to online, max rating is ~600%
    # df = df.filter((col("Monthly_Inhand_Salary") * 12 * 4 < col("Annual_Income")))

    # Remove filter columns, then change loan type to counter column
    print("---------Extracting Loan Data------------")
    # df_split = df.withColumn("loan_type_array", split("Type_of_Loan", ","))
    # df_exploded = df_split.withColumn("loan_type", explode("loan_type_array"))
    # df_normalized = df_exploded.withColumn("loan_type", trim(lower("loan_type")))
    # df_normalized.show(10)
    # loan_types = [row.loan_type for row in df_normalized.select("loan_type").distinct().collect()]

    # for loan in loan_types:
    #     df = df.withColumn(
    #         loan,
    #         when(lower(col("Type_of_Loan")).contains(loan), 1).otherwise(0)
    
    #     )
    
    # We then can count the number of each time of loan based off the Num_of_loan column

    # save silver table - IRL connect to database to write
    print("---------Writing Pandas File------------")
    partition_name = "silver_feature_finanicals_" + date + '.csv'
    filepath = os.path.join(silver_table_dir, partition_name)
    df.show(200)

    df.toPandas().to_csv(filepath, index=False)
    print('saved to:', filepath, " Row count : ", df.count())
    
    return df

In [13]:
sample_dir = os.path.join(current_directory, "datamart", "bronze", "bronze_features_financial_2023-01-01.csv")

process_silver_table_feature_financials(sample_dir,
                                        silver_dir,
                                        "2023-01-01",
                                        spark)

Loaded c:\Users\Admin\Desktop\SMU\CS611\cs611-assignment-1\datamart\bronze\bronze_features_financial_2023-01-01.csv, row count 530
---------Filtering Anomalous Data------------
---------Extracting Loan Data------------
---------Writing Pandas File------------
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+-------------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount

DataFrame[Customer_ID: string, Annual_Income: float, Monthly_Inhand_Salary: float, Num_Bank_Accounts: int, Num_Credit_Card: int, Interest_Rate: int, Num_of_Loan: int, Type_of_Loan: string, Delay_from_due_date: int, Num_of_Delayed_Payment: int, Changed_Credit_Limit: float, Num_Credit_Inquiries: float, Credit_Mix: string, Outstanding_Debt: float, Credit_Utilization_Ratio: float, Credit_History_Age: float, Payment_of_Min_Amount: string, Total_EMI_per_month: float, Amount_invested_monthly: float, Payment_Behaviour: string, Monthly_Balance: float, snapshot_date: date]

In [None]:
data_prep_silver("2023-01-01", "2024-12-01", spark)