In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline



In [2]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Loan Default Prediction") \
    .getOrCreate()

In [4]:
# Load data



train_df = spark.read.csv("Training Data.csv", header=True, inferSchema=True)
train_df.show()

+---+-------+---+----------+--------------+---------------+-------------+--------------------+-------------------+--------------+---------------+-----------------+---------+
| Id| Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|          Profession|               CITY|         STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|
+---+-------+---+----------+--------------+---------------+-------------+--------------------+-------------------+--------------+---------------+-----------------+---------+
|  1|1303834| 23|         3|        single|         rented|           no| Mechanical_engineer|               Rewa|Madhya_Pradesh|              3|               13|        0|
|  2|7574516| 40|        10|        single|         rented|           no|  Software_Developer|           Parbhani|   Maharashtra|              9|               13|        0|
|  3|3991815| 66|         4|       married|         rented|           no|    Technical_writer|          Alappuzha|        Kerala| 

In [5]:
# EDA
train_df.describe().show()
train_df.printSchema()

+-------+-----------------+-----------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+------------------+
|summary|               Id|           Income|               Age|        Experience|Married/Single|House_Ownership|Car_Ownership|          Profession|       CITY|         STATE|   CURRENT_JOB_YRS| CURRENT_HOUSE_YRS|         Risk_Flag|
+-------+-----------------+-----------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+------------------+
|  count|           252000|           252000|            252000|            252000|        252000|         252000|       252000|              252000|     252000|        252000|            252000|            252000|            252000|
|   mean|         126000.5|4997116.665325397| 49.95407142857143|

In [10]:
train_df.dtypes

[('Income', 'int'),
 ('Age', 'int'),
 ('Experience', 'int'),
 ('Married/Single', 'string'),
 ('House_Ownership', 'string'),
 ('Car_Ownership', 'string'),
 ('Profession', 'string'),
 ('CITY', 'string'),
 ('STATE', 'string'),
 ('CURRENT_JOB_YRS', 'int'),
 ('CURRENT_HOUSE_YRS', 'int'),
 ('Risk_Flag', 'int')]

In [7]:
train_df = train_df.drop('id')

In [8]:
# Split the data into training and test sets (80% training, 20% test)
train_data, test_data = train_df.randomSplit([0.8, 0.2], seed=1234)


In [9]:
categorical_cols = [col for col,type in train_df.dtypes if type=='string']
numerical_cols = [col for col,type in train_df.dtypes if type=='int' and col !='Risk_Flag']

print(categorical_cols)
print(numerical_cols)


['Married/Single', 'House_Ownership', 'Car_Ownership', 'Profession', 'CITY', 'STATE']
['Income', 'Age', 'Experience', 'CURRENT_JOB_YRS', 'CURRENT_HOUSE_YRS']


In [11]:

indexers = []
encoders = []
encoded_feature_cols = []

# Create indexers and encoders for each categorical feature
for feature in categorical_cols:
    indexer = StringIndexer(inputCol=feature, outputCol=f"{feature}_index")
    encoder = OneHotEncoder(inputCol=f"{feature}_index", outputCol=f"{feature}_ohe")
    indexers.append(indexer)
    encoders.append(encoder)
    encoded_feature_cols.append(f"{feature}_ohe")
encoded_feature_cols

['Married/Single_ohe',
 'House_Ownership_ohe',
 'Car_Ownership_ohe',
 'Profession_ohe',
 'CITY_ohe',
 'STATE_ohe']

In [12]:
encoded_feature_cols.extend(numerical_cols)
encoded_feature_cols

['Married/Single_ohe',
 'House_Ownership_ohe',
 'Car_Ownership_ohe',
 'Profession_ohe',
 'CITY_ohe',
 'STATE_ohe',
 'Income',
 'Age',
 'Experience',
 'CURRENT_JOB_YRS',
 'CURRENT_HOUSE_YRS']

In [13]:

# Create a combined feature vector
assembler = VectorAssembler(inputCols=encoded_feature_cols, outputCol="features")

# scaling
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")


In [14]:
#define different models
lr = LogisticRegression(featuresCol='features', labelCol='Risk_Flag',rawPredictionCol='rawPrediction')
dt = DecisionTreeClassifier(featuresCol='features', labelCol='Risk_Flag', rawPredictionCol='rawPrediction')
rf = RandomForestClassifier(featuresCol='features', labelCol='Risk_Flag', rawPredictionCol='rawPrediction')
gb = GBTClassifier(featuresCol='features', labelCol='Risk_Flag')

models = [lr, dt, rf,gb]
model_names = ['Logistic Regression', 'Decision Tree', 'Random Forest','Gradient boost']


In [15]:
evaluator = BinaryClassificationEvaluator(labelCol='Risk_Flag', rawPredictionCol='rawPrediction', metricName='areaUnderROC')


In [16]:
def get_best_model(models,model_names,train_data,test_data):
    
    best_model = None
    best_model_name = None
    best_roc_auc = 0.0
    
    for model, model_name in zip(models, model_names):
        # Create a pipeline with the indexers, encoders, assembler, and model
        pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, model])
        
        # Fit the pipeline model on the training data
        pipeline_model = pipeline.fit(train_data)
        
        # Make predictions on the test data
        predictions = pipeline_model.transform(test_data)
        
        # Evaluate the model
        roc_auc = evaluator.evaluate(predictions)
        print(f"ROC-AUC for {model_name}: {roc_auc}")
        
        # Update the best model if the current model's ROC-AUC is higher
        if roc_auc > best_roc_auc:
            best_model = pipeline_model
            best_model_name = model_name
            best_roc_auc = roc_auc
    
    print(f"Best Model: {best_model_name} with ROC-AUC: {best_roc_auc}")

    return best_model,best_model_name

In [17]:
best_model,best_model_name = get_best_model(models,model_names,train_data,test_data)
best_predictions = best_model.transform(test_data)
best_predictions.show()

ROC-AUC for Logistic Regression: 0.6247782472208662
ROC-AUC for Decision Tree: 0.475568605520929
ROC-AUC for Random Forest: 0.5778936615595347
ROC-AUC for Gradient boost: 0.6336409305218053
Best Model: Gradient boost with ROC-AUC: 0.6336409305218053
+------+---+----------+--------------+---------------+-------------+-------------+---------------+--------------+---------------+-----------------+---------+--------------------+---------------------+-------------------+----------------+----------+-----------+------------------+-------------------+-----------------+---------------+-----------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+
|Income|Age|Experience|Married/Single|House_Ownership|Car_Ownership|   Profession|           CITY|         STATE|CURRENT_JOB_YRS|CURRENT_HOUSE_YRS|Risk_Flag|Married/Single_index|House_Ownership_index|Car_Ownership_index|Profession_index|CITY_index|STATE_index|Married/Single_ohe|House_Ow

In [19]:
best_predictions.toPandas().to_csv("predictions.csv")

In [20]:
def get_feature_importance(best_model,best_model_name):
    # Get feature importances if the best model is DecisionTree or RandomForest or Gradient boost
    if best_model_name in ['Decision Tree', 'Random Forest','Gradient boost']:
        # Extract the actual classifier model from the pipeline
        classifier_model = best_model.stages[-1]
        importances = classifier_model.featureImportances
        feature_importance = [(name, importance) for name, importance in zip(encoded_feature_cols, importances)]
        feature_importance.sort(key=lambda x: x[1], reverse=True)  # Sort by importance
        print(f"Feature importances for {best_model_name}:")
        for feature, importance in feature_importance:
            print(f"{feature}: {importance}")

    elif best_model_name == 'Logistic Regression':
        # For Logistic Regression, use the coefficients as feature importance
        classifier_model = best_model.stages[-1]
        coefficients = classifier_model.coefficients
        feature_importance = [(name, abs(coef)) for name, coef in zip(encoded_feature_cols, coefficients)]
        feature_importance.sort(key=lambda x: x[1], reverse=True)  # Sort by absolute coefficient value
        print(f"Feature importances for {best_model_name}:")
        for feature, importance in feature_importance:
            print(f"{feature}: {importance}")

In [21]:
get_feature_importance(best_model,best_model_name)

Feature importances for Gradient boost:
Profession_ohe: 0.020506304463426567
Married/Single_ohe: 0.018200832736264414
Car_Ownership_ohe: 0.007125909228949184
Income: 0.004932425579426436
House_Ownership_ohe: 0.0046896154779993305
CITY_ohe: 0.0008982305430057387
STATE_ohe: 0.0004909574796331443
Age: 0.00038890263716537656
Experience: 0.00032139821933720094
CURRENT_HOUSE_YRS: 1.2080082634772259e-08
CURRENT_JOB_YRS: 0.0


In [28]:
#train_df01 = train_data.show()

train_data.describe().show()

+-------+------------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+-------------------+
|summary|            Income|               Age|        Experience|Married/Single|House_Ownership|Car_Ownership|          Profession|       CITY|         STATE|   CURRENT_JOB_YRS| CURRENT_HOUSE_YRS|          Risk_Flag|
+-------+------------------+------------------+------------------+--------------+---------------+-------------+--------------------+-----------+--------------+------------------+------------------+-------------------+
|  count|            201842|            201842|            201842|        201842|         201842|       201842|              201842|     201842|        201842|            201842|            201842|             201842|
|   mean| 4997859.481168439|49.949524875893026|10.078868620009711|          NULL|           NULL|         NULL|                N