# Exploratory Data Analysis & Feature Engineering

## Introduction
This notebook focuses on exploratory data analysis (EDA) and feature engineering for the given dataset.  
The goal is to understand the data structure, identify key patterns and issues, and prepare a clean, well-structured feature set suitable for downstream modeling.

The notebook does **not** include model training or evaluation. All steps are limited to data inspection, preprocessing, and feature construction.

## Scope
The analysis includes:
- Initial data inspection and validation
- Handling missing values and inconsistencies
- Feature transformation and creation
- Preparation of a modeling-ready dataset


In [1]:
import kagglehub
import pandas as pd
import numpy as np
import os
from pprint import pprint
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [2]:
#path = kagglehub.dataset_download("ranadeep/credit-risk-dataset")

In [3]:
# We'll use Spark, our file is too big for Pandas
spark = SparkSession\
    .builder\
    .appName('Spark')\
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/21 22:08:30 WARN Utils: Your hostname, mazer1x-v15xv17xrnx, resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlp0s20f3)
25/12/21 22:08:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/21 22:08:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read.csv('../data/loan/loan.csv',header=True) # our main dataset
columns_desc_pd = pd.read_excel('../data/LCDataDictionary.xlsx')
columns_describe = spark.createDataFrame(columns_desc_pd)

  warn(msg)


## EDA | NULL and NaN fill/drop

### Checking Column Types, Missing Values, and Uniqueness

In this step, we examine the data types of each column, assess the proportion of missing values (NaN), and analyze the sparsity of unique values to understand the distribution and variability in the dataset.


In [5]:
# Let's check the column types
columns = {}
for _,t in df.dtypes:
    if t not in columns: columns[t]=0
    columns[t]+=1
pprint(columns)

{'string': 74}


In [6]:
# And let's look at % NaNs
nan_percentage = df.select([
    (F.round(
        F.sum(
            (F.col(c).isNull() | (F.col(c) == "NaN") | (F.col(c) == "")).cast("int")
        ) / df.count(),
        2
    )* 100).alias(c)
    for c in df.columns
])

                                                                                

In [7]:
rows_dict = (nan_percentage.collect()[0]).asDict()

25/12/21 22:09:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [8]:
# Output columns with % NaNs > 10
c = 0
columns_to_drop = []
for k,v in rows_dict.items(): 
    if v > 10: 
        c+=1
        columns_to_drop.append(k)
        print(k,v)

desc 52.0
mths_since_last_delinq 57.99999999999999
mths_since_last_record 89.0
next_pymnt_d 66.0
mths_since_last_major_derog 88.0
annual_inc_joint 100.0
dti_joint 100.0
verification_status_joint 100.0
tot_coll_amt 47.0
tot_cur_bal 47.0
open_acc_6m 100.0
open_il_6m 100.0
open_il_12m 100.0
open_il_24m 100.0
mths_since_rcnt_il 100.0
total_bal_il 100.0
il_util 100.0
open_rv_12m 100.0
open_rv_24m 100.0
max_bal_bc 100.0
all_util 100.0
total_rev_hi_lim 47.0
inq_fi 100.0
total_cu_tl 100.0
inq_last_12m 100.0


In [9]:
print(len(df.columns),c) # Drop columns due to large % of NaNs

74 25


In [10]:
df = df.drop(*columns_to_drop)

In [11]:
# Now we will drop columns with a large % of unique values, this will not help in predictions.

In [12]:
for i in df.columns:
    print(i,df.select(i).distinct().count())
df.select("*").count()

                                                                                

id 91113


                                                                                

member_id 91113
loan_amnt 1095
funded_amnt 1155
funded_amnt_inv 9373
term 2
int_rate 448
installment 25056
grade 8
sub_grade 36
emp_title 53610
emp_length 13
home_ownership 6
annual_inc 9282
verification_status 4
issue_d 60
loan_status 10
pymnt_plan 3
url 91113
purpose 268
title 32200
zip_code 1076
addr_state 269
dti 3697
delinq_2yrs 228
earliest_cr_line 758
inq_last_6mths 219
open_acc 144
pub_rec 110
revol_bal 34069
revol_util 1257
total_acc 226
initial_list_status 157
out_prncp 25128
out_prncp_inv 25330
total_pymnt 86617
total_pymnt_inv 86075
total_rec_prncp 37763
total_rec_int 80117
total_rec_late_fee 2915
recoveries 6805
collection_recovery_fee 5129
last_pymnt_d 188
last_pymnt_amnt 60930
last_credit_pull_d 208
collections_12_mths_ex_med 149
policy_code 77
application_type 55
acc_now_delinq 41


91113

### Columns analysis and casting to 'double'
**Columns marked with `?` require extra verification before dropping:**  
`id`, `member_id`, `emp_title`, `url`, `title(?)`, `zip_code`, `delinq_2yrs(?)`, `inq_last_6mths(?)`, `open_acc(?)`, `initial_list_status(?)`, `issue_d`, `pymnt_plan`

**Post-issue columns** *(contain information generated after loan issuance and should be removed to avoid data leakage):*  
`out_prncp`, `out_prncp_inv`, `total_pymnt`, `total_pymnt_inv`, `total_rec_prncp`, `total_rec_int`, `total_rec_late_fee`,  
`recoveries`, `collection_recovery_fee`, `last_pymnt_d`, `last_pymnt_amnt`, `collections_12_mths_ex_med`, `last_credit_pull_d`

**Note:** We will start by dropping the post-issue columns first.


In [13]:
post_issue_columns = ['out_prncp',
    'out_prncp_inv',
    'total_pymnt',
    'total_pymnt_inv',
    'total_rec_prncp',
    'total_rec_int',
    'total_rec_late_fee',
    'recoveries',
    'collection_recovery_fee',
    'last_pymnt_d',
    'last_pymnt_amnt',
    'collections_12_mths_ex_med',
    'last_credit_pull_d'
] + [
    'id',
    'member_id',
    'emp_title',
    'url',
    'zip_code',
    'issue_d',
    'pymnt_plan'
]

In [14]:
df = df.drop(*post_issue_columns)

In [15]:
# Analysis of "problematic" columns

In [16]:
columns_to_cast = ['delinq_2yrs','inq_last_6mths','open_acc','initial_list_status']

In [17]:
for i in columns_to_cast:
    df1 = df.withColumn(i, F.col(i).try_cast('double'))
df1.select([
    F.count(
        F.when(
            F.col(c).isNull(), 1)
    ).alias(c)
    for c in columns_to_cast
]).show()

+-----------+--------------+--------+-------------------+
|delinq_2yrs|inq_last_6mths|open_acc|initial_list_status|
+-----------+--------------+--------+-------------------+
|         31|            31|     111|              90904|
+-----------+--------------+--------+-------------------+



In [18]:
# So, we can drop initial_list_status and remove columns with NULL-s: delinq_2yrs,inq_last_6mths,open_acc

In [19]:
df = df.drop('initial_list_status')

In [20]:
df = df.dropna() # data with NULL // NaN is less than 1% of the data
df.select([
    F.count(
        F.when(
            F.col(c).isNull(), 1)
    ).alias(c)
    for c in df.columns
]).show(vertical=True)



-RECORD 0------------------
 loan_amnt           | 0   
 funded_amnt         | 0   
 funded_amnt_inv     | 0   
 term                | 0   
 int_rate            | 0   
 installment         | 0   
 grade               | 0   
 sub_grade           | 0   
 emp_length          | 0   
 home_ownership      | 0   
 annual_inc          | 0   
 verification_status | 0   
 loan_status         | 0   
 purpose             | 0   
 title               | 0   
 addr_state          | 0   
 dti                 | 0   
 delinq_2yrs         | 0   
 earliest_cr_line    | 0   
 inq_last_6mths      | 0   
 open_acc            | 0   
 pub_rec             | 0   
 revol_bal           | 0   
 revol_util          | 0   
 total_acc           | 0   
 policy_code         | 0   
 application_type    | 0   
 acc_now_delinq      | 0   



                                                                                

In [21]:
# Now let's convert the data into the formats required for GB

In [22]:
df.show(1,vertical=True)

-RECORD 0--------------------------
 loan_amnt           | 5000.0      
 funded_amnt         | 5000.0      
 funded_amnt_inv     | 4975.0      
 term                |  36 months  
 int_rate            | 10.65       
 installment         | 162.87      
 grade               | B           
 sub_grade           | B2          
 emp_length          | 10+ years   
 home_ownership      | RENT        
 annual_inc          | 24000.0     
 verification_status | Verified    
 loan_status         | Fully Paid  
 purpose             | credit_card 
 title               | Computer    
 addr_state          | AZ          
 dti                 | 27.65       
 delinq_2yrs         | 0.0         
 earliest_cr_line    | Jan-1985    
 inq_last_6mths      | 1.0         
 open_acc            | 3.0         
 pub_rec             | 0.0         
 revol_bal           | 13648.0     
 revol_util          | 83.7        
 total_acc           | 9.0         
 policy_code         | 1.0         
 application_type    | INDIV

In [23]:
double_cols = ['loan_amnt','funded_amnt','funded_amnt_inv','int_rate','installment','annual_inc','dti','delinq_2yrs',
           'inq_last_6mths','open_acc','pub_rec','revol_bal','revol_util','total_acc','policy_code','acc_now_delinq']

In [24]:
for i in double_cols:
    df = df.withColumn(i, F.col(i).try_cast('double'))

# Count the number of NaNs.
df.select([
    F.count(
        F.when(
            F.col(c).isNull(), 1)
    ).alias(c)
    for c in double_cols
]).show(vertical=True)



-RECORD 0--------------
 loan_amnt       | 0   
 funded_amnt     | 0   
 funded_amnt_inv | 0   
 int_rate        | 0   
 installment     | 0   
 annual_inc      | 0   
 dti             | 54  
 delinq_2yrs     | 54  
 inq_last_6mths  | 54  
 open_acc        | 52  
 pub_rec         | 43  
 revol_bal       | 30  
 revol_util      | 41  
 total_acc       | 36  
 policy_code     | 7   
 acc_now_delinq  | 20  



                                                                                

In [25]:
df = df.dropna()

### EDA Summary

- Column types have been analyzed
- Columns with more than 10% missing values have been removed
- Post-issue columns have been excluded to prevent data leakage
- Numerical columns have been cast to type `double`
- Remaining rows with NaN values (<1% of the data) have been removed

## Feature Engineering | String type to numerical

### Inspecting String Columns

First, we identify all columns with string data types to inspect their contents and understand categorical variables.


In [26]:
df.dtypes

[('loan_amnt', 'double'),
 ('funded_amnt', 'double'),
 ('funded_amnt_inv', 'double'),
 ('term', 'string'),
 ('int_rate', 'double'),
 ('installment', 'double'),
 ('grade', 'string'),
 ('sub_grade', 'string'),
 ('emp_length', 'string'),
 ('home_ownership', 'string'),
 ('annual_inc', 'double'),
 ('verification_status', 'string'),
 ('loan_status', 'string'),
 ('purpose', 'string'),
 ('title', 'string'),
 ('addr_state', 'string'),
 ('dti', 'double'),
 ('delinq_2yrs', 'double'),
 ('earliest_cr_line', 'string'),
 ('inq_last_6mths', 'double'),
 ('open_acc', 'double'),
 ('pub_rec', 'double'),
 ('revol_bal', 'double'),
 ('revol_util', 'double'),
 ('total_acc', 'double'),
 ('policy_code', 'double'),
 ('application_type', 'string'),
 ('acc_now_delinq', 'double')]

In [27]:
str_cols = []
for i in df.dtypes:
    if i[1] == 'string': str_cols+=[i[0]]

In [28]:
df.select(*str_cols).show(1,vertical=True)

-RECORD 0--------------------------
 term                |  36 months  
 grade               | B           
 sub_grade           | B2          
 emp_length          | 10+ years   
 home_ownership      | RENT        
 verification_status | Verified    
 loan_status         | Fully Paid  
 purpose             | credit_card 
 title               | Computer    
 addr_state          | AZ          
 earliest_cr_line    | Jan-1985    
 application_type    | INDIVIDUAL  
only showing top 1 row


### Preparing Data for Gradient Boosted Trees (GBT)

We outline the transformations needed for each feature to make the dataset suitable for GBT:

- `emp_length`: ordinal encoding
- `term`: ordinal encoding
- `grade` + `sub_grade`: ordinal encoding (only `sub_grade` will be used)
- `home_ownership`: one-hot encoding
- `verification_status`: ordinal encoding
- `purpose`: one-hot encoding
- `title`: drop (too many unique values)
- `addr_state`: map to regions (West/East/Midwest/South) → one-hot encoding
- `earliest_cr_line`: convert to years as integer
- `application_type`: drop (only one unique value)


### Ordinal Encoding: emp_length

We map employment length to ordinal values and fill missing values (`n/a`) with the median.

In [29]:
emp_length_mapping = {
    "< 1 year": 0,
    "1 year": 1,
    "2 years": 2,
    "3 years": 3,
    "4 years": 4,
    "5 years": 5,
    "6 years": 6,
    "7 years": 7,
    "8 years": 8,
    "9 years": 9,
    "10+ years": 10
}

In [30]:
emp_length_mapping.items()
emp_map = []
for i in emp_length_mapping.items(): emp_map += i
mapping_expr = F.create_map([F.lit(x) for x in emp_map])

In [31]:
df = df.withColumn(
    'emp_length',
    mapping_expr[F.col('emp_length')]
)
df = df.fillna(df.agg(F.median(F.col('emp_length'))).collect()[0][0]) # we have n/a values

                                                                                

### Ordinal Encoding: term

We remove the " months" text and cast the column to numeric type.

In [32]:
df = df.withColumn(
    'term',
    F.regexp_replace(F.col('term'),' months','').try_cast('double')
)

### Ordinal Encoding: sub_grade + grade

We convert sub_grade values into an ordinal numeric scale.  
Grades will be mapped as follows: E5=1, E4=2, …, A1=35.


In [33]:
# grade
count_arr = []
c = 1
for i in ['G','F','E','D','C','B','A']:
    for j in range(1,5+1):
        count_arr+=[f'{i}{j}']+[c]
        c+=1
grade_expr = F.create_map([F.lit(i) for i in count_arr])

df = df.withColumn(
    'un_grade',
    grade_expr[F.col('sub_grade')]
)

### Ordinal Encoding: verification_status

Mapping verification status to ordinal numeric values for model compatibility:
- 'Not Verified' → 1
- 'Source Verified' → 2
- 'Verified' → 3


In [34]:
# verification_status
verification_map = ['Not Verified',1,'Source Verified',2,'Verified',3]
verification_expr = F.create_map([F.lit(i) for i in verification_map]) 
df = df.withColumn(
    'verification_status',
    verification_expr[F.col('verification_status')]
)

### One-Hot Encoding: home_ownership and purpose

We convert categorical string variables into numeric vectors using StringIndexer + OneHotEncoder.

In [35]:
# home_ownership
indexer = StringIndexer(inputCol='home_ownership',outputCol='home_ownership_ind')
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCol='home_ownership_ind',outputCol='home_ownership_ohe')
df = encoder.fit(df).transform(df)

                                                                                

In [36]:
# purpose
indexer = StringIndexer(inputCol='purpose',outputCol='purpose_ind')
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCol='purpose_ind',outputCol='purpose_ohe')
df = encoder.fit(df).transform(df)

                                                                                

### Date Transformation: earliest_cr_line

We convert the earliest credit line to a date format and calculate the number of years since the credit line was opened.

In [37]:
# earliest_cr_line
df = df.withColumn(
    "earliest_cr_line_date",
    F.to_date(F.concat(F.lit("01-"), F.col("earliest_cr_line")), "dd-MMM-yyyy")
)
df = df.withColumn(
    "years_since",
    F.round(F.months_between(F.current_date(), "earliest_cr_line_date") / 12, 2)
)

### Mapping States to Regions

We group US states into regions: West, East, Midwest, South, and Other.  
Then we apply one-hot encoding to the `region` feature.


In [38]:
# region
df = df.withColumn(
    "region",
    F.when(F.col("addr_state").isin(
        "WA","OR","CA","NV","ID","MT","WY","UT","CO","AK","HI",'AZ'), "West")
     .when(F.col("addr_state").isin(
        "NY","NJ","PA","MA","CT","RI","NH","VT","ME","DC","MD","DE"), "East")
     .when(F.col("addr_state").isin(
        "IL","IN","MI","OH","WI","IA","KS","MN","MO","NE","ND","SD"), "Midwest")
     .when(F.col("addr_state").isin(
        "TX","FL","GA","AL","MS","LA","OK","AR","TN","SC","NC","KY","VA","WV","NM"), "South")
     .otherwise("Other")
)
indexer = StringIndexer(inputCol='region',outputCol='region_ind')
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCol='region_ind',outputCol='region_ohe')
df = encoder.fit(df).transform(df)

                                                                                

In [39]:
df = df.drop(*['sub_grade','grade','title','application_type','home_ownership','purpose',
               'earliest_cr_line','earliest_cr_line_date','region','purpose_ind',
               'home_ownership_ind','addr_state','region_ind'])

### Target Variable: loan_status

We will use a **binary classification** for `loan_status` to prevent data leakage.  
Instead of a ternary classification, we focus on distinguishing between two classes.


In [40]:
# Drop rows with 'Does not meet the criteria'
df = df.filter(~F.col('loan_status').startswith("Does not meet the"))

# Create binary target: 1 = paid/current, 0 = default/late
df = df.withColumn(
    'loan_status_bin',
    F.when(F.col('loan_status').isin('Fully Paid', 'Current'), 1).otherwise(0)
)

# Check distribution
df.groupBy('loan_status_bin').count().show()

                                                                                

+---------------+-----+
|loan_status_bin|count|
+---------------+-----+
|              1|76630|
|              0|11419|
+---------------+-----+



In [41]:
df.write \
  .mode("overwrite") \
  .parquet("../data/loans_ml")

25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/12/21 22:09:56 WARN MemoryManager: Total allocation exceeds 95.