# **Loan approval analysis in PySpark**

Import modules

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, isnull, count, avg, round, trim, lower, expr, lit
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Initialize Spark session

In [36]:
spark = SparkSession.builder.appName("LoanApprovalAnalysis").getOrCreate()

Load dataset

In [37]:
file_path = "/content/loan_approval_dataset.csv" # https://www.kaggle.com/datasets/architsharma01/loan-approval-prediction-dataset/data
df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

## **Data quality check**

In [38]:
df.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |--  no_of_dependents: integer (nullable = true)
 |--  education: string (nullable = true)
 |--  self_employed: string (nullable = true)
 |--  income_annum: integer (nullable = true)
 |--  loan_amount: integer (nullable = true)
 |--  loan_term: integer (nullable = true)
 |--  cibil_score: integer (nullable = true)
 |--  residential_assets_value: integer (nullable = true)
 |--  commercial_assets_value: integer (nullable = true)
 |--  luxury_assets_value: integer (nullable = true)
 |--  bank_asset_value: integer (nullable = true)
 |--  loan_status: string (nullable = true)



In [39]:
df.show(5)

+-------+-----------------+-------------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|loan_id| no_of_dependents|    education| self_employed| income_annum| loan_amount| loan_term| cibil_score| residential_assets_value| commercial_assets_value| luxury_assets_value| bank_asset_value| loan_status|
+-------+-----------------+-------------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|      1|                2|     Graduate|            No|      9600000|    29900000|        12|         778|                  2400000|                17600000|            22700000|          8000000|    Approved|
|      2|                0| Not Graduate|           Yes|      4100000|    12200000|         8|         417|                  2700000|                 220000

Count missing values per column

In [40]:
df.select([count(when(isnull(c) | isnan(c), c)).alias(c) for c in df.columns]).show()

+-------+-----------------+----------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|loan_id| no_of_dependents| education| self_employed| income_annum| loan_amount| loan_term| cibil_score| residential_assets_value| commercial_assets_value| luxury_assets_value| bank_asset_value| loan_status|
+-------+-----------------+----------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|      0|                0|         0|             0|            0|           0|         0|           0|                        0|                       0|                   0|                0|           0|
+-------+-----------------+----------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+-------------

In [41]:
total_rows = df.count()
print(f"Total rows: {total_rows}")

Total rows: 4269


In [42]:
print("Loan status distribution:")
df.groupBy(' loan_status').count().show()

Loan status distribution:
+------------+-----+
| loan_status|count|
+------------+-----+
|    Approved| 2656|
|    Rejected| 1613|
+------------+-----+



## **Data cleaning & preprocessing**

Trim column names

In [43]:
old_columns = df.columns
new_columns = [col_name.strip() for col_name in old_columns]

for old_name, new_name in zip(old_columns, new_columns):
    df = df.withColumnRenamed(old_name, new_name)

Normalize text columns

In [44]:
critical_columns = ["income_annum", "loan_amount", "loan_term", "cibil_score"]
df_clean = df.na.drop(subset=critical_columns)

text_cols = [c for c in ("loan_status", "self_employed", "education") if c in df_clean.columns]
for c in text_cols:
    df_clean = df_clean.withColumn(c, trim(lower(col(c))))

Standarize categorical encodings

In [45]:
df_clean = df_clean.withColumn("self_employed_numeric", when(col("self_employed") == "yes", 1).otherwise(0))
df_clean = df_clean.withColumn("education_numeric", when(col("education") == "graduate", 1).otherwise(0))


Clean and map loan_status to label (Approved=1, Rejected=0)

In [46]:
df_clean = df_clean.withColumn('label', when(col('loan_status') == 'approved', 1.0)
                                               .when(col('loan_status') == 'rejected', 0.0)
                                               .otherwise(None))


Feature engineering: loan-to-income ratio

In [47]:
df_clean = df_clean.withColumn("loan_to_income_ratio", round(col("loan_amount") / col("income_annum"), 4))

In [48]:
print("Schema after preprocessing:")
df_clean.printSchema()

Schema after preprocessing:
root
 |-- loan_id: integer (nullable = true)
 |-- no_of_dependents: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- self_employed: string (nullable = true)
 |-- income_annum: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- cibil_score: integer (nullable = true)
 |-- residential_assets_value: integer (nullable = true)
 |-- commercial_assets_value: integer (nullable = true)
 |-- luxury_assets_value: integer (nullable = true)
 |-- bank_asset_value: integer (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- self_employed_numeric: integer (nullable = false)
 |-- education_numeric: integer (nullable = false)
 |-- label: double (nullable = true)
 |-- loan_to_income_ratio: double (nullable = true)



In [49]:
print("Sample after preprocessing:")
df_clean.select(df_clean.columns).show(5)

Sample after preprocessing:
+-------+----------------+------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------------+-----------------+-----+--------------------+
|loan_id|no_of_dependents|   education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|self_employed_numeric|education_numeric|label|loan_to_income_ratio|
+-------+----------------+------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------------+-----------------+-----+--------------------+
|      1|               2|    graduate|           no|     9600000|   29900000|       12|        778|                 2400000|               17600000|           22700000|     

## **Exploratory aggregations**

Aggregate loan stats by status and education

In [50]:
agg_df = df_clean.groupBy("loan_status", "education") \
    .agg(
    round(avg("loan_amount"), 2).alias("avg_loan_amount"),
    round(avg("cibil_score"), 2).alias("avg_cibil_score"),
    count("loan_id").alias("count_loans")
  ).orderBy("loan_status", "education")


print("Aggregated statistics:")
agg_df.show()

Aggregated statistics:
+-----------+------------+---------------+---------------+-----------+
|loan_status|   education|avg_loan_amount|avg_cibil_score|count_loans|
+-----------+------------+---------------+---------------+-----------+
|   approved|    graduate|  1.523420463E7|         700.45|       1339|
|   approved|not graduate|  1.526051632E7|         706.53|       1317|
|   rejected|    graduate|  1.522074534E7|         430.62|        805|
|   rejected|not graduate|  1.467240099E7|         428.32|        808|
+-----------+------------+---------------+---------------+-----------+



Window functions: ranking incomes per self_employed status

In [51]:
windowSpec = Window.partitionBy("self_employed_numeric").orderBy(col("income_annum").desc())
df_ranked = df_clean.withColumn("income_rank", expr("rank() over (partition by self_employed_numeric order by income_annum desc)"))


print("Top 10 incomes per self_employed group:")
df_ranked.select("loan_id", "self_employed_numeric", "income_annum", "income_rank").show(10)

Top 10 incomes per self_employed group:
+-------+---------------------+------------+-----------+
|loan_id|self_employed_numeric|income_annum|income_rank|
+-------+---------------------+------------+-----------+
|    715|                    0|     9900000|          1|
|   1028|                    0|     9900000|          1|
|   1132|                    0|     9900000|          1|
|   1180|                    0|     9900000|          1|
|   1273|                    0|     9900000|          1|
|   1769|                    0|     9900000|          1|
|   1806|                    0|     9900000|          1|
|   1891|                    0|     9900000|          1|
|   1966|                    0|     9900000|          1|
|   2005|                    0|     9900000|          1|
+-------+---------------------+------------+-----------+
only showing top 10 rows



In [52]:
df_ranked.select("loan_id", "income_annum", "loan_amount", "loan_to_income_ratio").show(10)

+-------+------------+-----------+--------------------+
|loan_id|income_annum|loan_amount|loan_to_income_ratio|
+-------+------------+-----------+--------------------+
|      1|     9600000|   29900000|              3.1146|
|      2|     4100000|   12200000|              2.9756|
|      3|     9100000|   29700000|              3.2637|
|      4|     8200000|   30700000|              3.7439|
|      5|     9800000|   24200000|              2.4694|
|      6|     4800000|   13500000|              2.8125|
|      7|     8700000|   33000000|              3.7931|
|      8|     5700000|   15000000|              2.6316|
|      9|      800000|    2200000|                2.75|
|     10|     1100000|    4300000|              3.9091|
+-------+------------+-----------+--------------------+
only showing top 10 rows



## **Predictive modelling - logistic regression**

Assemble feature columns into a feature vector

In [53]:
feature_cols = [
    "income_annum", "loan_amount", "loan_term", "cibil_score",
    "self_employed_numeric", "education_numeric", "loan_to_income_ratio",
    "residential_assets_value", "commercial_assets_value",
    "luxury_assets_value", "bank_asset_value"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df_features = assembler.transform(df_ranked)


Split data into train/test

In [54]:
train_data, test_data = df_features.randomSplit([0.7, 0.3], seed=42)
print(f"Training rows: {train_data.count()}, test rows: {test_data.count()}")

Training rows: 3057, test rows: 1212


Build logistic regression model

In [55]:
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=50)
lr_model = lr.fit(train_data)

Predict on test

In [56]:
lr_pred = lr_model.transform(test_data)

Evaluate model with AUC, accuracy, precision, recall, F1

In [57]:
bce = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

mce_acc = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
mce_precision = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedPrecision')
mce_recall = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedRecall')
mce_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')

auc_lr = bce.evaluate(lr_pred)
acc_lr = mce_acc.evaluate(lr_pred)
prec_lr = mce_precision.evaluate(lr_pred)
recall_lr = mce_recall.evaluate(lr_pred)
f1_lr = mce_f1.evaluate(lr_pred)

In [58]:
print('Logistic regression evaluation:')
print(f'AUC: {auc_lr:.4f}\nAccuracy: {acc_lr:.4f}\nPrecision: {prec_lr:.4f}\nRecall: {recall_lr:.4f}\nF1: {f1_lr:.4f}')

Logistic regression evaluation:
AUC: 0.9705
Accuracy: 0.9109
Precision: 0.9108
Recall: 0.9109
F1: 0.9109


Confusion matrix

In [59]:
print('Confusion matrix (label vs prediction) for logistic regression:')
lr_pred.groupBy('label', 'prediction').count().orderBy('label', 'prediction').show()

Confusion matrix (label vs prediction) for logistic regression:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  415|
|  0.0|       1.0|   55|
|  1.0|       0.0|   53|
|  1.0|       1.0|  689|
+-----+----------+-----+



Coefficients

In [60]:
lr_coefficients = lr_model.coefficients.toArray().tolist()
print('Logistic regression coefficients:')
for fname, coeff in zip(feature_cols, lr_coefficients):
  print(f'{fname}: {coeff:.6f}')

Logistic regression coefficients:
income_annum: -0.000000
loan_amount: 0.000000
loan_term: -0.160988
cibil_score: 0.024650
self_employed_numeric: 0.140746
education_numeric: 0.092441
loan_to_income_ratio: 0.869761
residential_assets_value: 0.000000
commercial_assets_value: 0.000000
luxury_assets_value: 0.000000
bank_asset_value: 0.000000


## **Predictive modelling - random forest**

Initialize and fit model

In [61]:
rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=100, maxDepth=8, seed=42)
rf_model = rf.fit(train_data)

Predict on test

In [62]:
rf_pred = rf_model.transform(test_data)

Evaluate model with AUC, accuracy, precision, recall, F1


In [63]:
auc_rf = bce.evaluate(rf_pred)
acc_rf = mce_acc.evaluate(rf_pred)
prec_rf = mce_precision.evaluate(rf_pred)
recall_rf = mce_recall.evaluate(rf_pred)
f1_rf = mce_f1.evaluate(rf_pred)

print('Random forest evaluation:')
print(f'AUC: {auc_rf:.4f}\nAccuracy: {acc_rf:.4f}\nPrecision: {prec_rf:.4f}\nRecall: {recall_rf:.4f}\nF1: {f1_rf:.4f}')

Random forest evaluation:
AUC: 0.9985
Accuracy: 0.9810
Precision: 0.9814
Recall: 0.9810
F1: 0.9809


Confusion matrix

In [64]:
print('Confusion matrix (label vs prediction) for random forest:')
rf_pred.groupBy('label', 'prediction').count().orderBy('label', 'prediction').show()

Confusion matrix (label vs prediction) for random forest:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  449|
|  0.0|       1.0|   21|
|  1.0|       0.0|    2|
|  1.0|       1.0|  740|
+-----+----------+-----+



Feature importances

In [65]:
importances = rf_model.featureImportances
fi_list = importances.toArray().tolist()
feat_imp = list(zip(feature_cols, fi_list))
feat_imp_sorted = sorted(feat_imp, key=lambda x: x[1], reverse=True)

print('Random forest feature importances:')
for fname, imp in feat_imp_sorted:
  print(f'{fname}: {imp:.6f}')

Random forest feature importances:
cibil_score: 0.811882
loan_term: 0.078479
loan_to_income_ratio: 0.056578
loan_amount: 0.010232
income_annum: 0.010112
commercial_assets_value: 0.008924
residential_assets_value: 0.007946
luxury_assets_value: 0.006470
bank_asset_value: 0.006134
education_numeric: 0.001632
self_employed_numeric: 0.001611


### **Cross-validation for random forest** (tuning numTrees and maxDepth)

Prepare parameter grid for cross-validation

In [66]:
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100]) \
.addGrid(rf.maxDepth, [5, 8]) \
.build()

Initialize cross-validator and fit on training data

In [67]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=bce, numFolds=3)
cv_model = cv.fit(train_data)

Evaluate best random forest model on test data

In [68]:
best_rf = cv_model.bestModel

cv_pred = best_rf.transform(test_data)
auc_cv = bce.evaluate(cv_pred)
acc_cv = mce_acc.evaluate(cv_pred)
print(f"Cross-validated RF on test: AUC={auc_cv:.4f}, accuracy={acc_cv:.4f}")


Cross-validated RF on test: AUC=0.9985, accuracy=0.9810


Feature importances for best random forest model

In [69]:
best_fi = best_rf.featureImportances.toArray().tolist()
best_feat_imp = sorted(zip(feature_cols, best_fi), key=lambda x: x[1], reverse=True)
print("Best RF feature importances:")
for fname, imp in best_feat_imp:
  print(f"{fname}: {imp:.6f}")

Best RF feature importances:
cibil_score: 0.811882
loan_term: 0.078479
loan_to_income_ratio: 0.056578
loan_amount: 0.010232
income_annum: 0.010112
commercial_assets_value: 0.008924
residential_assets_value: 0.007946
luxury_assets_value: 0.006470
bank_asset_value: 0.006134
education_numeric: 0.001632
self_employed_numeric: 0.001611


Save best model

In [70]:
best_rf_path = "best_rf_model_spark"
best_rf.write().overwrite().save(best_rf_path)
print("Best RF model saved to:", best_rf_path)

Best RF model saved to: best_rf_model_spark


Compare feature averages by loan status (approved vs rejected)

In [71]:
print("Average feature values by loan status:")
avg_features = df_clean.groupBy("loan_status").agg(
    round(avg("income_annum"), 2).alias("avg_income"),
    round(avg("loan_amount"), 2).alias("avg_loan_amount"),
    round(avg("loan_term"), 2).alias("avg_loan_term"),
    round(avg("cibil_score"), 2).alias("avg_cibil_score"),
    round(avg("loan_to_income_ratio"), 3).alias("avg_loan_to_income_ratio")
)
avg_features.show()

Average feature values by loan status:
+-----------+----------+---------------+-------------+---------------+------------------------+
|loan_status|avg_income|avg_loan_amount|avg_loan_term|avg_cibil_score|avg_loan_to_income_ratio|
+-----------+----------+---------------+-------------+---------------+------------------------+
|   rejected|5113825.17|  1.494606324E7|        11.73|         429.47|                   2.918|
|   approved|5025903.61|  1.524725151E7|         10.4|         703.46|                   3.026|
+-----------+----------+---------------+-------------+---------------+------------------------+



Group applicants by cibil_score range and approval rate

In [72]:
print("Approval rate by cibil_score range:")

score_bins = df_clean.withColumn(
    "cibil_bin",
    when(col("cibil_score") < 500, "<500")
    .when((col("cibil_score") >= 500) & (col("cibil_score") < 650), "500-649")
    .when((col("cibil_score") >= 650) & (col("cibil_score") < 750), "650-749")
    .otherwise("750+")
)

score_bins = score_bins.withColumn(
    "cibil_order",
    when(col("cibil_bin") == "<500", lit(1))
    .when(col("cibil_bin") == "500-649", lit(2))
    .when(col("cibil_bin") == "650-749", lit(3))
    .when(col("cibil_bin") == "750+", lit(4))
)

approval_stats = score_bins.groupBy("cibil_bin", "cibil_order").agg(
    count(when(col("label") == 1.0, 1)).alias("approved"),
    count(when(col("label") == 0.0, 1)).alias("rejected")
).orderBy("cibil_order")

approval_rate = approval_stats.withColumn(
    "approval_rate",
    round(col("approved") / (col("approved") + col("rejected")), 4)
)

approval_rate.show()

Approval rate by cibil_score range:
+---------+-----------+--------+--------+-------------+
|cibil_bin|cibil_order|approved|rejected|approval_rate|
+---------+-----------+--------+--------+-------------+
|     <500|          1|     148|    1252|       0.1057|
|  500-649|          2|     718|     350|       0.6723|
|  650-749|          3|     740|       5|       0.9933|
|     750+|          4|    1050|       6|       0.9943|
+---------+-----------+--------+--------+-------------+



Stop Spark session

In [73]:
spark.stop()