In [0]:
# Load as Delta table (since workspace.default.use_case_2 is a managed table)
loan_df = spark.table("workspace.default.use_case_2")

print("✅ Dataset loaded successfully!")
print(f"Total Rows: {loan_df.count()}")
print(f"Total Columns: {len(loan_df.columns)}")

display(loan_df.limit(5))


✅ Dataset loaded successfully!
Total Rows: 500
Total Columns: 14


loan_id,loan_amount,term,interest_rate,annual_income,credit_score,employment_length,home_ownership,purpose,dti,delinq_2yrs,revol_util,total_acc,loan_status
1001,17795,60,10.83,93781,829,10,OWN,small_business,21.83,3,67.74,37,0
1002,2860,36,10.87,92750,696,9,OWN,car,28.19,4,56.58,7,1
1003,40158,60,6.45,86262,807,14,OWN,small_business,9.15,2,92.91,39,0
1004,46732,36,13.68,137424,645,0,OWN,small_business,15.11,3,38.73,9,0
1005,13284,36,13.48,66600,841,7,MORTGAGE,debt_consolidation,18.92,0,6.62,31,0


In [0]:
loan_df.printSchema()

# Show summary statistics for numeric columns
display(loan_df.describe())


root
 |-- loan_id: long (nullable = true)
 |-- loan_amount: long (nullable = true)
 |-- term: long (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- annual_income: long (nullable = true)
 |-- credit_score: long (nullable = true)
 |-- employment_length: long (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: long (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: long (nullable = true)
 |-- loan_status: long (nullable = true)



summary,loan_id,loan_amount,term,interest_rate,annual_income,credit_score,employment_length,home_ownership,purpose,dti,delinq_2yrs,revol_util,total_acc,loan_status
count,500.0,500.0,500.0,500.0,500.0,500.0,500.0,500,500,500.0,500.0,500.0,500.0,500.0
mean,1250.5,25182.84,47.136,13.162299999999997,85356.386,716.988,6.976,,,22.35414000000001,2.648,49.30747999999995,28.204,0.328
stddev,144.4818327679989,14417.506124990145,11.980842423233042,3.980613253475838,37947.28408410441,77.29938062163879,4.190291288462454,,,9.956617534376406,1.7058475405684193,28.976990596292683,12.89258785672165,0.4699550142629211
min,1001.0,2161.0,36.0,6.0,20526.0,580.0,0.0,MORTGAGE,car,5.0,0.0,0.0,5.0,0.0
max,1500.0,49893.0,60.0,19.99,149839.0,849.0,14.0,RENT,small_business,39.9,5.0,99.56,49.0,1.0


In [0]:
from pyspark.sql.functions import col, sum

missing = loan_df.select(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in loan_df.columns]
)
display(missing)


loan_id,loan_amount,term,interest_rate,annual_income,credit_score,employment_length,home_ownership,purpose,dti,delinq_2yrs,revol_util,total_acc,loan_status
0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
display(
    loan_df.groupBy("loan_status")
    .count()
    .withColumnRenamed("count", "Total Loans")
)


loan_status,Total Loans
0,336
1,164


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import avg

avg_df = (
    loan_df.groupBy("loan_status")
    .agg(
        avg("loan_amount").alias("Avg_Loan_Amount"),
        avg("annual_income").alias("Avg_Income"),
        avg("credit_score").alias("Avg_Credit_Score")
    )
)
display(avg_df)


loan_status,Avg_Loan_Amount,Avg_Income,Avg_Credit_Score
0,25045.354166666668,82862.58035714286,715.452380952381
1,25464.51829268293,90465.64634146342,720.1341463414634


In [0]:
# Home ownership vs default
display(loan_df.groupBy("home_ownership", "loan_status").count().orderBy("home_ownership"))

# Loan purpose vs default
display(loan_df.groupBy("purpose", "loan_status").count().orderBy("purpose"))


home_ownership,loan_status,count
MORTGAGE,1,61
MORTGAGE,0,108
OWN,1,47
OWN,0,107
RENT,1,56
RENT,0,121


Databricks visualization. Run in Databricks to view.

purpose,loan_status,count
car,0,73
car,1,31
credit_card,0,59
credit_card,1,34
debt_consolidation,0,68
debt_consolidation,1,31
home_improvement,0,62
home_improvement,1,38
small_business,0,74
small_business,1,30


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

numeric_cols = [
    "loan_amount", "term", "interest_rate", "annual_income",
    "credit_score", "employment_length", "dti", "delinq_2yrs",
    "revol_util", "total_acc", "loan_status"
]

# Combine numeric features
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
vector_df = assembler.transform(loan_df).select("features")

# Correlation matrix
corr = Correlation.corr(vector_df, "features", "pearson").head()[0].toArray()

import pandas as pd
import numpy as np

corr_matrix = pd.DataFrame(corr, index=numeric_cols, columns=numeric_cols)
display(corr_matrix)


loan_amount,term,interest_rate,annual_income,credit_score,employment_length,dti,delinq_2yrs,revol_util,total_acc,loan_status
1.0,-0.0645876993883177,-0.0154780884621328,-0.0400814623381463,0.0047338362416476,0.0694281488163177,-0.1295440661212241,0.0417685363372584,0.0004489827380607736,-0.0284186014146377,0.0136631315508187
-0.0645876993883177,1.0,0.0004199369368955725,0.0806301738215838,-0.0424927945581817,0.0101244614353936,0.0239332620236059,0.0015626121815671,-0.0044976979159793,0.0388196992854411,0.0248063955346112
-0.0154780884621328,0.0004199369368955725,1.0,-0.0106200675107144,0.0427723594449999,-0.0426949716956689,0.0555281250689845,-0.0183495934686525,0.0224150873375902,0.0315486392486489,-0.0099918169984244
-0.0400814623381463,0.0806301738215838,-0.0106200675107144,1.0,0.0011317015357626,0.0125207782445027,0.0084323083988491,0.0014398423598813,-0.0322522876557637,-0.0477283873479142,0.0941595444665831
0.0047338362416476,-0.0424927945581817,0.0427723594449999,0.0011317015357626,1.0,0.0844329250959651,-0.0035196282925402,0.00028705769993494926,-0.0094588113254268,0.035150405083651,0.0284636060794771
0.0694281488163177,0.0101244614353936,-0.0426949716956689,0.0125207782445027,0.0844329250959651,1.0,-0.0212365464170525,0.0187212751903661,0.0180270190821875,-0.0095167912976374,0.0213055456975492
-0.1295440661212241,0.0239332620236059,0.0555281250689845,0.0084323083988491,-0.0035196282925402,-0.0212365464170525,1.0,-0.0677515438778307,-0.0535180858401016,0.0781163724219485,0.0297447344003102
0.0417685363372584,0.0015626121815671,-0.0183495934686525,0.0014398423598813,0.00028705769993494926,0.0187212751903661,-0.0677515438778307,1.0,0.0368992258447592,0.1113412845852411,0.0818130190947367
0.0004489827380607736,-0.0044976979159793,0.0224150873375902,-0.0322522876557637,-0.0094588113254268,0.0180270190821875,-0.0535180858401016,0.0368992258447592,1.0,0.0206341296543775,0.0291190391012909
-0.0284186014146377,0.0388196992854411,0.0315486392486489,-0.0477283873479142,0.035150405083651,-0.0095167912976374,0.0781163724219485,0.1113412845852411,0.0206341296543775,1.0,0.0213480859951172


In [0]:
from pyspark.sql.functions import when, col

# Create income categories
loan_df_cat = loan_df.withColumn(
    "loan_bracket",
    when(col("loan_amount") < 10000, "Low") \
    .when((col("loan_amount") >= 10000) & (col("loan_amount") < 30000), "Medium") \
    .otherwise("High")
)

display(
    loan_df_cat.groupBy("loan_bracket", "loan_status").count().orderBy("loan_bracket")
)


loan_bracket,loan_status,count
High,1,63
High,0,132
Low,0,71
Low,1,30
Medium,0,133
Medium,1,71


In [0]:
loan_df_inc = loan_df.withColumn(
    "income_bracket",
    when(col("annual_income") < 40000, "Low Income")
    .when((col("annual_income") >= 40000) & (col("annual_income") < 80000), "Mid Income")
    .otherwise("High Income")
)

display(
    loan_df_inc.groupBy("income_bracket", "loan_status").count().orderBy("income_bracket")
)


income_bracket,loan_status,count
High Income,1,100
High Income,0,173
Low Income,1,16
Low Income,0,50
Mid Income,0,113
Mid Income,1,48


In [0]:
display(
    loan_df.groupBy("loan_status")
    .agg(
        avg("interest_rate").alias("Avg_Interest_Rate"),
        avg("credit_score").alias("Avg_Credit_Score"),
        avg("dti").alias("Avg_DTI"),
        avg("revol_util").alias("Avg_Revolving_Utilization")
    )
)


loan_status,Avg_Interest_Rate,Avg_Credit_Score,Avg_DTI,Avg_Revolving_Utilization
0,13.190059523809524,715.452380952381,22.14744047619046,48.71857142857144
1,13.105426829268293,720.1341463414634,22.77762195121953,50.51402439024388


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Define categorical and numeric columns
categorical_cols = ["home_ownership", "purpose"]
numeric_cols = [
    "loan_amount", "term", "interest_rate", "annual_income",
    "credit_score", "employment_length", "dti", "delinq_2yrs",
    "revol_util", "total_acc"
]

# Index and encode categorical variables
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categorical_cols]
encoders = [OneHotEncoder(inputCols=[c + "_index"], outputCols=[c + "_vec"]) for c in categorical_cols]

# Combine all features into a single vector
assembler = VectorAssembler(
    inputCols=numeric_cols + [c + "_vec" for c in categorical_cols],
    outputCol="features"
)

# Create a pipeline for preprocessing
pipeline = Pipeline(stages=indexers + encoders + [assembler])
processed_df = pipeline.fit(loan_df).transform(loan_df)

# Check the transformed dataset
display(processed_df.select("loan_status", "features").limit(5))


loan_status,features
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""17795.0"",""60.0"",""10.83"",""93781.0"",""829.0"",""10.0"",""21.83"",""3.0"",""67.74"",""37.0"",""0.0"",""0.0"",""0.0"",""1.0"",""0.0"",""0.0""]}"
1,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""2860.0"",""36.0"",""10.87"",""92750.0"",""696.0"",""9.0"",""28.19"",""4.0"",""56.58"",""7.0"",""0.0"",""0.0"",""1.0"",""0.0"",""0.0"",""0.0""]}"
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""40158.0"",""60.0"",""6.45"",""86262.0"",""807.0"",""14.0"",""9.15"",""2.0"",""92.91"",""39.0"",""0.0"",""0.0"",""0.0"",""1.0"",""0.0"",""0.0""]}"
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""46732.0"",""36.0"",""13.68"",""137424.0"",""645.0"",""0.0"",""15.11"",""3.0"",""38.73"",""9.0"",""0.0"",""0.0"",""0.0"",""1.0"",""0.0"",""0.0""]}"
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""13284.0"",""36.0"",""13.48"",""66600.0"",""841.0"",""7.0"",""18.92"",""0.0"",""6.62"",""31.0"",""0.0"",""1.0"",""0.0"",""0.0"",""0.0"",""1.0""]}"


In [0]:
train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=42)
print(f"Training Records: {train_df.count()} | Test Records: {test_df.count()}")


Training Records: 426 | Test Records: 74


In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="loan_status", featuresCol="features", maxIter=20)
lr_model = lr.fit(train_df)

lr_predictions = lr_model.transform(test_df)
display(lr_predictions.select("loan_status", "prediction", "probability"))


loan_status,prediction,probability
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7243457386887432"",""0.2756542613112568""]}"
1,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.5975686992244155"",""0.40243130077558453""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7272737813695767"",""0.27272621863042334""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.5051166299561264"",""0.4948833700438736""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7969455152566023"",""0.2030544847433977""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.6496282941951177"",""0.3503717058048823""]}"
1,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.6728722695600436"",""0.32712773043995635""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7427524726002623"",""0.25724752739973766""]}"
1,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7516965357595067"",""0.24830346424049332""]}"
0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.8047502428004418"",""0.19524975719955817""]}"


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC
evaluator = BinaryClassificationEvaluator(labelCol="loan_status", rawPredictionCol="rawPrediction")
auc_lr = evaluator.evaluate(lr_predictions)
print(f"✅ Logistic Regression AUC: {auc_lr:.3f}")

# Accuracy, Precision, Recall
accuracy_eval = MulticlassClassificationEvaluator(labelCol="loan_status", metricName="accuracy")
precision_eval = MulticlassClassificationEvaluator(labelCol="loan_status", metricName="weightedPrecision")
recall_eval = MulticlassClassificationEvaluator(labelCol="loan_status", metricName="weightedRecall")

print(f"Accuracy: {accuracy_eval.evaluate(lr_predictions):.3f}")
print(f"Precision: {precision_eval.evaluate(lr_predictions):.3f}")
print(f"Recall: {recall_eval.evaluate(lr_predictions):.3f}")


✅ Logistic Regression AUC: 0.502
Accuracy: 0.649
Precision: 0.535
Recall: 0.649


In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="loan_status", featuresCol="features", numTrees=50, maxDepth=8, seed=42)
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)

# Evaluate Random Forest
auc_rf = evaluator.evaluate(rf_predictions)
print(f"✅ Random Forest AUC: {auc_rf:.3f}")

print(f"Accuracy: {accuracy_eval.evaluate(rf_predictions):.3f}")
print(f"Precision: {precision_eval.evaluate(rf_predictions):.3f}")
print(f"Recall: {recall_eval.evaluate(rf_predictions):.3f}")


✅ Random Forest AUC: 0.486
Accuracy: 0.662
Precision: 0.590
Recall: 0.662


In [0]:
import pandas as pd

feature_imp = list(zip(assembler.getInputCols(), rf_model.featureImportances.toArray()))
feature_imp_df = pd.DataFrame(feature_imp, columns=["Feature", "Importance"]).sort_values(by="Importance", ascending=False)
display(feature_imp_df)


Feature,Importance
loan_amount,0.1246845664084654
revol_util,0.1173526757738784
interest_rate,0.108302755125023
credit_score,0.1057503542872819
dti,0.0974255566861589
total_acc,0.0972075487811039
annual_income,0.0970261814897551
employment_length,0.0928663643982175
delinq_2yrs,0.0562984277687351
term,0.0167976900542668


Databricks visualization. Run in Databricks to view.

In [0]:
# Confusion Matrix for Random Forest
confusion_rf = (
    rf_predictions.groupBy("loan_status", "prediction")
    .count()
    .orderBy("loan_status", "prediction")
)

print("✅ Confusion Matrix (Random Forest)")
display(confusion_rf)


✅ Confusion Matrix (Random Forest)


loan_status,prediction,count
0,0.0,47
0,1.0,3
1,0.0,22
1,1.0,2
