In [0]:
# Load the source credit-risk table and render a preview of it.
source_table = "finance.silver_credit_risk"
df = spark.table(source_table)
display(df)

person_age,person_income,person_home_ownership,person_emp_length,loan_intent,loan_grade,loan_amnt,loan_int_rate,loan_percent_income,loan_status
22,59000,RENT,123.0,PERSONAL,D,35000,16.02,0.59,1
21,9600,OWN,5.0,EDUCATION,B,1000,11.14,0.1,0
25,9600,MORTGAGE,1.0,MEDICAL,C,5500,12.87,0.57,1
23,65500,RENT,4.0,MEDICAL,C,35000,15.23,0.53,1
24,54400,RENT,8.0,MEDICAL,C,35000,14.27,0.55,1
21,9900,OWN,2.0,VENTURE,A,2500,7.14,0.25,1
26,77100,RENT,8.0,EDUCATION,B,35000,12.42,0.45,1
24,78956,RENT,5.0,MEDICAL,B,35000,11.11,0.44,1
24,83000,RENT,8.0,PERSONAL,A,35000,8.9,0.42,1
21,10000,OWN,6.0,VENTURE,D,1600,14.74,0.16,1


In [0]:
# String-valued columns; these are indexed before being assembled into features.
categorical_cols = ["person_home_ownership", "loan_intent", "loan_grade"]

# Numeric columns, listed as borrower attributes followed by loan attributes.
# The combined order is the order used wherever numeric_cols is consumed.
borrower_numeric_cols = ["person_age", "person_income", "person_emp_length"]
loan_numeric_cols = ["loan_amnt", "loan_int_rate", "loan_percent_income"]
numeric_cols = borrower_numeric_cols + loan_numeric_cols

In [0]:
# Keep only rows where every model input and the target column are non-null.
# NOTE(review): the table preview in this notebook shows person_emp_length=123.0
# for a 22-year-old; dropping nulls does not remove such values — confirm
# whether a range filter is wanted before modelling.
required_cols = [*categorical_cols, *numeric_cols, "loan_status"]
df_clean = df.dropna(how="any", subset=required_cols)

In [0]:
from pyspark.ml.feature import StringIndexer

def _make_indexer(column):
    """Build a StringIndexer that maps `column` to a numeric `<column>_idx` column.

    handleInvalid="keep" places invalid/unseen values in an extra index
    rather than raising an error at transform time.
    """
    return StringIndexer(
        inputCol=column,
        outputCol=f"{column}_idx",
        handleInvalid="keep",
    )


# One indexer stage per categorical column, in categorical_cols order.
indexers = [_make_indexer(c) for c in categorical_cols]

In [0]:
from pyspark.ml.feature import VectorAssembler

# Feature vector layout: the indexed categorical columns first, then the
# numeric columns, in the order the two lists define.
# NOTE(review): the *_idx values enter the vector as plain numbers, so a linear
# model will treat the index as ordinal — consider one-hot encoding; confirm.
indexed_cols = [c + "_idx" for c in categorical_cols]
feature_inputs = indexed_cols + numeric_cols

assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")

In [0]:
from pyspark.ml import Pipeline

# Chain the string indexers and the vector assembler, fit them on the cleaned
# data, then apply the fitted stages to that same data.
# NOTE(review): the traceback stored under this cell is from a run that raised
# SparkException; it is truncated, so the root cause is not visible here.
stages = [*indexers, assembler]
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(df_clean)
df_ml = pipeline_model.transform(df_clean)

[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-8520161078220026>, line 5[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m [38;5;28;01mimport[39;00m Pipeline
[1;32m      3[0m pipeline [38;5;241m=[39m Pipeline(stages[38;5;241m=[39mindexers [38;5;241m+[39m [assembler])
[0;32m----> 5[0m df_ml [38;5;241m=[39m pipeline[38;5;241m.[39mfit(df_clean)[38;5;241m.[39mtransform(df_clean)

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39;00m:
[0;32m---> 30[0m     result [38;5;241m=[39m original_method([38;

In [0]:
# Rename the target column to "label", the name the classifier and evaluator
# in this notebook are configured with.
df_ml = df_ml.withColumnRenamed(existing="loan_status", new="label")

In [0]:
%sql
-- Remove any existing copy of the ML-ready table; IF EXISTS makes this a no-op when the table is absent.
DROP TABLE IF EXISTS finance.ml_ready_credit_risk;

In [0]:
# Print df_ml's schema (column names, types, nullability).
df_ml.printSchema()


root
 |-- person_age: integer (nullable = true)
 |-- person_income: integer (nullable = true)
 |-- person_home_ownership: string (nullable = true)
 |-- person_emp_length: double (nullable = true)
 |-- loan_intent: string (nullable = true)
 |-- loan_grade: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- loan_int_rate: double (nullable = true)
 |-- loan_percent_income: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- person_home_ownership_idx: double (nullable = false)
 |-- loan_intent_idx: double (nullable = false)
 |-- loan_grade_idx: double (nullable = false)
 |-- features: vectorudt (nullable = true)



In [0]:
# Persist only the model inputs (feature vector + label) as a Delta table,
# replacing any existing contents.
ml_ready_output = df_ml.select("features", "label")
delta_writer = ml_ready_output.write.format("delta").mode("overwrite")
delta_writer.saveAsTable("finance.ml_ready_credit_risk")

In [0]:
# Read the saved table back and print its schema to check the write.
# Bound to its own name rather than `df`: reusing `df` would replace the raw
# DataFrame loaded at the top of the notebook, and re-running the dropna cell
# afterwards would fail because this table only has `features` and `label`.
ml_ready_df = spark.table("finance.ml_ready_credit_risk")
ml_ready_df.printSchema()

root
 |-- features: vectorudt (nullable = true)
 |-- label: integer (nullable = true)



# Train Model

In [0]:
# 70/30 train/test split of the transformed data, seeded for repeatability.
split_weights = [0.7, 0.3]
train_df, test_df = df_ml.randomSplit(split_weights, seed=42)

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression settings: which columns to read, the iteration cap, and
# the regularisation strength.
lr_params = {
    "featuresCol": "features",
    "labelCol": "label",
    "maxIter": 20,
    "regParam": 0.01,
}
lr = LogisticRegression(**lr_params)

# Fit on the training split only.
lr_model = lr.fit(train_df)

In [0]:
# Score the held-out split with the fitted model.
predictions = lr_model.transform(test_df)

# Preview the true label next to the class probabilities and predicted class.
preview_cols = ["label", "probability", "prediction"]
predictions.select(*preview_cols).show(5, truncate=False)

+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.8407178833246529,0.15928211667534709]|0.0       |
|0    |[0.7618081535678236,0.2381918464321764] |0.0       |
|0    |[0.8222459697021831,0.1777540302978169] |0.0       |
|0    |[0.9148709105484696,0.08512908945153042]|0.0       |
|0    |[0.9597279947592466,0.04027200524075336]|0.0       |
+-----+----------------------------------------+----------+
only showing top 5 rows


In [0]:
# NOTE(review): this cell is an exact duplicate of the preceding predictions
# cell — it re-scores test_df with lr_model and shows the same five-row
# preview. It appears safe to delete; confirm nothing relies on it.
predictions = lr_model.transform(test_df)

predictions.select(
    "label",
    "probability",
    "prediction"
).show(5, truncate=False)

+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.8407178833246529,0.15928211667534709]|0.0       |
|0    |[0.7618081535678236,0.2381918464321764] |0.0       |
|0    |[0.8222459697021831,0.1777540302978169] |0.0       |
|0    |[0.9148709105484696,0.08512908945153042]|0.0       |
|0    |[0.9597279947592466,0.04027200524075336]|0.0       |
+-----+----------------------------------------+----------+
only showing top 5 rows


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the scored test split by area under the ROC curve, using "label"
# as the ground-truth column.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
auc = evaluator.evaluate(predictions)

# Last expression of the cell so the value is displayed.
auc

0.8584189755334034

In [0]:
import mlflow
import mlflow.spark

In [0]:
# Display the fitted coefficients. Their order follows the assembler's
# inputCols: the three *_idx columns first, then the six numeric columns.
lr_model.coefficients

DenseVector([-0.7827, 0.1276, 0.6467, -0.0032, -0.0, -0.0107, -0.0, 0.0656, 9.2119])

In [0]:
# Display the fitted model's intercept term.
lr_model.intercept

-3.9279857292922467

In [0]:
# Show row counts per predicted class, then per actual label, so the two
# distributions can be compared.
for group_col in ("prediction", "label"):
    predictions.groupBy(group_col).count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 7503|
|       1.0| 1131|
+----------+-----+

+-----+-----+
|label|count|
+-----+-----+
|    1| 1809|
|    0| 6825|
+-----+-----+

