In [1]:
# ปรับแต่งค่าการทำงานของ Spark
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("ChiSqSelector:Gender vs. Route").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1000m").\
        config("spark.executor.cores", "2").\
        config("spark.cores.max", "6").\
        getOrCreate()

23/06/25 03:18:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.sql import functions as sparkf
from pyspark.sql.types import *

## 1. Business Understaing
    
    1.1 Problem Statement: ต้องการทราบว่า ลูกหนี้แต่ละรายจะมาจ่ายหนี้ครบตามสัญญาเงินกู้ (Fully-paid) หรือไม่มาจ่ายฯ (Charged-off)
    1.2 Project Objective: การจัดเก็บหนี้ดีขึ้นช่วยเพิ่มรายได้ให้กับกิจการ
    1.3 Task of Data Science: Binary Classification
    1.4 Cleansing Policy: ธุรกิจมี columns ที่แนะนำว่ามีความสัมพันธ์/ส่งผลต่อการชำระหนี้คืนตามสัญญา, ลบได้ทั้ง row หากมี missing ใน columns และแทนที่ได้ตามความเหมาะสม
    1.5 Success Criteria: มี Recall/Sensitivity ไม่น้อยกว่า 0.65 บน Testing set แบบ Hold-out

## 2. Data Understanding
    
    - มี Label เป็น column: loan_status
    - มีขนาดใหญ่ (volume)เกินกว่าเทคโนโลยีปัจจุบัน (Python) จะทำงานได้อย่างมีประสิทธิภาพ จึงต้องใช้ Spark ร่วมด้วย
    - CSV เป็น semi-structural data ที่มี header ซึ่งสามารถนำไปพัฒนาเป็น schema ของ structural data (Spark DataFrame) ได้
    - Data Dict.: https://docs.google.com/spreadsheets/d/1qtZBSJ-JS7S2tGC0W9Yxp992LmrDaAwGcJB419Htbbw/edit#gid=1163295822

In [3]:
#กำหนด columns ที่ธุรกิจให้คำแนะนำฯ ไว้

businessAttrs_df = ["loan_amnt","term","int_rate"\
                                ,"installment","grade","emp_length",\
                           "home_ownership","annual_inc"\
                                ,"verification_status","loan_status",\
                           "purpose","addr_state","dti","delinq_2yrs"\
                                ,"earliest_cr_line",\
                           "open_acc","pub_rec"\
                                ,"revol_bal","revol_util","total_acc","issue_d",'acc_now_delinq']

In [4]:
#! apt-get install wget -y

In [5]:
#! wget https://storage.googleapis.com/ntclass/LoanStats_web.csv

In [6]:
# Spark อ่านข้อมูลจาก .csv แล้ว convert เป็น DataFrame

raw_df = spark.read.option('header',True)\
.option("quote", "\"")\
.option('mode','DROPMALFORMED')\
.option('inferSchema',True)\
.csv('LoanStats_web.csv')\
.select(businessAttrs_df)

                                                                                

### 2.1 Univariate Analysis

### 2.2 Bivariate/Multivariate Analysis

# 3. Data Preparation

#### 3.1 Cleansing and Transformation - Batch 1

In [7]:
#### ใหม่ ณ 16 JAN 2023
# ใช้ Spark ปรับแต่งค่าใน column: annual_inc, loan_status, revol_util, int_rate และ issue_d
# ใช้ Spark สร้าง column ใหม่ขึ้นมา คือ calculatedDTI

crunched_df = raw_df\
.filter(sparkf.col('annual_inc') != 0)\
.filter((sparkf.col('loan_status') == 'Fully Paid')|(sparkf.col('loan_status') == 'Charged Off'))\
.withColumn('calculatedDTI',sparkf.col('loan_amnt')/sparkf.col('annual_inc'))\
.withColumn('revol_util',sparkf.when(sparkf.col('revol_util').isNotNull(),sparkf.regexp_replace(sparkf.col('revol_util'),'%',''))\
                  .otherwise(sparkf.col('revol_util')))\
.filter(sparkf.col('revol_util') < 120)\
.withColumn('int_rate',sparkf.when(sparkf.col('int_rate').isNotNull(),sparkf.regexp_replace(sparkf.col('int_rate'),'%',''))\
                  .otherwise(sparkf.col('int_rate')))\
.withColumn('issue_d',sparkf.regexp_extract(sparkf.col('issue_d'),'\d+',0))\
.withColumn('revol_util',sparkf.col('revol_util').cast(IntegerType()))\
.withColumn('int_rate',sparkf.col('int_rate').cast(FloatType()))\
.withColumn('issue_d',sparkf.col('issue_d').cast(IntegerType()))\
.withColumn('delinq_2yrs',sparkf.col('delinq_2yrs').cast(FloatType()))\
.dropna()

In [8]:
crunched_df.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,698342,14499.922351798976,9223.066319877242,1000,40000
term,698342,,,36 months,60 months
int_rate,698342,13.231893765288989,5.174107541702221,5.31,30.99
installment,698342,446.6912345097158,282.599606452106,30.12,1715.42
grade,698342,,,A,G
emp_length,698342,,,1 year,
home_ownership,698342,,,ANY,RENT
annual_inc,698342,79530.96412655976,78144.23143077889,23.0,9573072.0
verification_status,698342,,,Not Verified,Verified


In [9]:
from pyspark.ml.feature import VectorAssembler, ChiSqSelector, StringIndexer

#### 3.2 Cleansing and Transformation - Batch 2

In [10]:
# Spark สร้าง ABT - sensivity: 0.6932773109243697

final_df = crunched_df\
.withColumn('term',sparkf.when(sparkf.col('term').isNotNull(),sparkf.regexp_replace(sparkf.col('term'),' months','')).otherwise(sparkf.col('term')))\
.withColumn('term',sparkf.col('term').cast(IntegerType()))\
.filter(sparkf.col('loan_amnt') < 40000.0)\
.filter(sparkf.col('annual_inc')<150000)\
.filter(((sparkf.col('int_rate')<22)&(sparkf.col('loan_status')=='Fully Paid'))\
        |((sparkf.col('loan_status')=='Charged Off')&(sparkf.col('int_rate')<28)))\
.withColumn('is_debtConsolidation', sparkf.when(sparkf.col('purpose')=='debt_consolidation',1).otherwise(0))

# 4. Modeling & 5. Model Evaluation

In [11]:
# Spark ทำ Hold-out Tesing set

candidate_training_df = final_df.filter(sparkf.col('issue_d') < 2019)
testing_df = final_df.filter(sparkf.col('issue_d') >= 2019)

In [12]:
candidate_training_df.groupBy('loan_status').count().show()

23/06/25 03:19:36 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-----------+------+
|loan_status| count|
+-----------+------+
| Fully Paid|470712|
|Charged Off|131831|
+-----------+------+



                                                                                

In [13]:
# Spark ทำการสุ่มเลือกข้อมูลเฉพาะจาก training set เพื่อลดจำนวน Fully Paid

FullyPaid_training_df = candidate_training_df.filter(sparkf.col('loan_status') == 'Fully Paid').sample(0.30)

ChargedOff_training_df = candidate_training_df.filter(sparkf.col('loan_status') == 'Charged Off')

FullyPaid_training_num = FullyPaid_training_df.count()

ChargedOff_training_num = ChargedOff_training_df.count()

training_df = FullyPaid_training_df.union(ChargedOff_training_df)

                                                                                

In [14]:
# Spark ทำสรุปจำนวน row ของแต่ละ class ที่มีความ Balance Class มากขึ้น
training_df.groupBy('loan_status').count().show()

                                                                                

+-----------+------+
|loan_status| count|
+-----------+------+
| Fully Paid|141169|
|Charged Off|131831|
+-----------+------+



In [15]:
training_df.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,273000,13853.784706959706,8441.588510843314,1000,39925
term,273000,41.456967032967036,10.05927881352971,36,60
int_rate,273000,13.616009426151876,4.70093571023568,5.31,27.99
installment,273000,426.647873516492,259.4131935256982,30.12,1506.65
grade,273000,,,A,G
emp_length,273000,,,1 year,
home_ownership,273000,,,ANY,RENT
annual_inc,273000,66914.06534054944,28968.962744565815,25.0,149999.0
verification_status,273000,,,Not Verified,Verified


In [16]:
testing_df.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,6716,13053.867629541393,8579.982755367357,1000,38975
term,6716,41.710541989279335,10.220482942364024,36,60
int_rate,6716,12.78878348380629,4.202868875110118,6.46,27.27
installment,6716,388.8376280524116,246.2480716289674,30.64,1336.43
grade,6716,,,A,E
emp_length,6716,,,1 year,
home_ownership,6716,,,ANY,RENT
annual_inc,6716,70493.97817301966,30007.79471769585,250.0,149925.0
verification_status,6716,,,Not Verified,Verified


#### Feature Transformation (Encoding)

In [17]:
numeric_candidateFeatures_list = ['int_rate','term','loan_amnt','annual_inc']
#numeric_candidateFeatures_list = ['int_rate','installment','loan_amnt','annual_inc','revol_util','delinq_2yrs','is_debtConsolidation','term']

In [18]:
cat_candidateFeatures_list = ['grade','home_ownership','verification_status','addr_state']
#cat_candidateFeatures_list = []

In [19]:
from pyspark.ml.feature import *
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import *

In [20]:
label_Indexer = StringIndexer(inputCol='loan_status',outputCol='label')

In [21]:
cat_Indexer = StringIndexer(inputCols=cat_candidateFeatures_list,outputCols=['grade_index','home_ownership_index'\
                                                                             ,'verification_status_index','addr_state_index'])
cat_OneHot  = OneHotEncoder(inputCols=['grade_index','home_ownership_index'\
                                                                             ,'verification_status_index','addr_state_index']\
                            ,outputCols=['grade_binarized','home_ownership_binarized'\
                                         ,'verification_status_binarized','addr_state_binarized'])

In [22]:
#feature_list = numeric_candidateFeatures_list+['grade_binarized','home_ownership_binarized','verification_status_binarized','addr_state_binarized','cat_delinq_2yrs_binarized']
feature_list = numeric_candidateFeatures_list

In [23]:
feature_list

['int_rate', 'term', 'loan_amnt', 'annual_inc']

In [24]:
feature_Assembler = VectorAssembler(inputCols=feature_list,outputCol='features')

In [25]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

#### Algorithm: Decision Tree Classifier

In [26]:
algo = DecisionTreeClassifier(featuresCol='features',labelCol='label')

In [27]:
pipeline_transformation = Pipeline().setStages([cat_Indexer,cat_OneHot,feature_Assembler,label_Indexer,algo])

In [28]:
model = pipeline_transformation.fit(training_df)

                                                                                

In [29]:
result_df = model.transform(testing_df)

In [30]:
result_df.select('loan_status','label','prediction').filter(sparkf.col('label') == 1)\
.groupBy('loan_status','label','prediction').count().show()

                                                                                

+-----------+-----+----------+-----+
|loan_status|label|prediction|count|
+-----------+-----+----------+-----+
|Charged Off|  1.0|       1.0|  334|
|Charged Off|  1.0|       0.0|  142|
+-----------+-----+----------+-----+



In [31]:
TP = result_df.select('loan_status','label','prediction')\
.filter((sparkf.col('label') == 1)&(sparkf.col('prediction') == 1))\
.groupBy('loan_status','label','prediction').count().take(1)[0]['count']

                                                                                

In [32]:
ALL_POSITIVE = result_df.select('loan_status','label','prediction').filter((sparkf.col('label') == 1)).count()

                                                                                

In [33]:
ALL_NEGATIVE = result_df.select('loan_status','label','prediction').filter((sparkf.col('label') == 0)).count()

                                                                                

In [34]:
ALL_POSITIVE+ALL_NEGATIVE

6716

In [35]:
result_df.count()

                                                                                

6716

In [36]:
SENSITIVITY = TP/ALL_POSITIVE

In [37]:
SENSITIVITY

0.7016806722689075

In [38]:
TN = result_df.select('loan_status','label','prediction')\
.filter((sparkf.col('label') == 0)&(sparkf.col('prediction') == 0))\
.groupBy('loan_status','label','prediction').count().take(1)[0]['count']

                                                                                

In [39]:
SPECIFICITY = TN/ALL_NEGATIVE

In [40]:
SPECIFICITY

0.5376602564102564

In [41]:
import pandas as pd

In [42]:
featureImportances = model.stages[4].featureImportances.toArray()

In [43]:
ExtractFeatureImp(featureImportances, result_df, "features").head(30)

Unnamed: 0,idx,name,score
0,0,int_rate,0.977885
1,1,term,0.016704
2,2,loan_amnt,0.004788
3,3,annual_inc,0.000624
