## Creating Schemas and volume

In [0]:
# Creating Schema creditrisk
spark.sql("""
CREATE SCHEMA IF NOT EXISTS workspace.creditrisk
""")

DataFrame[]

In [0]:
#Creating Volume creditrisk_data in the schema creditrisk
spark.sql("""
CREATE VOLUME IF NOT EXISTS workspace.creditrisk.creditrisk_data
""")

DataFrame[]

## Bronze Layer - Raw Data Ingestion

In [0]:
# Reading the csv file
df = spark.read.option("header", True)\
    .option("inferSchema","True").csv("/Volumes/workspace/creditrisk/creditrisk_data/credit_risk_dataset.csv")


In [0]:
df.printSchema()

root
 |-- person_age: integer (nullable = true)
 |-- person_income: integer (nullable = true)
 |-- person_home_ownership: string (nullable = true)
 |-- person_emp_length: double (nullable = true)
 |-- loan_intent: string (nullable = true)
 |-- loan_grade: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- loan_int_rate: double (nullable = true)
 |-- loan_status: integer (nullable = true)
 |-- loan_percent_income: double (nullable = true)
 |-- cb_person_default_on_file: string (nullable = true)
 |-- cb_person_cred_hist_length: integer (nullable = true)



In [0]:
# creating bronze table
df.write.format("delta") \
  .mode("overwrite") \
  .saveAsTable("creditrisk.bronze_creditrisk")

In [0]:
%sql
--- schema comments
COMMENT ON TABLE creditrisk.bronze_creditrisk IS 'Raw ingested data layer';

## Silver Layer Creation

In [0]:
from pyspark.sql.functions import col, sum, when, median, expr


In [0]:
# Checking which columns have null data and how much null data is present
from pyspark.sql.functions import col, sum

null_summary = {
    c: df.filter(col(c).isNull()).count()
    for c in df.columns
    if df.filter(col(c).isNull()).count() > 0
}
print(null_summary)

{'person_emp_length': 895, 'loan_int_rate': 3116}


In [0]:
# Since the column loan_int_rate has null values, we are calculating the median of the int_rate by grouping the loan_grade and loan_intent
#  and replacing the null values with the median obtained.
# Calculating the median of the column loan_int_rate
from pyspark.sql.functions import col, when, median, expr
rate_lookup = \
  spark.table("creditrisk.bronze_creditrisk").filter(col("loan_int_rate").isNotNull())\
      .groupBy("loan_grade", "loan_intent")\
      .agg(expr("percentile_approx(loan_int_rate, 0.5)").alias("median_rate"))
#rate_lookup = rate_lookup.withColumnRenamed("median_rate", "loan_int_rate")
rate_lookup.show()

+----------+-----------------+-----------+
|loan_grade|      loan_intent|median_rate|
+----------+-----------------+-----------+
|         D|         PERSONAL|      15.31|
|         B|        EDUCATION|      10.99|
|         C|          MEDICAL|      13.49|
|         A|          VENTURE|       7.49|
|         B|          MEDICAL|      10.99|
|         A|         PERSONAL|       7.49|
|         D|          VENTURE|      15.31|
|         B|          VENTURE|      10.99|
|         A|  HOMEIMPROVEMENT|       7.49|
|         E|        EDUCATION|      16.77|
|         A|        EDUCATION|       7.49|
|         B|DEBTCONSOLIDATION|      10.99|
|         F|          MEDICAL|      18.43|
|         D|DEBTCONSOLIDATION|      15.33|
|         E|         PERSONAL|      16.82|
|         D|        EDUCATION|      15.31|
|         C|  HOMEIMPROVEMENT|      13.49|
|         E|          VENTURE|       16.7|
|         B|  HOMEIMPROVEMENT|      10.99|
|         C|        EDUCATION|      13.47|
+----------

In [0]:
# Silver Layer – Cleaning & Feature Engineering
# The columns person_age and person_emp_length are cleaned and filtered.(person age above 75 is filtered)
# The column is_emp_length_missing is created to indicate if the person_emp_length is missing.
# The column cb_default_flag is created to indicate if the person has defaulted on a credit before.
# The rate_lookup is joined with the silver table to get the median rate for the loan_grade and loan_intent
from pyspark.sql.functions import col, when, median
df = df.join(rate_lookup, ["loan_grade", "loan_intent"], "left")
df_silver = df\
.filter((col("person_age")<=75)) \
.withColumn(
    "is_emp_length_missing", 
    when(col("person_emp_length").isNull() | (col("person_emp_length") == 0), 1).otherwise(0))\
.fillna({"person_emp_length": 0})\
.filter((col("person_emp_length") <= 60))\
  .withColumn("cb_default_flag",
              when(col("cb_person_default_on_file")=="Y",1).otherwise(0))\
  .withColumn(
    "loan_int_rate",
    when(col("loan_int_rate").isNull(), col("median_rate"))
    .otherwise(col("loan_int_rate"))
).drop("median_rate")

In [0]:
# Checking if there are any null values in the silver table by creating a function named null_summary
def null_summary(df):
    from pyspark.sql.functions import col, sum
    return df.select([
        sum(col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ])


In [0]:
display(null_summary(df))

loan_grade,loan_intent,person_age,person_income,person_home_ownership,person_emp_length,loan_amnt,loan_int_rate,loan_status,loan_percent_income,cb_person_default_on_file,cb_person_cred_hist_length,median_rate
0,0,0,0,0,895,0,3116,0,0,0,0,0


In [0]:
from pyspark.sql.functions import col, sum

null_summary_silver = {
    c: df_silver.filter(col(c).isNull()).count()
    for c in df_silver.columns
    if df_silver.filter(col(c).isNull()).count() > 0
}
print(null_summary_silver)

{}


In [0]:
%sql
DROP TABLE IF EXISTS creditrisk.silver_creditrisk;

In [0]:
# creating silver table
df_silver.write.format("delta") \
  .mode("overwrite") \
  .saveAsTable("creditrisk.silver_creditrisk")

In [0]:
%sql
--- schema comments
COMMENT ON TABLE creditrisk.silver_creditrisk IS 'Cleaned and transformed data layer';

In [0]:
df_silver.count()

32569

In [0]:
df.count()-df_silver.count()

12

## Gold Layer Creation

In [0]:
df_gold = spark.table("creditrisk.silver_creditrisk") \
    .select(
        "person_age",
        "person_income",
        "person_home_ownership",
        "person_emp_length",
        "loan_intent",
        "loan_grade",
        "loan_amnt",
        "loan_int_rate",
        "loan_percent_income",
        "cb_default_flag",
        "cb_person_cred_hist_length",
        "loan_status"
    )


In [0]:
df_gold.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("creditrisk.gold_creditrisk")


In [0]:
--- From the visualisation it can be observed clearly that the default rate is higher for loan_grade G and loan_intent Debt_consolidation
%sql
SELECT loan_grade,loan_intent,
       COUNT(*) AS total_loans,
       ROUND(AVG(loan_status), 3) AS default_rate
FROM creditrisk.gold_creditrisk
GROUP BY loan_grade,loan_intent
ORDER BY default_rate DESC;


loan_grade,loan_intent,total_loans,default_rate
G,EDUCATION,9,1.0
G,HOMEIMPROVEMENT,9,1.0
G,DEBTCONSOLIDATION,10,1.0
E,DEBTCONSOLIDATION,144,1.0
F,DEBTCONSOLIDATION,43,1.0
G,PERSONAL,10,1.0
G,MEDICAL,12,1.0
F,MEDICAL,52,0.962
E,MEDICAL,167,0.958
G,VENTURE,14,0.929


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Gold Layer – Feature Engineering
# The column prior_default_high_risk_flag is created to indicate if the person has defaulted on a credit before and the loan_percent_income is above 0.6
# The column affordability_bucket is created to indicate the affordability of the loan based on loan_percent_income
from pyspark.sql.functions import when, col

gold_analytics = spark.table("creditrisk.gold_creditrisk") \
    .withColumn(
        "affordability_bucket",
        when(col("loan_percent_income") < 0.2, "LOW_RISK")
        .when(col("loan_percent_income") < 0.6, "MEDIUM_RISK")
        .otherwise("HIGH_RISK")
    )\
    .withColumn(
        "prior_default_high_risk_flag",
        when(
            (col("cb_default_flag") == 1) & 
            (col("loan_percent_income") > 0.6),
            1
        ).otherwise(0)
    )


In [0]:
gold_analytics.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("creditrisk.gold_analytics")


In [0]:
gold_ml = spark.table("creditrisk.gold_analytics") \
    .select(
        "person_age",
        "person_income",
        "person_emp_length",
        "loan_amnt",
        "loan_int_rate",
        "loan_percent_income",
        "cb_default_flag",
        "cb_person_cred_hist_length",
        "prior_default_high_risk_flag",
        "loan_status"
    )


In [0]:
# gold_ml table is created for model training
gold_ml.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("creditrisk.gold_ml")

In [0]:
spark.table("creditrisk.gold_ml") \
     .groupBy("prior_default_high_risk_flag") \
     .agg({"loan_status": "avg", "*": "count"}) \
     .display()

prior_default_high_risk_flag,avg(loan_status),count(1)
0,0.2138041932092275,32338
1,0.8354978354978355,231


In [0]:
spark.table("creditrisk.gold_ml").count()


32569

In [0]:
spark.table("creditrisk.gold_ml").describe().display()


summary,person_age,person_income,person_emp_length,loan_amnt,loan_int_rate,loan_percent_income,cb_default_flag,cb_person_cred_hist_length,prior_default_high_risk_flag,loan_status
count,32569.0,32569.0,32569.0,32569.0,32569.0,32569.0,32569.0,32569.0,32569.0,32569.0
mean,27.71003101108416,65879.2051951242,4.649114188338604,9588.221928828028,11.013989683440236,0.1702047959716345,0.1763640271423746,5.80112990880899,0.0070926340999109,0.2182136387362215
stddev,6.167853755405373,52535.0954222557,4.0491183939985484,6320.515714588764,3.212490387678649,0.106758722809599,0.3811354316661953,4.047035688442502,0.0839198717589676,0.4130395680407736
min,20.0,4000.0,0.0,500.0,5.42,0.0,0.0,2.0,0.0,0.0
max,73.0,2039784.0,38.0,35000.0,23.22,0.83,1.0,30.0,1.0,1.0
