# Credit Default Prediction on Amex Dataset

### Importing the necessary libraries

In [1]:
import numpy as np
import pandas as pd 

import pyspark
from pyspark import StorageLevel
from pyspark.sql import (
    SparkSession, 
    types, 
    functions as F,
)
from pyspark.sql.functions import (
    col,
    isnan,
    when,
    count,
)
from pyspark.ml import Pipeline 
from pyspark.ml.feature import (
    OneHotEncoder, 
    StringIndexer, 
    VectorAssembler, 
    Imputer,
)
from pyspark.ml.classification import (
    LogisticRegression, 
    LinearSVC,
    DecisionTreeClassifier,
    GBTClassifier,
    RandomForestClassifier,
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

import itertools

import pickle

### Create a Spark Session

In [2]:
spark = SparkSession.builder \
                    .appName("amex-app") \
                    .master("local[*]") \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/23 19:46:08 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/23 19:46:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/23 19:46:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/23 19:46:08 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


### Important Global Variables

In [3]:
TRAIN_DATA_PATH = 'gs://icdp-bigdata-bucket/train_data.csv'
TRAIN_LABEL_PATH = 'gs://icdp-bigdata-bucket/train_labels.csv'

### Miscellaneous Utility Functions

In [4]:
## Function to create a Schema Object for the Dataframe 
def create_spark_schema(series):
    fields = list()
    
    for value in series: 
        if value in string_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.StringType(), 
                    True,
                )
            )
        elif value in date_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.DateType(), 
                    True,
                )
            )
        elif value in integer_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.IntegerType(), 
                    True,
                )
            )
        else:
            fields.append(
                types.StructField(
                    value, 
                    types.FloatType(), 
                    True,
                )
            )
    return types.StructType(fields)

In [5]:
#Add Suffix to List Elements
def add_suffix(names, suffix):
    return [name + suffix for name in names]

In [6]:
# Drop Columns with Null values above a certain threshold
def dropNullColumns(df, threshold):
    """
    This function drops columns containing all null values.
    :param df: A PySpark DataFrame
    """
  
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(
        c) for c in df.columns]).collect()[0].asDict()
    print("null counts calculated...")
    df_count = df.count()
    col_to_drop = [k for k, v in null_counts.items() if v >(df_count * threshold)]  
    print("columns to drop found...")
    df = df.drop(*col_to_drop)  
  
    return df, col_to_drop

### Reading the Dataframe

#### Reading the First 20 rows only

In [7]:
train_df_temp = spark.read.option(
    "header", 'true',
).csv(
    TRAIN_DATA_PATH,
).limit(
    20
)
train_labels_temp = spark.read.option(
    "header", 'true',
).csv(
    TRAIN_LABEL_PATH,
).limit(
    20
)

                                                                                

#### Define Schema Using Sampled Temporary Dataframe

In [8]:
## Known Datatypes: 

string_dtypes = ["customer_ID", 'B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
date_dtypes = ['S_2']
integer_dtypes = ['target']

In [9]:
train_schema = create_spark_schema(train_df_temp.columns)
label_schema = create_spark_schema(train_labels_temp.columns)

#### Remove Temp Datasets from Memory

In [10]:
train_df_temp.unpersist()
train_labels_temp.unpersist()

del train_df_temp
del train_labels_temp

#### Reading the Whole Dataset with the Inferred Schema

In [11]:
train_df = spark.read.option(
    "header", 
    "true",
).csv(
    TRAIN_DATA_PATH, 
    schema=train_schema
)
label_df = spark.read.option(
    "header", 
    "true",
).csv(
    TRAIN_LABEL_PATH, 
    schema=label_schema,
)

In [12]:
## Other categorization of the known dtypes
info_cols = ['customer_ID', 'S_2']
target_cols = ['target']
cat_cols = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']


# Define Numeric Columns
excluded = info_cols + cat_cols
num_cols = [col for col in train_df.columns if col not in excluded]

### Preprocessing of the Dataset

#### Dropping Null Columns

In [13]:
## Remove All Columns with More than 5% Missing Values
train_df, cols_to_drop = dropNullColumns(train_df, 0.05)

22/12/23 19:46:23 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

null counts calculated...




columns to drop found...


                                                                                

#### Remove Less Important Column S_2

In [14]:
## Remove the S_2 variable as the testing data and the training data are in different time periods 
train_df = train_df.drop("S_2")

In [15]:
cols_to_drop.append("S_2")

#### Converting Categorical Columns to Numeric using StringIndexer

In [16]:
cat_columns_to_index = list(set(train_df.columns) & set(cat_cols))

In [17]:
cat_cols_indexed = add_suffix(cat_columns_to_index, "_index")

## Create StringIndexer Object
indexer = StringIndexer(
    inputCols=cat_columns_to_index,
    outputCols=cat_cols_indexed,
)
indexer.setHandleInvalid("keep")
indexer_model = indexer.fit(train_df)

train_df = indexer_model.transform(train_df)

                                                                                

#### Impute values for numerical columns

In [18]:
num_columns_to_impute = list(set(train_df.columns) & set(num_cols))

In [19]:
num_cols_imputed = add_suffix(num_columns_to_impute, "_imputed")

##Create Imputer
imputer = Imputer(
    inputCols=num_columns_to_impute,
    outputCols=num_cols_imputed,
)
imputer.setStrategy("median")

imputer_model = imputer.fit(train_df)

train_df = imputer_model.transform(train_df)

                                                                                

#### OneHotEncode the Categorical Columns

In [20]:
cat_cols_ohe = add_suffix(cat_cols_indexed, "_ohe")

### Create Ohe Object
ohe = OneHotEncoder(
    inputCols = cat_cols_indexed,
    outputCols = cat_cols_ohe,
)

ohe_model = ohe.fit(train_df)

train_df = ohe_model.transform(train_df)

In [21]:
useful_cols = ["customer_ID"] + cat_cols_ohe + num_cols_imputed

### Remove Unnecessary Columns and Aggregate

In [22]:
train_df = train_df.select(*useful_cols)

In [23]:
new_num_cols = []
for num_col in num_cols_imputed:
    new_name = num_col.split("_")[0] + "_" + num_col.split("_")[1]
    new_num_cols.append(new_name)
    train_df = train_df.withColumnRenamed(num_col, new_name)
new_cat_cols = []
for cat_col in cat_cols_ohe:
    new_name = cat_col.split("_")[0] + "_" + cat_col.split("_")[1]
    new_cat_cols.append(new_name)
    train_df = train_df.withColumnRenamed(cat_col, new_name)

In [24]:
## Aggregation Functions
num_funcs = [
    (F.mean, "_mean"),
     (F.min, "_min"),
     (F.max, "_max"),
]

cat_funcs = [
    (F.count, "_count"),
    (F.last, "_last"),
    (F.countDistinct, "_nunique"),
]

In [25]:
agg_num_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(new_num_cols, num_funcs)]

agg_cols_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(new_cat_cols, cat_funcs)]

# Combine numeric and categoric agg arguments
agg_args = agg_num_args + agg_cols_args

In [26]:
train_df = train_df.groupBy("customer_ID").agg(*agg_args)

In [27]:
train_df = train_df.join(
    F.broadcast(label_df), 
    on="customer_ID",
)

In [28]:
va_model = VectorAssembler(
    inputCols=train_df.drop(
        "customer_ID",
        "target",
    ).columns,
    outputCol="features",
    handleInvalid="skip",
)

In [29]:
train_df = va_model.transform(
    train_df,
).select(
    [
        "customer_ID", 
        "features", 
        "target",
    ]
).persist(
    StorageLevel.DISK_ONLY,
)

                                                                                

### Train Test Split

In [30]:
train_split, test_split = train_df.randomSplit(weights = [0.8, 0.2], seed = 42)

### Fit Models

#### Logistic Regression

In [None]:
lr = LogisticRegression(
    featuresCol="features",
    labelCol="target",
)
lr_model = lr.fit(train_split)

22/12/23 19:04:28 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/23 19:04:28 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [None]:
lr_preds = lr_model.transform(test_split)

In [None]:
binEval = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
multiEval = MulticlassClassificationEvaluator(labelCol = "target", predictionCol = "prediction")

In [None]:
print("AUCROC: ", binEval.evaluate(lr_preds))
print("Accuracy: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "accuracy"}))
print("F1 Score: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "f1"}))
print("Weighted Precision: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "weightedPrecision"}))
print("Weighted Recall: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "weightedRecall"}))

                                                                                

AUCROC:  0.8486418824365403


                                                                                

Accuracy:  0.8894452036075431


                                                                                

F1 Score:  0.8885082375446438


                                                                                

Weighted Precision:  0.8879481704342884




Weighted Recall:  0.8894452036075431


                                                                                

#### Decision Tree Classifier

In [None]:
dt = DecisionTreeClassifier(
    featuresCol="features", 
    labelCol="target"
)
dt_model = dt.fit(train_split)

                                                                                

In [None]:
dt_preds = dt_model.transform(test_split)

In [None]:
binEval = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
multiEval = MulticlassClassificationEvaluator(labelCol = "target", predictionCol = "prediction")

In [None]:
print("AUCROC: ", binEval.evaluate(dt_preds))
print("Accuracy: ", multiEval.evaluate(dt_preds, {multiEval.metricName: "accuracy"}))
print("F1 Score: ", multiEval.evaluate(dt_preds, {multiEval.metricName: "f1"}))
print("Weighted Precision: ", multiEval.evaluate(dt_preds, {multiEval.metricName: "weightedPrecision"}))
print("Weighted Recall: ", multiEval.evaluate(dt_preds, {multiEval.metricName: "weightedRecall"}))

                                                                                

AUCROC:  0.8200578293739653


                                                                                

Accuracy:  0.8605848592511616


                                                                                

F1 Score:  0.8606816617623854


                                                                                

Weighted Precision:  0.860781015714613




Weighted Recall:  0.8605848592511615


                                                                                

#### Linear SVC

In [None]:
svc = LinearSVC(
    featuresCol="features",
    labelCol="target",
)
svc_model = svc.fit(train_split)

                                                                                

In [None]:
svc_preds = svc_model.transform(test_split)

In [None]:
binEval = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
multiEval = MulticlassClassificationEvaluator(labelCol = "target", predictionCol = "prediction")

In [None]:
print("AUCROC: ", binEval.evaluate(svc_preds))
print("Accuracy: ", multiEval.evaluate(svc_preds, {multiEval.metricName: "accuracy"}))
print("F1 Score: ", multiEval.evaluate(svc_preds, {multiEval.metricName: "f1"}))
print("Weighted Precision: ", multiEval.evaluate(svc_preds, {multiEval.metricName: "weightedPrecision"}))
print("Weighted Recall: ", multiEval.evaluate(svc_preds, {multiEval.metricName: "weightedRecall"}))

                                                                                

AUCROC:  0.8491408126883391


                                                                                

Accuracy:  0.8890625854058486


                                                                                

F1 Score:  0.8882420515505906


                                                                                

Weighted Precision:  0.8877104482580733




Weighted Recall:  0.8890625854058486


                                                                                

#### Gradient Boosted Trees Classifier

In [None]:
gbt = GBTClassifier(
    featuresCol = 'features', 
    labelCol = 'target',
)
gbt_model = gbt.fit(train_split)

22/12/23 21:07:34 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1004.2 KiB
22/12/23 21:07:40 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1004.7 KiB
22/12/23 21:07:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1005.3 KiB
22/12/23 21:07:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1006.4 KiB
22/12/23 21:07:53 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1008.7 KiB
22/12/23 21:08:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1011.5 KiB
22/12/23 21:08:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1012.0 KiB
22/12/23 21:08:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1012.6 KiB
22/12/23 21:08:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcas

In [None]:
gbt_preds = gbt_model.transform(test_split)

In [None]:
binEval = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
multiEval = MulticlassClassificationEvaluator(labelCol = "target", predictionCol = "prediction")

In [None]:
print("AUCROC: ", binEval.evaluate(gbt_preds))
print("Accuracy: ", multiEval.evaluate(gbt_preds, {multiEval.metricName: "accuracy"}))
print("F1 Score: ", multiEval.evaluate(gbt_preds, {multiEval.metricName: "f1"}))
print("Weighted Precision: ", multiEval.evaluate(gbt_preds, {multiEval.metricName: "weightedPrecision"}))
print("Weighted Recall: ", multiEval.evaluate(gbt_preds, {multiEval.metricName: "weightedRecall"}))

22/12/23 21:15:31 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1029.8 KiB
22/12/23 21:15:31 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/23 21:15:31 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

AUCROC:  0.8426669815866115


22/12/23 21:15:44 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1037.4 KiB
                                                                                

Accuracy:  0.8781962284777262


22/12/23 21:15:55 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1037.4 KiB
                                                                                

F1 Score:  0.8782554150262379


22/12/23 21:16:06 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1037.4 KiB
                                                                                

Weighted Precision:  0.8783158938891666


22/12/23 21:16:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1037.4 KiB

Weighted Recall:  0.878196228477726


                                                                                

### Save Models and Meta Data

#### Data to Save

In [None]:
meta_data = {
    #"spark_session": spark,
    "schema":{
        "train_schema": train_schema,
        "label_schema": label_schema,
    },
    "column_names":{
        "cols_to_drop": cols_to_drop,
        "cat_columns_to_index": cat_columns_to_index,
        "num_cols_imputed": num_cols_imputed,
        "cat_cols_ohe": cat_cols_ohe,
        "useful_cols": useful_cols,
    },
}

In [None]:
with open('/home/aap2239/interpretable-credit-default-prediction/meta_data.pkl', 'wb') as handle:
    pickle.dump(meta_data, handle, protocol=pickle.HIGHEST_PROTOCOL)


In [None]:
from google.cloud import storage

PROJECT = 'big-data-86948'
BUCKET_NAME = 'icdp-bigdata-bucket'
first_layer = "icdp_deployment/"
second_layer_meta = "meta_data/"
second_layer_objects = "objects/"
storage_client = storage.Client(project=PROJECT)

In [None]:
def create_folder(bucket_name, folder_name):
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(folder_name)
    blob.upload_from_string('', content_type='application/x-www-form-urlencoded;charset=UTF-8')

In [None]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket. https://cloud.google.com/storage/docs/ """
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name))

In [None]:
create_folder(BUCKET_NAME, first_layer)

In [None]:
create_folder(BUCKET_NAME, "icdp_deployment/"+second_layer_meta)
create_folder(BUCKET_NAME, "icdp_deployment/"+second_layer_objects)

In [None]:
upload_blob(BUCKET_NAME, "/home/aap2239/interpretable-credit-default-prediction/meta_data.pkl", "icdp_deployment/"+second_layer_meta+"meta_data.pkl")

In [None]:
!rm /home/aap2239/interpretable-credit-default-prediction/meta_data.pkl

#### Models to Save 

In [None]:
indexer_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/indexer_model")

In [None]:
imputer_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/imputer_model")

In [None]:
ohe_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/ohe_model")

In [None]:
va_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/va_model")

In [None]:
lr_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/lr__model")