# Chapter 3 - Feature Engineering

## Importing Needed Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, when, lit, mean, count, udf
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.decomposition import PCA
from sklearn.metrics import roc_curve, auc


import tensorflow as tf
from tensorflowonspark import TFCluster, TFNode

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import warnings
warnings.filterwarnings('ignore')

import os
import shutil






## Spark Initialization and MySQL Server connection

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Feature Engineering") \
    .config("spark.some.config.option", "some-value") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,mysql:mysql-connector-java:8.0.17") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# MySQL connection
jdbc_url = "jdbc:mysql://localhost:3306/HomeLoans?serverTimezone=UTC"
connection_properties = {
    "user": "root",
    "password": "mypass",
    "driver": "com.mysql.cj.jdbc.Driver"
}


## Create Dataframes for all the Tables

In [3]:
df_train = spark.read.jdbc(url=jdbc_url, table="applicationtrain", properties=connection_properties)
df_test = spark.read.jdbc(url=jdbc_url, table="applicationtest", properties=connection_properties)
df_bureau = spark.read.jdbc(url=jdbc_url, table="bureau", properties=connection_properties)
df_bureau_balance = spark.read.jdbc(url=jdbc_url, table="bureau_balance", properties=connection_properties)
df_credit_card_balance = spark.read.jdbc(url=jdbc_url, table="credit_card_balance", properties=connection_properties)
df_installments_payments = spark.read.jdbc(url=jdbc_url, table="installments_payments", properties=connection_properties)
df_pos_cash_balance = spark.read.jdbc(url=jdbc_url, table="pos_cash_balance", properties=connection_properties)
df_previous_application = spark.read.jdbc(url=jdbc_url, table="previous_application", properties=connection_properties)
df_column_desc= spark.read.jdbc(url=jdbc_url, table="homecredit_columns_description", properties=connection_properties)


## Combine Train and Test Datasets to perform the changes in both 

But first, a new column named TARGET is created in the TEST dataset to match the TARGET in the train dataset.

In [4]:
df_test = df_test.withColumn("TARGET", lit(None).cast("double"))
df_combined = df_train.unionByName(df_test)
df_combined.show()

# total df 356255
# TEST = 48744

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+-------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+------------

## Calculate and Drop Columns Which Have 50% of Values as Nan

In [5]:
total_rows = df_combined.count()

# Calculating the null values for each column and then Convert it to Dictionary
missing_values_df = df_combined.select([(total_rows - count(c)).alias(c) for c in df_combined.columns])
missing_values_dict = missing_values_df.first().asDict()
missing_percentage = {k: v / total_rows for k, v in missing_values_dict.items()}

# Filter out columns with more than 50% missing values, but keep TARGET in any case
columns_to_keep = [k for k, v in missing_percentage.items() if v <= 0.50 or k == 'TARGET']
columns_to_drop = [k for k, v in missing_percentage.items() if v > 0.50 and k != 'TARGET']

df_combined_Only_50 = df_combined.select(columns_to_keep)
print(f"Columns dropped due to missing values > 50%: {columns_to_drop}")
df_combined_Only_50.show()

Columns dropped due to missing values > 50%: ['OWN_CAR_AGE', 'EXT_SOURCE_1', 'APARTMENTS_AVG', 'BASEMENTAREA_AVG', 'YEARS_BUILD_AVG', 'COMMONAREA_AVG', 'ELEVATORS_AVG', 'ENTRANCES_AVG', 'FLOORSMIN_AVG', 'LANDAREA_AVG', 'LIVINGAPARTMENTS_AVG', 'NONLIVINGAPARTMENTS_AVG', 'NONLIVINGAREA_AVG', 'APARTMENTS_MODE', 'BASEMENTAREA_MODE', 'YEARS_BUILD_MODE', 'COMMONAREA_MODE', 'ELEVATORS_MODE', 'ENTRANCES_MODE', 'FLOORSMIN_MODE', 'LANDAREA_MODE', 'LIVINGAPARTMENTS_MODE', 'NONLIVINGAPARTMENTS_MODE', 'NONLIVINGAREA_MODE', 'APARTMENTS_MEDI', 'BASEMENTAREA_MEDI', 'YEARS_BUILD_MEDI', 'COMMONAREA_MEDI', 'ELEVATORS_MEDI', 'ENTRANCES_MEDI', 'FLOORSMIN_MEDI', 'LANDAREA_MEDI', 'LIVINGAPARTMENTS_MEDI', 'NONLIVINGAPARTMENTS_MEDI', 'NONLIVINGAREA_MEDI', 'FONDKAPREMONT_MODE', 'WALLSMATERIAL_MODE']
+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+---------

## Drop all Building Columns and Occupation_TYPE

In [6]:
building_Columns = [
    "APARTMENTS", "YEARS_BEGINEXPLUATATION", "ELEVATORS", "ENTRANCES", "FLOORSMAX", 
    "LIVINGAREA", "NONLIVINGAREA", "HOUSETYPE", "TOTALAREA", "WALLSMATERIAL", "EMERGENCYSTATE", "LIVE_CITY_NOT_WORK_CITY", "REG_CITY_NOT_WORK_CITY", 
    "REG_CITY_NOT_LIVE_CITY", "LIVE_REGION_NOT_WORK_REGION", "REG_REGION_NOT_WORK_REGION", "REG_REGION_NOT_LIVE_REGION", "OBS_30_CNT_SOCIAL_CIRCLE",
    "DEF_30_CNT_SOCIAL_CIRCLE", "OBS_60_CNT_SOCIAL_CIRCLE", "DEF_60_CNT_SOCIAL_CIRCLE", "OCCUPATION_TYPE"
    
]

all_columns = df_combined_Only_50.columns
columns_to_keep = [col for col in all_columns if not any(keyword in col for keyword in building_Columns) or col == 'TARGET']
df_combined_without_Building = df_combined_Only_50.select(columns_to_keep)

## Convert Days To Years Columns

In [7]:
df_combined_without_Building = df_combined_without_Building.withColumn("AGE_YEARS", df_combined_without_Building["DAYS_BIRTH"] / -365)
df_combined_without_Building = df_combined_without_Building.withColumn("YEARS_EMPLOYED", df_combined_without_Building["DAYS_EMPLOYED"] / -365)
df_combined_without_Building = df_combined_without_Building.withColumn("YEARS_REGISTRATION", df_combined_without_Building["DAYS_REGISTRATION"] / -365)

## Fill Missing Values for Numerical and Categorical Columns
First EXT_SOURCE_3 is casted as Double since <br>
They won't fill Mean values as they have different datatype from EXT_SOURCE_2 for some reason.

In [8]:
df_combined_without_Building = df_combined_without_Building \
    .withColumn("EXT_SOURCE_3", F.col("EXT_SOURCE_3").cast(DoubleType()))

# Find the types for all the columns
numerical_columns = [field.name for field in df_combined_without_Building.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType)) and field.name != 'TARGET']
categorical_columns = [field.name for field in df_combined_without_Building.schema.fields if isinstance(field.dataType, StringType) and field.name != 'TARGET']

print("Numerical columns:", numerical_columns)
print("Categorical columns:", categorical_columns)

# Function for Numerical Columns Using Mean
def fill_missing_with_mean(df, columns):
    for column in columns:
        mean_value = df.select(F.mean(F.col(column)).alias(column)).first()[column]
        if mean_value is not None:
            df = df.fillna({column: mean_value})
    return df

# Function for Categorical Columns using Mode
def fill_missing_with_mode(df, columns):
    for column in columns:
        mode_value = df.groupBy(column).count().orderBy('count', ascending=False).first()[0]
        if mode_value is not None:
            df = df.fillna({column: mode_value})
    return df

df_filled_values = fill_missing_with_mean(df_combined_without_Building, numerical_columns)
df_filled_values = fill_missing_with_mode(df_filled_values, categorical_columns)
df_filled_values.show()

Numerical columns: ['SK_ID_CURR', 'CNT_CHILDREN', 'AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'REGION_POPULATION_RELATIVE', 'DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_REGISTRATION', 'DAYS_ID_PUBLISH', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 'FLAG_WORK_PHONE', 'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 'CNT_FAM_MEMBERS', 'REGION_RATING_CLIENT', 'REGION_RATING_CLIENT_W_CITY', 'HOUR_APPR_PROCESS_START', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'DAYS_LAST_PHONE_CHANGE', 'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 'FLAG_DOCUMENT_5', 'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9', 'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13', 'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17', 'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21', 'AGE_YEARS', 'YEARS_EMPLOYED', 'YEARS_REGISTRATION']
Categorical columns: ['NAME_CONTRACT_TYPE', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FL

In [9]:
df_filled_values.count()

356255

## Check If All Null Values are Replaced

First Run Identified that Some columns still remain with nulls <br>
OCCUPATION_TYPE Remains with Null Values

In [10]:
null_counts = df_filled_values.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_filled_values.columns])
null_counts.show()

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+-------------------+------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+----------+--------------+---------------+----------------+----------+----------+---------------+--------------------+---------------------------+--------------------------+-----------------------+-----------------+------------+------------+----------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+--------------------------+-------------------------+--------------------------+

In [11]:
df_filled_values.count()

356255

# Data Aggregation and Dataframe Enhancement

## Outliers Handling

### Dropping all the Rows Where Children count are more than 8

In [12]:
#df_filled_values = df_filled_values.filter(col("CNT_CHILDREN") <= 8) 

## Filter Out Outliers and Defects

- Filter out people aged more than 90 years and less than 18
- Filter out people with years of employement over 90 and less than 0
- Filter out Loans with over 2mil credit amount

In [13]:
#df_filled_values = df_filled_values.filter((col("AGE_YEARS") <= 90) & (col("AGE_YEARS") > 18))
#df_filled_values = df_filled_values.filter((col("YEARS_EMPLOYED") <= 90) & (col("YEARS_EMPLOYED") >= 0))
#df_filled_values = df_filled_values.filter(col('AMT_CREDIT') <= 20000000)

## Bureau Aggregations

In [14]:
def aggregate_bureau_data(bureau_df, join_col='SK_ID_CURR'):
    bureau_agg = bureau_df.groupBy(join_col).agg(
        F.mean('AMT_CREDIT_SUM').alias('mean_credit_sum_bureau'),
        F.sum('AMT_CREDIT_SUM').alias('sum_credit_sum_bureau'),
        F.max('DAYS_CREDIT').alias('max_days_credit_bureau'),
        F.mean('DAYS_CREDIT').alias('mean_days_credit_bureau'),
        F.min('DAYS_CREDIT').alias('min_days_credit_bureau'),
        F.count('*').alias('bureau_count')
    )
    return bureau_agg

## Bureau_Balance Aggregations

In [15]:
def aggregate_bureau_balance_data(bureau_balance_df, join_col='SK_ID_BUREAU'):
    bureau_balance_agg = bureau_balance_df.groupBy(join_col).agg(
        F.mean('MONTHS_BALANCE').alias('mean_months_balance_bureau'),
        F.sum('MONTHS_BALANCE').alias('sum_months_balance_bureau'),
        F.max('MONTHS_BALANCE').alias('max_months_balance_bureau'),
        F.min('MONTHS_BALANCE').alias('min_months_balance_bureau'),
        F.count('*').alias('bureau_balance_count')
    )
    return bureau_balance_agg

## Credit Card Balance Aggregations

In [16]:
def aggregate_credit_card_balance_data(credit_card_balance_df, join_col='SK_ID_CURR'):
    credit_card_agg = credit_card_balance_df.groupBy(join_col).agg(
        F.mean('AMT_BALANCE').alias('mean_amt_balance_credit_card'),
        F.sum('AMT_BALANCE').alias('sum_amt_balance_credit_card'),
        F.max('AMT_BALANCE').alias('max_amt_balance_credit_card'),
        F.min('AMT_BALANCE').alias('min_amt_balance_credit_card'),
        F.count('*').alias('credit_card_count')
    )
    return credit_card_agg

## Installment Payments Aggregations

In [17]:
def aggregate_installments_payments_data(installments_payments_df, join_col='SK_ID_CURR'):
    installments_agg = installments_payments_df.groupBy(join_col).agg(
        F.mean('AMT_PAYMENT').alias('mean_amt_payment_installments'),
        F.sum('AMT_PAYMENT').alias('sum_amt_payment_installments'),
        F.max('AMT_PAYMENT').alias('max_amt_payment_installments'),
        F.min('AMT_PAYMENT').alias('min_amt_payment_installments'),
        F.count('*').alias('installments_count')
    )
    return installments_agg

## POS Cash Balance Aggregations

In [18]:
def aggregate_pos_cash_balance_data(pos_cash_balance_df, join_col='SK_ID_CURR'):
    pos_cash_agg = pos_cash_balance_df.groupBy(join_col).agg(
        F.mean('MONTHS_BALANCE').alias('mean_months_balance_pos_cash'),
        F.sum('MONTHS_BALANCE').alias('sum_months_balance_pos_cash'),
        F.max('MONTHS_BALANCE').alias('max_months_balance_pos_cash'),
        F.min('MONTHS_BALANCE').alias('min_months_balance_pos_cash'),
        F.count('*').alias('pos_cash_count')
    )
    return pos_cash_agg

## Previous Application Aggregations

In [19]:
def aggregate_previous_application_data(previous_application_df, join_col='SK_ID_CURR'):
    previous_application_agg = previous_application_df.groupBy(join_col).agg(
        F.mean('AMT_APPLICATION').alias('mean_amt_application_previous'),
        F.sum('AMT_APPLICATION').alias('sum_amt_application_previous'),
        F.max('AMT_APPLICATION').alias('max_amt_application_previous'),
        F.min('AMT_APPLICATION').alias('min_amt_application_previous'),
        F.count('*').alias('previous_application_count')
    )
    return previous_application_agg

# Handling New Null Values and Aggregations

In [20]:
def process_and_aggregate_data(df_filled_values, df_bureau, df_bureau_balance, df_credit_card_balance, df_installments_payments, df_pos_cash_balance, df_previous_application):
    # Aggregate Bureau Balance Data and Join with Bureau Data
    bureau_balance_agg = aggregate_bureau_balance_data(df_bureau_balance)
    df_bureau = df_bureau.join(bureau_balance_agg, on='SK_ID_BUREAU', how='left')
    
    # Aggregate Bureau Data and Join with Main DataFrame / Drop existing columns before joining to avoid ambiguity
    bureau_agg = aggregate_bureau_data(df_bureau)
    print("Columns to drop before joining bureau_agg:", bureau_agg.columns[1:])
    df_filled_values = df_filled_values.drop(*bureau_agg.columns[1:])
    df_filled_values = df_filled_values.join(bureau_agg, on='SK_ID_CURR', how='left')

    # Aggregate and Join Credit Card Balance Data / Drop existing columns before joining to avoid ambiguity
    credit_card_agg = aggregate_credit_card_balance_data(df_credit_card_balance)
    print("Columns to drop before joining credit_card_agg:", credit_card_agg.columns[1:])
    df_filled_values = df_filled_values.drop(*credit_card_agg.columns[1:])
    df_filled_values = df_filled_values.join(credit_card_agg, on='SK_ID_CURR', how='left')

    # Aggregate and Join Installments Payments Data / Drop existing columns before joining to avoid ambiguity
    installments_agg = aggregate_installments_payments_data(df_installments_payments)
    print("Columns to drop before joining installments_agg:", installments_agg.columns[1:])
    df_filled_values = df_filled_values.drop(*installments_agg.columns[1:])
    df_filled_values = df_filled_values.join(installments_agg, on='SK_ID_CURR', how='left')

    # Aggregate and Join POS Cash Balance Data / Drop existing columns before joining to avoid ambiguity
    pos_cash_agg = aggregate_pos_cash_balance_data(df_pos_cash_balance)
    print("Columns to drop before joining pos_cash_agg:", pos_cash_agg.columns[1:])
    df_filled_values = df_filled_values.drop(*pos_cash_agg.columns[1:])
    df_filled_values = df_filled_values.join(pos_cash_agg, on='SK_ID_CURR', how='left')

    # Aggregate and Join Previous Application Data / Drop existing columns before joining to avoid ambiguity
    previous_application_agg = aggregate_previous_application_data(df_previous_application)
    print("Columns to drop before joining previous_application_agg:", previous_application_agg.columns[1:])
    df_filled_values = df_filled_values.drop(*previous_application_agg.columns[1:])
    df_filled_values = df_filled_values.join(previous_application_agg, on='SK_ID_CURR', how='left')

    return df_filled_values


df_filled_values = process_and_aggregate_data(df_filled_values, df_bureau, df_bureau_balance, df_credit_card_balance, df_installments_payments, df_pos_cash_balance, df_previous_application)

# Fill missing values in the new aggregated columns with mtheir mean values
new_agg_columns = [col for col in df_filled_values.columns if col not in df_combined.columns]
df_filled_values = fill_missing_with_mean(df_filled_values, new_agg_columns)

Columns to drop before joining bureau_agg: ['mean_credit_sum_bureau', 'sum_credit_sum_bureau', 'max_days_credit_bureau', 'mean_days_credit_bureau', 'min_days_credit_bureau', 'bureau_count']
Columns to drop before joining credit_card_agg: ['mean_amt_balance_credit_card', 'sum_amt_balance_credit_card', 'max_amt_balance_credit_card', 'min_amt_balance_credit_card', 'credit_card_count']
Columns to drop before joining installments_agg: ['mean_amt_payment_installments', 'sum_amt_payment_installments', 'max_amt_payment_installments', 'min_amt_payment_installments', 'installments_count']
Columns to drop before joining pos_cash_agg: ['mean_months_balance_pos_cash', 'sum_months_balance_pos_cash', 'max_months_balance_pos_cash', 'min_months_balance_pos_cash', 'pos_cash_count']
Columns to drop before joining previous_application_agg: ['mean_amt_application_previous', 'sum_amt_application_previous', 'max_amt_application_previous', 'min_amt_application_previous', 'previous_application_count']


# Chapter 4 - Machine Learning/Deep Learning Implementations

## Splitting the data into Test and Train 
The dataset will be splitted into test and train regarding their TARGET feature. <br>
If the TARGET is NULL then the Row will be on Test Data. <br>
If the TARGET is Not Null then the Row will be on Train Data.

In [21]:
final_train_df = df_filled_values.filter(df_filled_values.TARGET.isNotNull())
final_test_df = df_filled_values.filter(df_filled_values.TARGET.isNull())

final_train_df.show()

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+----------+--------------+---------------+----------------+----------+----------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------+-------------------+-------------------+----------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+--------------------------+-------------------------+---

## Encoding Categorical Variables

In [22]:
categorical_columns = ['NAME_CONTRACT_TYPE', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 
                       'NAME_TYPE_SUITE', 'NAME_INCOME_TYPE', 'NAME_EDUCATION_TYPE', 
                       'NAME_FAMILY_STATUS', 'NAME_HOUSING_TYPE', 'WEEKDAY_APPR_PROCESS_START', 'ORGANIZATION_TYPE']

# StringIndexer and OneHotEncoder for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + '_index', handleInvalid='skip') for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + '_index', outputCol=col + '_ohe') for col in categorical_columns]

# Create a pipeline for encoding
pipeline = Pipeline(stages=indexers + encoders)

# Fit and transform the training data
train_df = pipeline.fit(final_train_df).transform(final_train_df)
# Transform the test data using the same fitted pipeline
test_df = pipeline.fit(final_train_df).transform(final_test_df)

# Drop the original and indexed categorical columns
cols_to_drop = categorical_columns + [col + '_index' for col in categorical_columns]
train_df = train_df.drop(*cols_to_drop)
test_df = test_df.drop(*cols_to_drop)

# Cast specific columns to double
columns_to_cast = ['AMT_REQ_CREDIT_BUREAU_HOUR', 'AMT_REQ_CREDIT_BUREAU_DAY', 'AMT_REQ_CREDIT_BUREAU_WEEK',
                   'AMT_REQ_CREDIT_BUREAU_MON', 'AMT_REQ_CREDIT_BUREAU_QRT', 'AMT_REQ_CREDIT_BUREAU_YEAR']

for col_name in columns_to_cast:
    train_df = train_df.withColumn(col_name, col(col_name).cast("double"))
    test_df = test_df.withColumn(col_name, col(col_name).cast("double"))

print("Encoding complete")

Encoding complete


## Assemble Feature Vector

In [23]:
ohe_columns = [col + '_ohe' for col in categorical_columns]
numerical_columns = [col for col in train_df.columns if col not in ohe_columns + ['SK_ID_CURR', 'TARGET']]

print(f"Numerical columns: {numerical_columns}")
print(f"One-hot encoded columns: {ohe_columns}")

# Remove existing 'features' and 'scaled_features' columns if they exist
for col in ['features', 'scaled_features']:
    if col in train_df.columns:
        train_df = train_df.drop(col)
    if col in test_df.columns:
        test_df = test_df.drop(col)

print("assembling feature vector")
assembler = VectorAssembler(inputCols=numerical_columns + ohe_columns, outputCol="features")
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)
print("Feature vector assembly complete.")

Numerical columns: ['CNT_CHILDREN', 'AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'REGION_POPULATION_RELATIVE', 'DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_REGISTRATION', 'DAYS_ID_PUBLISH', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 'FLAG_WORK_PHONE', 'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 'CNT_FAM_MEMBERS', 'REGION_RATING_CLIENT', 'REGION_RATING_CLIENT_W_CITY', 'HOUR_APPR_PROCESS_START', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'DAYS_LAST_PHONE_CHANGE', 'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 'FLAG_DOCUMENT_5', 'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9', 'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13', 'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17', 'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21', 'AMT_REQ_CREDIT_BUREAU_HOUR', 'AMT_REQ_CREDIT_BUREAU_DAY', 'AMT_REQ_CREDIT_BUREAU_WEEK', 'AMT_REQ_CREDIT_BUREAU_MON', 'AMT_REQ_CREDIT_BUREAU_QRT'

## Feature Scaling

In [24]:
print("Scaling The features...")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)
test_df = scaler_model.transform(test_df)
print("Scaling complete.")

Scaling The features...
Scaling complete.


## Building The Neural Network

In [25]:
# Neural Network Structure

layers = [
    173,           # Number of input features -> Check from the above Statement
    64,            # Hidden layer size
    32,            # Hidden layer size
    2              # Number of classes
]

print(f"Neural network layers: {layers}")

print ("Initialization of  Multilayer Perceptron Classifier")
mlp = MultilayerPerceptronClassifier(
    featuresCol='scaled_features',
    labelCol='TARGET',
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=1234
)


Neural network layers: [173, 64, 32, 2]
Initialization of  Multilayer Perceptron Classifier


## Train and Evaluate the Model on Training Set

Check for OverFitting/UnderFitting

In [26]:
print("Training the model...")
mlp_model = mlp.fit(train_df)
print("Model training complete.")

print("Making predictions on the training set and checking for Overfitting/Underfitting...")
train_predictions = mlp_model.transform(train_df)

print("Evaluating the model...")
# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='TARGET', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
auc_train = evaluator.evaluate(train_predictions)
print(f'Training AUC: {auc_train}')

Training the model...
Model training complete.
Making predictions on the training set and checking for Overfitting/Underfitting...
Evaluating the model...
Training AUC: 0.7372328302746267


## Make Predictions on the Test Set

In [27]:
print("Making predictions on the test set...")
# Make predictions on the test set 
test_predictions = mlp_model.transform(test_df)

# Show the predictions
test_predictions.select('SK_ID_CURR', 'prediction', 'probability').show()
test_predictions.show()

Making predictions on the test set...
+----------+----------+--------------------+
|SK_ID_CURR|prediction|         probability|
+----------+----------+--------------------+
|    100005|       0.0|[0.73998112303605...|
|    100042|       0.0|[0.97258686943522...|
|    100074|       0.0|[0.94627385290222...|
|    100170|       0.0|[0.91956905106258...|
|    100446|       0.0|[0.87987495561849...|
|    100447|       0.0|[0.97306517114034...|
|    100517|       0.0|[0.96662251605572...|
|    100592|       0.0|[0.97427596428720...|
|    100618|       0.0|[0.95579932249216...|
|    100711|       0.0|[0.91985896543316...|
|    100740|       0.0|[0.88138574058945...|
|    100797|       0.0|[0.91552717174622...|
|    100826|       0.0|[0.95675314334162...|
|    100836|       0.0|[0.73108613393753...|
|    100872|       0.0|[0.97264552355821...|
|    101055|       0.0|[0.90396919349455...|
|    101090|       0.0|[0.97322784196327...|
|    101128|       0.0|[0.96524994740616...|
|    101244|     

## Prepare the Submittion File

In [28]:
def delete_directory(path):
    if os.path.exists(path):
        shutil.rmtree(path)

# Prepare the submission file
submission = test_predictions.select('SK_ID_CURR', 'prediction').orderBy('SK_ID_CURR', ascending=True)
submission = submission.withColumnRenamed("prediction", "TARGET")

# Write the DataFrame to a temporary directory and then Merge + Save to a new one named final
temp_path = './temp_prediction'
final_path = './final_prediction.csv'

delete_directory(temp_path)
delete_directory(final_path)

submission.write.csv(temp_path, header=True)
combined_df = spark.read.csv(temp_path, header=True, inferSchema=True)
combined_df = combined_df.repartition(1)
combined_df.write.csv(final_path, header=True)

print("Single CSV file saved.")

Single CSV file saved.
