## Set up Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CreditCardApprovalPrediction") \
    .getOrCreate()

24/12/07 15:59:45 WARN Utils: Your hostname, omar-HP-Laptop-15-dw2xxx resolves to a loopback address: 127.0.1.1; using 192.168.1.21 instead (on interface wlo1)
24/12/07 15:59:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/07 15:59:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/07 15:59:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Ingest the Data

In [2]:

# Define file paths
path_to_application = 'data/raw/application_record.csv'
path_to_credit = 'data/raw/credit_record.csv'

# Ingest data into Spark DataFrames
applicant_data_spark = spark.read.option("header", "true").csv(path_to_application, inferSchema=True)
credit_data_spark = spark.read.option("header", "true").csv(path_to_credit, inferSchema=True)

# Show first few rows for verification
applicant_data_sparsample_datak.show(5)
credit_data_spark.show(5)


                                                                                

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+
|5008804|          M|           Y|              Y|           0|        427500.0|             Working|    Higher education|      Civil marriage| Rented apartment|    -12005|        -4542|         1

## Data Cleaning and Transformation

In [3]:
from pyspark.sql.functions import col, when, count, lit, min, max, abs

# Drop duplicates
applicant_data_spark = applicant_data_spark.dropDuplicates(['ID'])

# Mode calculation for 'STATUS'
from pyspark.sql import functions as F

# Group by 'ID' and calculate mode (most frequent value) for 'STATUS'
status_mode_df = credit_data_spark.groupBy('ID').agg(
    F.expr("first(STATUS)").alias("mode_status")  # Simplified mode calculation in Spark
)

# Filter out 'X' values
status_mode_df = status_mode_df.filter(status_mode_df['mode_status'] != 'X')

# Merge applicant data with status mode
merged_spark_df = applicant_data_spark.join(status_mode_df, on="ID", how="inner")

# Apply the label encoding logic to 'mode_status'
merged_spark_df = merged_spark_df.withColumn(
    "label", when(merged_spark_df["mode_status"].isin("0", "1", "2", "3", "4", "5"), 0)
    .when(merged_spark_df["mode_status"] == "C", 1)
    .otherwise(lit(None))
)
merged_spark_df = merged_spark_df.join(credit_data_spark, on="ID", how="left")


# Show the DataFrame after transformations
merged_spark_df.show(5)


24/12/07 15:59:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/12/07 16:00:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+
|5008806|          M|           Y|              Y|           0|        112500.0|         Worki

## One-Hot Encoding and Ordinal Encoding

In [4]:
from pyspark.ml.feature import StringIndexer

# Ordinal Encoding for categorical columns
categorical_ordinal_columns = ['CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY']

indexers = [StringIndexer(inputCol=col, outputCol=col + "_encoded") for col in categorical_ordinal_columns]

# Apply StringIndexer
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(merged_spark_df).transform(merged_spark_df)

indexed_df.show(5)


                                                                                

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+-------------------+--------------------+-----------------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|CODE_GENDER_encoded|FLAG_OWN_CAR_encoded|FLAG_OWN_REALTY_encoded|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+-----

In [5]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# One-hot encoding for categorical columns
categorical_onehot_columns = ['NAME_INCOME_TYPE', 'NAME_EDUCATION_TYPE', 'NAME_FAMILY_STATUS', 'NAME_HOUSING_TYPE']

# First, use StringIndexer to convert to numeric values
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index") for col in categorical_onehot_columns]
indexer_pipeline = Pipeline(stages=indexers)
indexed_onehot_df = indexer_pipeline.fit(merged_spark_df).transform(merged_spark_df)

# Now apply OneHotEncoder
encoder = OneHotEncoder(inputCols=[col + "_index" for col in categorical_onehot_columns], 
                        outputCols=[col + "_onehot" for col in categorical_onehot_columns])

encoded_df = encoder.fit(indexed_onehot_df).transform(indexed_onehot_df)
encoded_df.show(5)




+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+----------------------+-------------------------+------------------------+-----------------------+-----------------------+--------------------------+-------------------------+------------------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|NAME_INCOME_TYPE_index|NAME_EDUCATION_TYPE_index|NAME_FAMILY_STATUS_index|NAME_HOUSING_TYPE_index|NAME_INCOME_TYPE_onehot|NAME_EDUCATION_TYPE_onehot|NAME_FAMILY_STATUS_onehot|NAME_HOUSING_TYPE_oneh

                                                                                

## Feature Engineering

In [6]:
# Custom feature engineering
merged_spark_df = merged_spark_df.withColumn("AGE", abs(merged_spark_df['DAYS_BIRTH']) / 365)
merged_spark_df = merged_spark_df.withColumn("YEARS_EMPLOYED", abs(merged_spark_df['DAYS_EMPLOYED']) / 365)

# Drop unnecessary columns
merged_spark_df = merged_spark_df.drop('DAYS_BIRTH', 'DAYS_EMPLOYED')

# Create new features (e.g., 'INCOME_PER_FAM_MEMBER')
merged_spark_df = merged_spark_df.withColumn("INCOME_PER_FAM_MEMBER", merged_spark_df['AMT_INCOME_TOTAL'] / merged_spark_df['CNT_FAM_MEMBERS'])


# Feature engineering for 'CREDIT_HISTORY_LENGTH' and 'RECENT_ACTIVITY'
credit_history_length = merged_spark_df.groupBy('ID').agg(
    (max('MONTHS_BALANCE') - min('MONTHS_BALANCE')).alias('CREDIT_HISTORY_LENGTH')
)
recent_activity_flag = merged_spark_df.groupBy('ID').agg(
    (F.expr("max(CASE WHEN MONTHS_BALANCE >= -4 THEN 1 ELSE 0 END)")).alias('RECENT_ACTIVITY')
)

# Join these features back into the main dataframe
merged_spark_df = merged_spark_df.join(credit_history_length, on="ID", how="left")
merged_spark_df = merged_spark_df.join(recent_activity_flag, on="ID", how="left")

merged_spark_df.show(5)


[Stage 111:>                                                        (0 + 1) / 1]

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+-----------------+-----------------+---------------------+---------------------+---------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|              AGE|   YEARS_EMPLOYED|INCOME_PER_FAM_MEMBER|CREDIT_HISTORY_LENGTH|RECENT_ACTIVITY|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+

                                                                                

## Data Preparation (Train/Test Split)

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Select features for modeling
selected_features = ['INCOME_PER_FAM_MEMBER', 'AMT_INCOME_TOTAL', 'YEARS_EMPLOYED', 'AGE', 'CREDIT_HISTORY_LENGTH', 'RECENT_ACTIVITY']

# Create a VectorAssembler to combine features into a single vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")

# Apply VectorAssembler to prepare features
feature_df = assembler.transform(merged_spark_df)

# Split data into training and test sets (70% train, 30% test)
train_data, test_data = feature_df.randomSplit([0.7, 0.3], seed=42)

# Standardize the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_data)
train_data_scaled = scaler_model.transform(train_data)
test_data_scaled = scaler_model.transform(test_data)
scaler_model.save("scaler")
train_data_scaled.show(5)


[Stage 161:>                                                        (0 + 1) / 1]

+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+-----------------+-----------------+---------------------+---------------------+---------------+--------------------+--------------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|              AGE|   YEARS_EMPLOYED|INCOME_PER_FAM_MEMBER|CREDIT_HISTORY_LENGTH|RECENT_ACTIVITY|            features|     scaled_features|
+-------+-----------+------------+---------------+------------+----------------+----------------+--------------------+------------------+-----------------+----------+----

                                                                                

### save processed data

In [10]:
# Define the paths where you want to save the processed data
train_data_path = "data/processed/train_data_scaled.parquet"
test_data_path = "data/processed/test_data_scaled.parquet"

# Save train_data_scaled as a Parquet file
train_data_scaled.write.parquet(train_data_path)
print(f"train_data_scaled saved to: {train_data_path}")

# Save test_data_scaled as a Parquet file
test_data_scaled.write.parquet(test_data_path)
print(f"test_data_scaled saved to: {test_data_path}")

                                                                                

train_data_scaled saved to: data/processed/train_data_scaled.parquet




test_data_scaled saved to: data/processed/test_data_scaled.parquet


                                                                                

## Model Evaluation

In [12]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sklearn.metrics import classification_report
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Train Logistic Regression Model
lr = LogisticRegression(featuresCol='scaled_features', labelCol='label')
lr_model = lr.fit(train_data_scaled)

# Predictions
predictions = lr_model.transform(test_data_scaled)

# Convert Spark DataFrame to Pandas DataFrame
predictions_pd = predictions.select("label", "prediction").toPandas()

# Classification Report
y_true = predictions_pd['label']
y_pred = predictions_pd['prediction']
print("Classification Report:")
print(classification_report(y_true, y_pred))

# AUC Score
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC Score: {roc_auc}")


24/12/07 10:09:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

Classification Report:
              precision    recall  f1-score   support

           0       0.70      0.62      0.66      2551
           1       0.83      0.87      0.85      5251

    accuracy                           0.79      7802
   macro avg       0.76      0.75      0.75      7802
weighted avg       0.78      0.79      0.78      7802



                                                                                

ROC AUC Score: 0.745055934166765


In [11]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define the Random Forest Model
rf = RandomForestClassifier(featuresCol='scaled_features', labelCol='label', numTrees=100, maxDepth=10)

# Fit the model to the training data
rf_model = rf.fit(train_data_scaled)

# Generate predictions on the test data
rf_predictions = rf_model.transform(test_data_scaled)

# Evaluate using BinaryClassificationEvaluator (AUC)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
rf_auc = evaluator.evaluate(rf_predictions)
print(f"Random Forest AUC Score: {rf_auc}")

# Convert predictions to Pandas DataFrame for classification report
rf_predictions_pd = rf_predictions.select("label", "prediction").toPandas()

from sklearn.metrics import classification_report
y_true_rf = rf_predictions_pd['label']
y_pred_rf = rf_predictions_pd['prediction']

print("Random Forest Classification Report:")
print(classification_report(y_true_rf, y_pred_rf))

24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:05:57 WARN RowBasedKeyValueBatch: Calling spill() on

Random Forest AUC Score: 0.646850914468174


24/12/07 16:08:59 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
24/12/07 16:09:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:09:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:09:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:09:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:09:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/07 16:09:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.75      0.32      0.45     34098
           1       0.86      0.98      0.92    150697

    accuracy                           0.85    184795
   macro avg       0.81      0.65      0.68    184795
weighted avg       0.84      0.85      0.83    184795



### Save the best model

In [12]:
model_path = "rf_model"  
rf_model.save(model_path)
print(f"RandomForest model saved to: {model_path}")

24/12/07 16:10:40 WARN TaskSetManager: Stage 353 contains a task of very large size (1459 KiB). The maximum recommended task size is 1000 KiB.
[Stage 353:>                                                        (0 + 4) / 4]

RandomForest model saved to: rf_model


                                                                                

## Test the import

model import

In [13]:
from pyspark.ml.classification import RandomForestClassificationModel

rf_model = RandomForestClassificationModel.load(model_path)
print("RandomForest model loaded successfully!")


[Stage 363:>                                                        (0 + 1) / 1]

RandomForest model loaded successfully!


                                                                                

data import

In [14]:
test_data_path = "data/processed/test_data_scaled.parquet"
test_data = spark.read.parquet(test_data_path)
test_data.show(5)

+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------+----------+---------------+----------+----------+---------------+---------------+-----------+-----+--------------+------+-----------------+----------------+---------------------+---------------------+---------------+--------------------+--------------------+
|     ID|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|FLAG_MOBIL|FLAG_WORK_PHONE|FLAG_PHONE|FLAG_EMAIL|OCCUPATION_TYPE|CNT_FAM_MEMBERS|mode_status|label|MONTHS_BALANCE|STATUS|              AGE|  YEARS_EMPLOYED|INCOME_PER_FAM_MEMBER|CREDIT_HISTORY_LENGTH|RECENT_ACTIVITY|            features|     scaled_features|
+-------+-----------+------------+---------------+------------+----------------+--------------------+--------------------+--------------------+-----------------

scaler import 

In [15]:
from pyspark.ml.feature import StandardScalerModel
scaler = StandardScalerModel.load("scaler")
print("scaler loaded successfully!")

scaler loaded successfully!


In [16]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf_predictions = rf_model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
rf_auc = evaluator.evaluate(rf_predictions)
print(f"Random Forest AUC Score: {rf_auc}")
rf_predictions_pd = rf_predictions.select("label", "prediction").toPandas()

from sklearn.metrics import classification_report
y_true_rf = rf_predictions_pd['label']
y_pred_rf = rf_predictions_pd['prediction']

print("Random Forest Classification Report:")
print(classification_report(y_true_rf, y_pred_rf))

24/12/07 16:14:22 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
                                                                                

Random Forest AUC Score: 0.646850914468174


24/12/07 16:14:27 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
                                                                                

Random Forest Classification Report:
              precision    recall  f1-score   support

           0       0.75      0.32      0.45     34098
           1       0.86      0.98      0.92    150697

    accuracy                           0.85    184795
   macro avg       0.81      0.65      0.68    184795
weighted avg       0.84      0.85      0.83    184795

