# Directory
Setting up the project and dataset directories

In [137]:
# Set root dir
import os

os.chdir("/")
print("Current Working Directory:", os.getcwd())

# Project root. Defaults to "/BigData" (BREU + Docker + E: (offline)).
# Set the BIGDATA_WORKING_DIR environment variable to point somewhere else
# instead of editing this cell.
working_dir = os.environ.get("BIGDATA_WORKING_DIR", "/BigData")

dataset_dir = f"{working_dir}/dataset"

# Original raw CSV file
dataset_original_csv = f'{dataset_dir}/creditScores.csv'

# Paths of the dataset in each storage format
dataset_csv_path = f'{dataset_dir}/creditScores_csv'
dataset_json_path = f'{dataset_dir}/creditScores_json'
dataset_parquet_path = f'{dataset_dir}/creditScores_parquet'
dataset_orc_path = f'{dataset_dir}/creditScores_orc'
dataset_avro_path = f'{dataset_dir}/creditScores_avro'

# Cleaned dataset (read by this notebook) and final dataset (written by this notebook)
clean_dataset_path = f'{dataset_dir}/clean_creditScores_orc'
final_dataset_path = f'{dataset_dir}/final_creditScores_orc'

outputs = f"{working_dir}/output"
pipeline_model_path = f"{outputs}/pipeline_model"

os.chdir(working_dir)
print(f"New working directory: {working_dir}")


Current Working Directory: /
New working directory: /BigData


# Spark

In [138]:
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, SQLTransformer
from pyspark.sql import functions as F, SparkSession, DataFrame
from pyspark.sql.functions import *

# Driver memory and the Avro connector are set before the session is created.
# NOTE(review): the spark-avro coordinates (Scala 2.12, version 3.5.0) presumably
# have to match the installed Spark build — confirm when upgrading Spark.
proj_conf = (
    SparkConf()
    .setAppName("projBigData_performance")
    .set("spark.driver.memory", "16g")
    .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0")
)

spark = SparkSession.builder.config(conf=proj_conf).getOrCreate()

In [94]:
# spark.stop()

In [139]:
# Load the cleaned dataset (ORC) and inspect its schema
df = spark.read.format("orc").load(clean_dataset_path)
df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Customer_ID: long (nullable = true)
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Annual_Income: float (nullable = true)
 |-- Monthly_Inhand_Salary: float (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: float (nullable = true)
 |-- Num_of_Loan: integer (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: float (nullable = true)
 |-- Changed_Credit_Limit: double (nullable = true)
 |-- Num_Credit_Inquiries: integer (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: float (nullable = true)
 |-- Credit_Utilization_Ratio: float (nullable = true)
 |-- Credit_History_Age: integer (nullable = true)
 |-- 

In [96]:
df.sort("Customer_ID").show(truncate=False)  # group each customer's rows together

+-----+-----------+--------+-----------------+----+-----------+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------------------------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------------------+---------------+------------+
|ID   |Customer_ID|Month   |Name             |Age |SSN        |Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Type_of_Loan                                     |Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour               |Monthly_Balance|Cr

# Process columns (outside pipeline)

In [97]:
pipeline_steps = list()  # stages are appended section by section below

### Aux functions

In [98]:
def check_missing_values(df: DataFrame, column: str) -> None:
    """Print how many rows of ``df`` have ``column`` null or equal to the empty string."""
    target = F.col(column)
    is_missing = target.isNull() | (target == "")  # both null and "" count as missing
    n_missing = df.where(is_missing).count()
    print(f"Number of missing values in column '{column}': {n_missing}")
    
def check_distinc_values(df: DataFrame, column: str, truncate: bool = True) -> None:
    """Display (via ``show``) every distinct value of ``column``.

    ``truncate`` is forwarded to ``DataFrame.show``.
    """
    distinct_rows = df.select(column).distinct()
    distinct_rows.show(truncate=truncate)

def get_distinct_values(df: DataFrame, column_name: str) -> list:
    """Return the distinct values of ``column_name`` as a Python list (collected to the driver)."""
    rows = df.select(column_name).distinct().collect()
    return [row[0] for row in rows]

def string_na_missing_values(df: DataFrame, column_name: str) -> DataFrame:
    """Return ``df`` with null or empty-string values of ``column_name`` replaced by "NA"."""
    current = F.col(column_name)
    is_blank = current.isNull() | (current == "")
    return df.withColumn(column_name, F.when(is_blank, F.lit("NA")).otherwise(current))

#### In handling the empty values we had a few options and proceeded as follows:

Columns meant to stay constant per customer (such as yearly income within the same year, age, or occupation) -> Group by customer and replace with that customer's most common value.

Numerical Columns -> Group by customer and replace with that customer's mean value for the column, falling back to the column's global mean when the customer has no value at all.

Categorical Columns -> Fill with the string "NA", as done by the function string_na_missing_values.

---

#### Proceeding this way in our view allows us to:

Retain data integrity - Not losing rows and information due to dropping them on finding an empty value.

Avoiding Bias in the Model - Dropping rows may lead to bias in the model especially if the missing values happen to not be random. It's important to avoid unintentionally excluding certain patterns or groups from your analysis, and marking values as "NA" ensures the model considers absence of information.

Facilitating imputation at deployment - Once deployed, the model is prepared to handle empty values and process them the same way they were handled during training, so their effect on the outcome is consistent with what the model learned.

Consistency - Consistent approach to handling missing values across most features avoids unintentional external bias.

## Remove unused columns (cleaning)

ID, Name and SSN are not required

In [99]:
# ID, Name and SSN are not needed. Drop them in a single call: looping with a
# variable named `col` would rebind `col`, which the star import from
# pyspark.sql.functions (Spark cell) provides as a function.
to_drop = ["ID", "Name", "SSN"]
df = df.drop(*to_drop)

df.orderBy("Customer_ID").show(truncate=False)

# We will be keeping Customer_ID for now as a means to populate other columns based on data that's universal for every entry of the same Customer_ID

+-----------+--------+----+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------------------------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------------------+---------------+------------+
|Customer_ID|Month   |Age |Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Type_of_Loan                                     |Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour               |Monthly_Balance|Credit_Score|
+-----------+--------+----+----------+-------------+--------

## Dealing with Outliers

In [100]:
def iqr_outlier_treatment(dataframe: DataFrame, columns: list, factor=3, relative_error=0.01) -> DataFrame:
    """Replace outliers with null, using IQR fences computed per column.

    For every column, Q1 and Q3 are estimated with ``approxQuantile`` and the
    fences ``[Q1 - factor * IQR, Q3 + factor * IQR]`` (IQR = Q3 - Q1) are built.
    Values outside the fences become null, so the imputation stages of the
    pipeline can fill them later. Values that were already null stay null.

    Args:
        dataframe: input DataFrame.
        columns: names of the numeric columns to treat.
        factor: fence width in IQRs (3 only flags values far from the bulk of
            the data; 1.5 is the stricter, more common choice).
        relative_error: precision passed to ``approxQuantile`` (0.0 = exact
            quantiles, at a higher cost).

    Returns:
        A new DataFrame with the outliers of ``columns`` set to null. Columns for
        which no quantiles can be computed (only null/NaN values) are left
        unchanged instead of raising an IndexError.
    """
    for column in columns:
        quantiles = dataframe.approxQuantile(column, [0.25, 0.75], relative_error)
        if len(quantiles) < 2:
            # approxQuantile returns [] for a column holding only null/NaN values
            continue
        q1, q3 = quantiles[0], quantiles[1]
        iqr = q3 - q1

        # Define the upper and lower bounds for outliers
        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr

        # Replace outliers with empty values
        value = F.col(column)
        is_outlier = (value < lower_bound) | (value > upper_bound)
        dataframe = dataframe.withColumn(column, F.when(is_outlier, None).otherwise(value))

    return dataframe

In [101]:
# Numeric columns whose outliers are nulled by iqr_outlier_treatment
numeric_columns = [
    'Age',
    'Annual_Income',
    'Monthly_Inhand_Salary',
    'Num_Bank_Accounts',
    'Num_Credit_Card',
    'Interest_Rate',
    'Num_of_Loan',
    'Delay_from_due_date',
    'Num_of_Delayed_Payment',
    'Changed_Credit_Limit',
    'Num_Credit_Inquiries',
    'Outstanding_Debt',
    'Credit_Utilization_Ratio',
    'Credit_History_Age',
    'Total_EMI_per_month',
    'Amount_invested_monthly',
    'Monthly_Balance',
]

numeric_columns

['Age',
 'Annual_Income',
 'Monthly_Inhand_Salary',
 'Num_Bank_Accounts',
 'Num_Credit_Card',
 'Interest_Rate',
 'Num_of_Loan',
 'Delay_from_due_date',
 'Num_of_Delayed_Payment',
 'Changed_Credit_Limit',
 'Num_Credit_Inquiries',
 'Outstanding_Debt',
 'Credit_Utilization_Ratio',
 'Credit_History_Age',
 'Total_EMI_per_month',
 'Amount_invested_monthly',
 'Monthly_Balance']

In [102]:
df = iqr_outlier_treatment(dataframe=df, columns=numeric_columns)  # outliers -> null

In [103]:
df.sort("Customer_ID").show()  # inspect the nulled outliers per customer

+-----------+--------+----+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+------------+
|Customer_ID|   Month| Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|Credit_Score|
+-----------+--------+----+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------

## Age (cleaning)

There are a lot of ages that do not make sense.

We need to ensure that all of a client's records have the same age, or ages that differ by at most 1.

In [104]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1) Calculate mode (most common val by customer)
#    - Null ages (e.g. values nulled by the IQR step) are excluded, so null can
#      never be chosen as a customer's most common age.
#    - Ties are broken deterministically (lowest age wins) so that exactly ONE
#      row per customer is produced. Keeping every tied age would make the left
#      join on Customer_ID in the next step duplicate that customer's records.
age_counts = (df.where(F.col('Age').isNotNull())
              .groupBy('Customer_ID', 'Age')
              .count())

mode_order = Window.partitionBy('Customer_ID').orderBy(F.col('count').desc(), F.col('Age').asc())

age_mode_df = (age_counts
               .withColumn('_mode_rank', F.row_number().over(mode_order))
               .where(F.col('_mode_rank') == 1)
               .select('Customer_ID', F.col('Age').alias('Most_Common_Age')))

age_mode_df.show(10)

+-----------+---------------+
|Customer_ID|Most_Common_Age|
+-----------+---------------+
|       1006|             38|
|       1007|             48|
|       1008|             37|
|       1009|             22|
|       1011|             44|
|       1013|             30|
|       1014|             55|
|       1015|             25|
|       1017|             27|
|       1019|             30|
+-----------+---------------+
only showing top 10 rows



In [105]:
# 2) Attach each customer's most common age and measure how far every record is from it
df = (df.join(age_mode_df, on='Customer_ID', how='left')
        .withColumn('age_difference', F.abs(F.col('Age') - F.col('Most_Common_Age'))))

df.show()

+-----------+--------+---+-------------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+------------+---------------+--------------+
|Customer_ID|   Month|Age|   Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|Credit_Score|Most_Common_Age|age_difference|
+-----------+--------+---+-------------+-------------+---------------------+

In [106]:
df.sort("Customer_ID").show(10)  # first rows, with Most_Common_Age / age_difference

+-----------+--------+----+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+------------+---------------+--------------+
|Customer_ID|   Month| Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|Credit_Score|Most_Common_Age|age_difference|
+-----------+--------+----+----------+-------------+---------------------+------

In [107]:
# 3) A record whose age is missing, or more than 1 year away from the customer's
#    most common age, gets the most common age instead.
needs_fix = (df['age_difference'] > 1) | df['Age'].isNull()
df = df.withColumn('Age', F.when(needs_fix, df['Most_Common_Age']).otherwise(df['Age']))

In [108]:
df.sort("Customer_ID").show(10)  # check the corrected ages

+-----------+--------+---+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+------------+---------------+--------------+
|Customer_ID|   Month|Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|Credit_Score|Most_Common_Age|age_difference|
+-----------+--------+---+----------+-------------+---------------------+---------

In [109]:
# The helper columns were only needed for the age correction
df = df.drop('Most_Common_Age').drop('age_difference')

df.sort("Customer_ID").show(truncate=False)

+-----------+--------+---+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------------------------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------------------+---------------+------------+
|Customer_ID|Month   |Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Type_of_Loan                                     |Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour               |Monthly_Balance|Credit_Score|
+-----------+--------+---+----------+-------------+-----------

## Save Final Dataframe/Dataset

This will be the initial dataset used to train our models (before the pipeline)

In [110]:
# Write the final dataset as 16 partitions, replacing any previous run
df = df.repartition(16)
df.write.format("orc").mode("overwrite").save(final_dataset_path)

# Process columns (inside pipeline)

In this section, we will prepare the pipeline to be used in the next step (models)

The pipeline will start from the last saved DataFrame, as will the models step.

## Type of Loans (Pipeline)

Type_of_loan is a complex column that needs to be processed into several columns.

In [111]:
check_distinc_values(df, column="Type_of_loan", truncate=False)  # full text of each loan combination

+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Type_of_loan                                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Not Specified, Personal Loan, and Debt Consolidation Loan                                                                                           |
|Auto Loan, Payday Loan, Student Loan, and Credit-Builder Loan                                                                                       |
|Personal Loan, Personal Loan, Credit-Builder Loan, Student Loan, and Auto Loan                                                                      |
|Personal Loan, Home Equity Loan, Home Equity Loan, Mortgage Loan, Personal Loan, Student Loan

In [112]:
# Lower-case loan types found in Type_of_Loan; each becomes one 0/1 column
all_loan_types = [
    "credit-builder loan",
    "not specified",
    "mortgage loan",
    "auto loan",
    "student loan",
    "home equity loan",
    "personal loan",
    "payday loan",
    "debt consolidation loan",
]

In [113]:
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import pyspark.sql.functions as F
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


# https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
class OneHotLoanType(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    A custom Transformer that converts 'Type_of_loan' into one 0/1 column per loan type.

    The input column holds a comma-separated list such as
    "Auto Loan, Payday Loan, Student Loan, and Credit-Builder Loan". The text is
    lower-cased and " and " is removed. Then, for every entry of ``loanTypes``
    (expected in lower case), an int column is added that is 1 when that loan
    type appears in the list and 0 otherwise. A null input also gives 0.
    The new column is named after the loan type, with spaces and hyphens
    replaced by underscores (e.g. "credit-builder loan" -> "credit_builder_loan").

    NOTE: ``outputCol`` is stored for API compatibility, but no column with that
    name is produced.
    """

    # A Param (not a plain attribute) so that the list is saved and restored with
    # the pipeline by DefaultParamsWritable / DefaultParamsReadable. As a plain
    # attribute, a reloaded model would have an empty list and create no columns.
    loanTypes = Param(Params._dummy(), "loanTypes",
                      "lower-case loan types to one-hot encode",
                      typeConverter=TypeConverters.toListString)

    def __init__(self, inputCol="Type_of_loan", outputCol="loan_type_vector", loanTypes=None):
        super(OneHotLoanType, self).__init__()
        self._setDefault(loanTypes=[])
        self.setInputCol(inputCol)
        self.setOutputCol(outputCol)
        # None instead of a shared mutable [] default; same behavior
        if loanTypes is not None:
            self.setLoanTypes(loanTypes)

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def setLoanTypes(self, value):
        """
        Sets the value of :py:attr:`loanTypes`.
        """
        return self._set(loanTypes=value)

    def getLoanTypes(self):
        """
        Gets the value of :py:attr:`loanTypes` (empty list by default).
        """
        return self.getOrDefault(self.loanTypes)

    def _transform(self, df):
        # "A, B, and C" -> "a, b, c"
        clean_input_field = F.regexp_replace(F.lower(F.col(self.getInputCol())), " and ", " ")
        # Split once; the same array expression is reused for every loan type
        loan_list = F.split(clean_input_field, ", ")

        for loan_type in self.getLoanTypes():
            out_name = loan_type.replace(" ", "_").replace("-", "_")
            is_present = F.array_contains(loan_list, loan_type).cast("int")
            # null input -> null membership -> 0
            df = df.withColumn(out_name, F.coalesce(is_present, F.lit(0)))

        return df

In [114]:
one_hot_loan_type = OneHotLoanType(inputCol="Type_of_loan",
                                   outputCol="loan_type_vector",
                                   loanTypes=all_loan_types)
pipeline_steps.append(one_hot_loan_type)

print("Pipeline Steps:")
pipeline_steps

Pipeline Steps:


[OneHotLoanType_240f364623ce]

## Month (Pipeline)

In [115]:
# Month must be complete before it is mapped to a number
check_missing_values(df, column="Month")
check_distinc_values(df, column="Month")

Number of missing values in column 'Month': 0
+--------+
|   Month|
+--------+
|    July|
|February|
| January|
|   March|
|     May|
|  August|
|   April|
|    June|
+--------+



In [116]:
# Set SQL Transformer query month -> month_num
# Month name -> calendar number (1-12); any other value maps to 0.
sql_statement = """SELECT *,
                    CASE Month
                        WHEN 'January' THEN 1
                        WHEN 'February' THEN 2
                        WHEN 'March' THEN 3
                        WHEN 'April' THEN 4
                        WHEN 'May' THEN 5
                        WHEN 'June' THEN 6
                        WHEN 'July' THEN 7
                        WHEN 'August' THEN 8
                        WHEN 'September' THEN 9
                        WHEN 'October' THEN 10
                        WHEN 'November' THEN 11
                        WHEN 'December' THEN 12
                        ELSE 0
                    END as Month_num FROM __THIS__;
                    """
month_to_number = SQLTransformer(statement=sql_statement)
pipeline_steps.append(month_to_number)

print("Pipeline Steps:")
pipeline_steps

Pipeline Steps:


[OneHotLoanType_240f364623ce, SQLTransformer_bf1a65c74c3b]

## Populate missing categorical (Pipeline)

In [117]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, Param, Params, TypeConverters
import pyspark.sql.functions as F

class FillMissingCategorical(Transformer, HasInputCols, Params, DefaultParamsReadable, DefaultParamsWritable):
    """Replace null or empty-string values of ONE categorical column with a constant.

    Placed before StringIndexer in the pipeline, so the indexer sees a real
    category (``fillValue``, "NA" by default) instead of a missing value.
    """

    def __init__(self, inputCol=None, fillValue="NA"):
        super(FillMissingCategorical, self).__init__()
        # Params are created per instance and configured through defaults
        self.inputCol = Param(self, "inputCol", "")
        self.fillValue = Param(self, "fillValue", "")
        self._setDefault(inputCol=inputCol, fillValue=fillValue)

    def setInputCol(self, value):
        """Set the name of the column to fill."""
        return self._set(inputCol=value)

    def setFillValue(self, value):
        """Set the value written in place of null / ""."""
        return self._set(fillValue=value)

    def _transform(self, df):
        column_name = self.getOrDefault(self.inputCol)
        replacement = F.lit(self.getOrDefault(self.fillValue))
        current = F.col(column_name)
        is_missing = current.isNull() | (current == "")
        return df.withColumn(column_name, F.when(is_missing, replacement).otherwise(current))

In [118]:
default_fill_value = "NA"

# Nominal columns (no natural order): fill missing -> string index -> one-hot encode
fill_missing_and_string_indexer = ["Occupation", "Credit_Mix", "Payment_Behaviour", "Payment_of_Min_Amount"]

for categoricalCol in fill_missing_and_string_indexer:
    indexed_col = categoricalCol + "_indexed"
    # The indexer must not receive missing values
    fill_missing = FillMissingCategorical(inputCol=categoricalCol, fillValue=default_fill_value)
    # handleInvalid='keep': labels unseen during fit get their own index instead of failing
    string_indexer = StringIndexer(inputCol=categoricalCol, outputCol=indexed_col, handleInvalid='keep')
    encoder = OneHotEncoder(inputCols=[indexed_col], outputCols=[categoricalCol + "_oneHot"])
    pipeline_steps.extend([fill_missing, string_indexer, encoder])

print("Pipeline Steps:")
pipeline_steps

Pipeline Steps:


[OneHotLoanType_240f364623ce,
 SQLTransformer_bf1a65c74c3b,
 FillMissingCategorical_db19d6bea5b1,
 StringIndexer_23a5662e3390,
 OneHotEncoder_42ba01d3697f,
 FillMissingCategorical_9fda294c4025,
 StringIndexer_d02b3207599c,
 OneHotEncoder_87589e059d9a,
 FillMissingCategorical_d9ac17981d2c,
 StringIndexer_b2a70d5848d7,
 OneHotEncoder_56378ce551b3,
 FillMissingCategorical_1fdc1d84f679,
 StringIndexer_ad3380a95f3f,
 OneHotEncoder_42a543ae10b2]

## Populate missing numerical (Pipeline)

In [119]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, Param, Params
import pyspark.sql.functions as F

class ZeroFillImputer(Transformer, HasInputCols, Params, DefaultParamsReadable, DefaultParamsWritable):
    """Replace nulls with 0 in every column listed in ``inputCols``."""

    def __init__(self, inputCols=None):
        super(ZeroFillImputer, self).__init__()
        # Param created per instance and configured through its default
        self.inputCols = Param(self, "inputCols", "")
        self._setDefault(inputCols=inputCols)

    def setInputCols(self, value):
        """Set the list of columns whose nulls become 0."""
        return self._set(inputCols=value)

    def _transform(self, df):
        result = df
        for column_name in self.getOrDefault(self.inputCols):
            filled = F.coalesce(F.col(column_name), F.lit(0))
            result = result.withColumn(column_name, filled)
        return result

In [120]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, Param, Params
import pyspark.sql.functions as F
from pyspark.sql.window import Window

class ImputerWithCustomerData(Transformer, HasInputCols, Params, DefaultParamsReadable, DefaultParamsWritable):
    """Fill nulls in numeric columns with the customer's mean, else the global mean.

    For each column in ``inputCols``, a null is replaced by:
      1. the mean of that column over the rows sharing the same ``Customer_ID``; or,
         when that customer has no non-null value in the column,
      2. the mean of the column over the whole DataFrame being transformed.
    If neither exists (e.g. the column is entirely null), the value stays null.

    NOTE(review): this is a Transformer (no ``fit``), so both means come from the
    DataFrame passed to ``transform``. At inference time that is the scoring data,
    not the training data. Confirm this is intended.
    """

    def __init__(self, inputCols=None):
        super(ImputerWithCustomerData, self).__init__()
        self.inputCols = Param(self, "inputCols", "")
        self._setDefault(inputCols=inputCols)

    def setInputCols(self, value):
        """Set the list of columns to impute."""
        return self._set(inputCols=value)

    def _transform(self, df):
        input_cols = list(self.getOrDefault(self.inputCols) or [])
        if not input_cols:
            # Nothing to impute (and df.agg() needs at least one expression)
            return df

        # All global means in ONE aggregation job, instead of one collect() per column
        # (each of which re-ran the whole lineage). Imputing one column never changes
        # another column's values, so these are the same means a column-by-column
        # computation would see.
        global_row = df.agg(*[F.mean(F.col(c)) for c in input_cols]).first()
        global_means = dict(zip(input_cols, global_row))

        customer_window = Window.partitionBy("Customer_ID")
        for c in input_cols:
            customer_mean = F.mean(F.col(c)).over(customer_window)
            # Own value if present, else the customer's mean, else the global mean
            df = df.withColumn(c, F.coalesce(F.col(c), customer_mean, F.lit(global_means[c])))

        return df


In [121]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, Param, Params
import pyspark.sql.functions as F

class EnsureIntegerType(Transformer, HasInputCols, Params, DefaultParamsReadable, DefaultParamsWritable):
    """Cast every column listed in ``inputCols`` to Spark ``int``.

    Used after mean imputation, which can leave fractional values in columns
    that hold whole numbers. NOTE(review): Spark's cast to int drops the
    fractional part rather than rounding — confirm that is intended.
    """

    def __init__(self, inputCols=None):
        super(EnsureIntegerType, self).__init__()
        # Param created per instance and configured through its default
        self.inputCols = Param(self, "inputCols", "")
        self._setDefault(inputCols=inputCols)

    def setInputCols(self, value):
        """Set the list of columns to cast to int."""
        return self._set(inputCols=value)

    def _transform(self, df):
        result = df
        for column_name in self.getOrDefault(self.inputCols):
            result = result.withColumn(column_name, F.col(column_name).cast('int'))
        return result


In [122]:
from pyspark.ml.feature import Imputer

# Filled with the customer's mean (global mean as fallback) by ImputerWithCustomerData
numerical_to_fill = ["Age", "Annual_Income", "Monthly_Inhand_Salary", "Num_Bank_Accounts",
                     "Delay_from_due_date", "Num_of_Delayed_Payment", "Changed_Credit_Limit",
                     "Outstanding_Debt", "Credit_History_Age", "Amount_invested_monthly",
                     "Monthly_Balance", "Num_Credit_Card", "Num_Credit_Inquiries", "Interest_Rate",
                     "Credit_Utilization_Ratio", "Total_EMI_per_month"]

# Filled with 0 by ZeroFillImputer
numerical_to_zero_fill = ["Num_of_Loan"]

# Whole-number columns cast back to int once imputation is done
ensure_int_cols = ['Age', 'Num_Bank_Accounts', 'Delay_from_due_date', 'Num_of_Delayed_Payment',
                   'Credit_History_Age', 'Num_Credit_Card', 'Num_Credit_Inquiries']

mean_imputer = ImputerWithCustomerData(inputCols=numerical_to_fill)
zero_imputer = ZeroFillImputer(inputCols=numerical_to_zero_fill)
ensure_integer_transformer = EnsureIntegerType(inputCols=ensure_int_cols)

# Impute first, then cast: the means used by the imputer are fractional
pipeline_steps.extend([mean_imputer, zero_imputer, ensure_integer_transformer])

print("Pipeline Steps:")
pipeline_steps

Pipeline Steps:


[OneHotLoanType_240f364623ce,
 SQLTransformer_bf1a65c74c3b,
 FillMissingCategorical_db19d6bea5b1,
 StringIndexer_23a5662e3390,
 OneHotEncoder_42ba01d3697f,
 FillMissingCategorical_9fda294c4025,
 StringIndexer_d02b3207599c,
 OneHotEncoder_87589e059d9a,
 FillMissingCategorical_d9ac17981d2c,
 StringIndexer_b2a70d5848d7,
 OneHotEncoder_56378ce551b3,
 FillMissingCategorical_1fdc1d84f679,
 StringIndexer_ad3380a95f3f,
 OneHotEncoder_42a543ae10b2,
 ImputerWithCustomerData_2ae4d0da2c4f,
 ZeroFillImputer_d60b0d32397f,
 EnsureIntegerType_eb0547c5b4c7]

## Credit Score - Target (Pipeline)

In [123]:
# Target label: Poor -> 0, Standard -> 1, Good -> 2, anything else -> -1
sql_statement = """SELECT *,
                    CASE Credit_Score
                        WHEN 'Poor' THEN 0
                        WHEN 'Standard' THEN 1
                        WHEN 'Good' THEN 2
                        ELSE -1
                    END as Credit_score_num FROM __THIS__;
                    """

credit_score_to_number = SQLTransformer(statement=sql_statement)
pipeline_steps.append(credit_score_to_number)

print("Pipeline Steps:")
pipeline_steps

Pipeline Steps:


[OneHotLoanType_240f364623ce,
 SQLTransformer_bf1a65c74c3b,
 FillMissingCategorical_db19d6bea5b1,
 StringIndexer_23a5662e3390,
 OneHotEncoder_42ba01d3697f,
 FillMissingCategorical_9fda294c4025,
 StringIndexer_d02b3207599c,
 OneHotEncoder_87589e059d9a,
 FillMissingCategorical_d9ac17981d2c,
 StringIndexer_b2a70d5848d7,
 OneHotEncoder_56378ce551b3,
 FillMissingCategorical_1fdc1d84f679,
 StringIndexer_ad3380a95f3f,
 OneHotEncoder_42a543ae10b2,
 ImputerWithCustomerData_2ae4d0da2c4f,
 ZeroFillImputer_d60b0d32397f,
 EnsureIntegerType_eb0547c5b4c7,
 SQLTransformer_775f00266c6e]

In [124]:
df.sort("Customer_ID").show(truncate=False)  # data as it enters the pipeline

+-----------+--------+---+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------------------------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------------------+---------------+------------+
|Customer_ID|Month   |Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Type_of_Loan                                     |Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour               |Monthly_Balance|Credit_Score|
+-----------+--------+---+----------+-------------+-----------

## Assembler

In [125]:
# Features used to predict the target var (input of the VectorAssembler).
# The order of this list defines the layout of the "features" vector.
features = ["Month_num",
            "Age",
            "Annual_Income",
            "Monthly_Inhand_Salary",
            "Occupation_oneHot",
            "Num_Bank_Accounts",
            "Num_Credit_Card",
            "Interest_Rate",
            "Delay_from_due_date",
            "Num_of_Delayed_Payment",
            "Changed_Credit_Limit",
            "Num_Credit_Inquiries",
            "Outstanding_Debt",
            "Credit_Mix_oneHot",
            "Credit_Utilization_Ratio",
            "Credit_History_Age",
            "Total_EMI_per_month",
            "Amount_invested_monthly",
            "Monthly_Balance",
            "Num_of_Loan",
            "debt_consolidation_loan",
            "personal_loan",
            "payday_loan",
            "mortgage_loan",
            "credit_builder_loan",
            "auto_loan",
            "home_equity_loan",
            "student_loan",
            "not_specified",
            "Payment_Behaviour_oneHot",
            "Payment_of_Min_Amount_oneHot"]

# Columns that we will keep after running the pipeline: every feature, the
# target var, and the assembled / scaled feature vectors. Derived from
# `features` so the two lists cannot drift apart.
cols_to_keep = features + ["Credit_score_num", "features", "scaledFeatures"]

In [126]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Pack all model input columns (in `features` order) into one vector column.
assembler = VectorAssembler(inputCols=features,
                            outputCol="features")

# Scale to unit standard deviation only. withMean=False avoids centering,
# which would turn the sparse one-hot vectors into dense ones.
scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Keep this cell idempotent: drop any assembler/scaler appended by an earlier
# run of this cell before adding the fresh ones. Otherwise re-running it would
# duplicate the stages, and the second VectorAssembler would be rejected
# because its output column "features" already exists.
# NOTE(review): assumes no other VectorAssembler/StandardScaler stage is
# meant to be in pipeline_steps (none appears in the printed stage list).
pipeline_steps = [step for step in pipeline_steps
                  if not isinstance(step, (VectorAssembler, StandardScaler))]
pipeline_steps += [assembler, scaler]

# Run Pipeline

This section only tests the pipeline: the transformed DataFrame is not saved (only the fitted pipeline model is written to disk).

In [127]:
# Show the ordered list of stages that will make up the Pipeline (last expression -> cell output)
pipeline_steps

[OneHotLoanType_240f364623ce,
 SQLTransformer_bf1a65c74c3b,
 FillMissingCategorical_db19d6bea5b1,
 StringIndexer_23a5662e3390,
 OneHotEncoder_42ba01d3697f,
 FillMissingCategorical_9fda294c4025,
 StringIndexer_d02b3207599c,
 OneHotEncoder_87589e059d9a,
 FillMissingCategorical_d9ac17981d2c,
 StringIndexer_b2a70d5848d7,
 OneHotEncoder_56378ce551b3,
 FillMissingCategorical_1fdc1d84f679,
 StringIndexer_ad3380a95f3f,
 OneHotEncoder_42a543ae10b2,
 ImputerWithCustomerData_2ae4d0da2c4f,
 ZeroFillImputer_d60b0d32397f,
 EnsureIntegerType_eb0547c5b4c7,
 SQLTransformer_775f00266c6e,
 VectorAssembler_34a0d566fa36,
 StandardScaler_2df228480916]

## Build, fit and apply the pipeline

In [128]:
# Chain every preprocessing, assembling and scaling stage into one estimator.
pipeline = Pipeline().setStages(pipeline_steps)

In [129]:
# Peek at a single row of the input DataFrame.
df.show(n=1)

+-----------+-----+---+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+--------------------+---------------+------------+
|Customer_ID|Month|Age|Occupation|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|Monthly_Balance|Credit_Score|
+-----------+-----+---+----------+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------

In [130]:
# Print the schema of the DataFrame that pipeline.fit() is called on below
df.printSchema()

root
 |-- Customer_ID: long (nullable = true)
 |-- Month: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Annual_Income: float (nullable = true)
 |-- Monthly_Inhand_Salary: float (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: float (nullable = true)
 |-- Num_of_Loan: integer (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: float (nullable = true)
 |-- Changed_Credit_Limit: double (nullable = true)
 |-- Num_Credit_Inquiries: integer (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: float (nullable = true)
 |-- Credit_Utilization_Ratio: float (nullable = true)
 |-- Credit_History_Age: integer (nullable = true)
 |-- Payment_of_Min_Amount: string (nullable = true)
 |-- Total_EMI_per_month: float (nullable = true)
 |-- 

In [131]:
print("Fitting pipeline...")
# Fit every stage on df, producing a PipelineModel.
pipe_model = pipeline.fit(df)
# Persist the fitted model. overwrite() lets the notebook be re-run: a plain
# save() raises if the model directory already exists from a previous run.
pipe_model.write().overwrite().save(pipeline_model_path)

Fiting pipeline...


In [132]:
# Apply the fitted pipeline to the input DataFrame
print("Transforming data...")
transformed_df = pipe_model.transform(dataset=df)

Transforming data...


In [133]:
# Keep only the modelling columns, in cols_to_keep order. A single select()
# both discards every other column and fixes the order, so no per-column
# drop loop is needed. (Avoid loop variables named `col` in this notebook:
# that would shadow pyspark.sql.functions.col from the wildcard import.)
transformed_df = transformed_df.select(*cols_to_keep)

In [134]:
# Display the first 20 rows (the show() default) without truncating cell values.
transformed_df.show(n=20, truncate=False)

+---------+---+--------------+---------------------+-----------------+-----------------+---------------+-------------+-------------------+----------------------+--------------------+--------------------+-----------------+------------------------+------------------+-------------------+-----------------------+------------------+-----------------+-----------+-----------------------+-------------+-----------+-------------+-------------------+---------+----------------+------------+-------------+------------------------+----------------------------+----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------

In [135]:
# Inspect the assembled (unscaled) feature vector next to the numeric label column.
transformed_df.select(["features", "Credit_score_num"]).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|features                                                                                                                                                                                                                                                                                  |Credit_score_num|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|(57,[0,1,2,3,5,20,21,22,23,24,25,26,27,29,32,33,34,35,36,37,42,43,46,48,55],[1.0,43.0,112287.

In [136]:
# Shut down the Spark session; any later cell that uses `spark` requires re-running the Spark setup cell
spark.stop()