## 1. Import Libraries

In [1]:
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import *

## 2. Load Data and Convert to Spark Data Frame

In [2]:
# load data as dataframe
loan_df = spark.read.csv("loan.csv", header=True)

In [3]:
loan_df.rdd.getNumPartitions()

8

## 3. Create response variable and features
### 3.1 Remove some columns based on EDA results

In [4]:
loan_df2=loan_df.select(
    'loan_status', # response variable
    'grade', # dummy
    'sub_grade', # dummy
    'emp_length',
    'home_ownership', # dummy    
    'purpose',  # dummy
    'verification_status', # dummy
    loan_df.loan_amnt.cast("float"),
    loan_df.funded_amnt.cast("float"),
    loan_df.funded_amnt_inv.cast("float"),
    loan_df.int_rate.cast("float"),
    loan_df.installment.cast("float"),
    loan_df.annual_inc.cast("float"),
    loan_df.dti.cast("float"),
    loan_df.delinq_2yrs.cast("integer"),  
    loan_df.inq_last_6mths.cast("integer"),
    loan_df.open_acc.cast("integer"), 
    loan_df.pub_rec.cast("integer"),
    loan_df.revol_bal.cast("float"), 
    loan_df.revol_util.cast("float"),
    loan_df.total_acc.cast("integer"),
    loan_df.acc_now_delinq.cast("integer"),
    loan_df.tot_coll_amt.cast("float"),
    loan_df.tot_cur_bal.cast("float"),
    loan_df.total_rev_hi_lim.cast("integer")
)

### 3.2 Create response variable and remove rows with no valid response variable 

In [5]:
def whetherpaid(x):
    if x in ['Default', 'Charged Off', 'Does not meet the credit policy. Status:Charged Off']:
        return 1
    elif x in ['Does not meet the credit policy. Status:Fully Paid', 'Fully Paid']:
        return 0
    else:
        return -1
    
paidflag = udf(lambda x: whetherpaid(x))

In [6]:
loan_df3 = loan_df2.withColumn('paid_flag', paidflag('loan_status').
                               cast("integer")).where("paid_flag != -1").drop('loan_status')

In [7]:
loan_df3.printSchema()


root
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- funded_amnt: float (nullable = true)
 |-- funded_amnt_inv: float (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- installment: float (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- dti: float (nullable = true)
 |-- delinq_2yrs: integer (nullable = true)
 |-- inq_last_6mths: integer (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: float (nullable = true)
 |-- revol_util: float (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- acc_now_delinq: integer (nullable = true)
 |-- tot_coll_amt: float (nullable = true)
 |-- tot_cur_bal: float (nullable = true)
 |-- total_rev_hi_lim: integ

### 3.3 Add more features
#### 3.3.1 Create a numeric feature for "emp_length"

In [8]:
import re
def convert_to_int(s):
    s = re.sub('\\D', '', s)  #remove any non-digital character
    #\d matches any digital, #\D matches any non-digital
    try:
        return s
    except ValueError:
        return None

emp_to_num = udf(convert_to_int)
loan_df4 = loan_df3.withColumn('emp_len',emp_to_num('emp_length').cast('integer')).drop('emp_length')

#### 3.3.2 Regroup home_ownership

In [9]:
def home_ownership_func(x):
    if x in ['ANY','OTHER','NONE']:
        return 'Other'
    else: 
        return x

home_ownership = udf(home_ownership_func)  #fixed a function
loan_df5 = loan_df4.withColumn('home_ownership',home_ownership('home_ownership'))

In [10]:
loan_df5.select('home_ownership').groupBy('home_ownership').count().show()

+--------------+------+
|home_ownership| count|
+--------------+------+
|           OWN| 22282|
|         Other|   228|
|          RENT|107831|
|      MORTGAGE|126598|
+--------------+------+



#### 3.3.3 Add loan_inc_ratio and installment_inc_ratio as new features

In [11]:
def calculate_ratio(a, b):
    try:
        return a/float(b)
    except TypeError:
        return None
    except ZeroDivisionError:
        return None

def calculate_monthly_ratio(a, b):
    try:
        b = float(b) / 12
        return calculate_ratio(a, b)
    except TypeError:
        return None

# User-defined functions to compute ratios
ratio = udf(calculate_ratio) # compute loan amount / annual income
monthly_ratio = udf(calculate_monthly_ratio) # compute installment / monthly income

In [12]:
loan_df6 = loan_df5.withColumn('loan_inc_ratio',ratio('loan_amnt','annual_inc').cast('float'))
loan_df6 = loan_df6.withColumn('instal_inc_ratio',monthly_ratio('installment','annual_inc').cast('float'))

#### 3.3.6 Create dummy variables
* Use StringIndexer to encode a string column of labels to a column of label indices, and most frequent label gets index 0.

* Use OneHotEncoder to convert indices into dummy vectors

In [13]:
convert_list = [      
               'purpose',
               'grade',
               'sub_grade',
               'verification_status',
               'home_ownership',
                ]

for item in convert_list:
    indexer = StringIndexer(inputCol=item, outputCol=item + 'Index')    
    loan_df6 = indexer.fit(loan_df6).transform(loan_df6).drop(item)
    onehotenc = OneHotEncoder(inputCol=item + 'Index', outputCol=item + "-onehot", dropLast=False)
    loan_df6 = onehotenc.transform(loan_df6).drop(item +'Index')
    print item + ' converted'

purpose converted
grade converted
sub_grade converted
verification_status converted
home_ownership converted


In [14]:
loan_df6 = loan_df6.fillna(0.0, ['tot_coll_amt','tot_cur_bal', 'total_rev_hi_lim'])
loan_df6 = loan_df6.dropna()

In [15]:
loan_df_final = loan_df6.select(
    'paid_flag',
    'loan_amnt',
    'funded_amnt',
    'funded_amnt_inv',
    'int_rate',
    'installment',
    'annual_inc',
    'dti',
    'delinq_2yrs',
    'inq_last_6mths',
    'open_acc',
    'pub_rec',
    'revol_bal',
    'revol_util',
    'total_acc',
    'acc_now_delinq',
    'tot_coll_amt',
    'tot_cur_bal',
    'total_rev_hi_lim',
    'emp_len',
    'loan_inc_ratio',
    'instal_inc_ratio',
    'purpose-onehot',
    'grade-onehot',
    'sub_grade-onehot',
    'verification_status-onehot',
    'home_ownership-onehot'
)

## 4. Modeling

In [37]:
# import ML packages
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler

### 4.1 Splitting and Formatting Data

In [17]:
# create the final dataset
loan_df_final = loan_df_final.withColumnRenamed('paid_flag','label')

In [18]:
va = VectorAssembler(outputCol='features',inputCols=loan_df_final.columns[1:])
data = va.transform(loan_df_final).select('features','label')

In [19]:
# Creating training and test sets by random split 0.8,0.2
data_sets=data.randomSplit([0.8,0.2])

data_train=data_sets[0].cache()
data_test=data_sets[1].cache()

In [20]:
print 'training dataset fraud propotition:', float(data_train.filter('label=1').count())/float((data_train.filter('label=1').count() + data_train.filter('label=0').count()))
print 'test dataset fraud propotition:', float(data_test.filter('label=1').count())/float((data_test.filter('label=1').count() + data_test.filter('label=0').count()))

data_train_paid = data_train.filter(data_train.label == 0)
data_train_not_paid = data_train.filter(data_train.label == 1)

num_samples_train = data_train.count()
half_num_samples_train = num_samples_train / 2

data_train_paid = data_train_paid.sample(True, float(half_num_samples_train) / float(data_train_paid.count())) 
data_train_not_paid = data_train_not_paid.sample(True, float(half_num_samples_train) / float(data_train_not_paid.count()))

training dataset fraud propotition: 0.180456353232
test dataset fraud propotition: 0.181495671917


In [21]:
# union two random sample sets together, paid & unpaid
data_train = data_train_paid.union(data_train_not_paid)  

# total observations remain close enough to the original dataset
data_train.count() 

197091

### 4.2 Random Forest

In [22]:
# Fit random forest
rf = RandomForestClassifier()
evaluator_rf = BinaryClassificationEvaluator()
cv = CrossValidator().setEstimator(rf).setEvaluator(evaluator_rf).setNumFolds(5)
paramGrid = ParamGridBuilder().addGrid(rf.numTrees,[31]).addGrid(rf.maxDepth,[5,15]).build()

cv.setEstimatorParamMaps(paramGrid)
cvmodel_rf = cv.fit(data_train)

In [23]:
# Make predictions
predictions_rf = cvmodel_rf.bestModel.transform(data_test)
prediction_list_rf = predictions_rf.collect()

In [24]:
# create a list of tuples as (prediction, label)
predictionAndLabels_rf = []
for item in prediction_list_rf:
    predictionAndLabels_rf.append((float(item[4]), float(item[1])))

In [25]:
score=evaluator_rf.evaluate(predictions_rf)
print evaluator_rf.getMetricName()+":"+str(score)

areaUnderROC:0.698704209361


In [26]:
metrics_rf = MulticlassMetrics(sc.parallelize(predictionAndLabels_rf))
print metrics_rf.confusionMatrix().toArray()
print metrics_rf.precision(1)
print metrics_rf.recall(1)
print metrics_rf.accuracy

[[ 25856.  14520.]
 [  3168.   5785.]]
0.284905195765
0.646152127778
0.641427963267


In [34]:
rfmodel=cvmodel_rf.bestModel

pyspark.ml.classification.RandomForestClassificationModel

In [35]:
importance_list = list(enumerate(rfmodel.featureImportances))
importance_list.sort(key=lambda tup: -tup[1])
top_importances = importance_list[:15]
top_importances

[(3, 0.17016971772952169),
 (19, 0.070145635212076132),
 (288, 0.066363043067363459),
 (20, 0.06182702391140641),
 (6, 0.050874038220614787),
 (12, 0.041545190442791008),
 (5, 0.038451809983694456),
 (286, 0.028610698537262409),
 (16, 0.023086853530975979),
 (4, 0.021965160807738617),
 (13, 0.021719786779805569),
 (291, 0.021054837479358976),
 (11, 0.020758402764765083),
 (1, 0.020715153617940719),
 (0, 0.020619211375286568)]

### 4.3 Logistic Regression

In [38]:
lr = LogisticRegression()
evaluator = BinaryClassificationEvaluator()

cv = CrossValidator().setEstimator(lr).setEvaluator(evaluator).setNumFolds(5)
paramGrid = ParamGridBuilder().addGrid(lr.maxIter,[100]).addGrid(lr.regParam,[0.1,0.01,0.001,1]).build()

cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(data_train)

In [64]:
#cvmodel.bestModel.coefficients

In [41]:
predictions = cvmodel.bestModel.transform(data_test)
score = evaluator.evaluate(cvmodel.bestModel.transform(data_test))
print evaluator.getMetricName()+":"+str(score)

areaUnderROC:0.706364985953


In [42]:
prediction_list = predictions.collect()
predictionAndLabels = []
for item in prediction_list:
    predictionAndLabels.append((float(item[4]), float(item[1])))

In [43]:
metrics = MulticlassMetrics(sc.parallelize(predictionAndLabels))
print metrics.confusionMatrix().toArray()
print metrics.precision(1)
print metrics.recall(1)
print metrics.accuracy

[[ 25876.  14500.]
 [  3066.   5887.]]
0.288762446657
0.657544956998
0.64390115348
