 ## ------------------------------------------------------------
 ## Name       : Ndivhuwo Mphephu
 ## Student No : 27211976
 ## Module     : MIT 805 (Big Data Processing)
 ## Due Date    : 15 November 2020
 ## ------------------------------------------------------------

## Project Description:

<p style='text-align: justify;'>  This is a project for the MIT 805 (Big Data Processing) course at the University of Pretoria. The requirement for this course is to solve a real-life big data problem using the Big Data frameworks and techniques learnt during the course. Specific tasks include coding the MapReduce algorithm to extract and summarise the data, and visualising the dataset. The project requires selecting a dataset with a size between 1GB and 10GB. Therefore, a dataset of historical loans from Lending Club was selected. This is an open-source dataset from Lending Club with loans recorded from 2007 until the fourth quarter of 2019. The data is available to download from the following website: https://www.kaggle.com/denychaen/lending-club-loans-rejects-data. </p>

<p style='text-align: justify;'> The Lending Club is one of the biggest peer-to-peer (P2P) lending platforms by market share in the United States of America. The P2P industry has grown significantly since its inception in 2007, prompting more people to take an interest in entering this marketplace. With billions of dollars issued in annual loans, there are significant opportunities to capitalise on this alternative investment option; however, it remains the investor's responsibility to understand the risks involved in the lending business within this market. This project is aimed at using big data techniques to extract information from the massive records of loans issued over the years of the platform's operation. The dataset contains 2,650,550 loans that have been issued, where each loan has 150 attributes. The dataset is 1.9GB in size, which makes it hard to process on a machine with low computing power. The main objective of this project is to understand the different characteristics of borrowers in this market by visualising various characteristics of borrowers, and also to apply machine learning models to predict the types of borrowers who are likely to default on their loans.</p>

##### Technology requirements: findSpark 1.2.0, Spark 2.4.7, Spark MLlib, Python, Seaborn and pyplot.

## Set up the Spark environment and Import the required packages and modules

In [1]:
import findspark
findspark.init()

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col, year, quarter, lit, to_date, to_timestamp, concat, avg
from pyspark.sql.types import DateType, TimestampType
from pyspark import SparkContext
from pyspark import SparkConf

# Build the configuration BEFORE any SparkContext exists, so that the memory
# and core settings are part of the context when it is created.
#
# Previously a bare SparkContext() was created first. The same memory values
# were then pushed with SparkContext.setSystemProperty, which the PySpark
# docs say must be invoked before the context is instantiated. getOrCreate()
# then simply reused that already-running, default-configured context.
# Re-running the cell also failed, because a second SparkContext() cannot be
# created while one is active.
spark_conf = SparkConf().setAll(pairs=[
    ('spark.executor.memory', '8g'),
    ('spark.executor.cores', '4'),
    ('spark.cores.max', '4'),
    ('spark.driver.memory', '8g'),
])

# getOrCreate() is idempotent, so this cell can safely be executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config(conf=spark_conf)
    .appName("MIT 805 Project")
    .getOrCreate()
)

# Derive the context from the session so both share one configuration.
sc = spark.sparkContext

# SQLContext expects the SparkContext first and, optionally, the session.
sqlContext = SQLContext(sc, spark)

# Only report errors to keep the notebook output readable.
sc.setLogLevel('ERROR')

# Packages for Machine Learning
import pandas as pd
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, TrainValidationSplit, ParamGridBuilder

import warnings
warnings.filterwarnings('ignore')

%matplotlib notebook   

import datetime
import numpy as np
import pandas as pd
from pandas import DataFrame as df
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(color_codes=True)
from scipy import stats
import chart_studio.plotly as py #import plotly.plotly as py
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode,iplot
#init_notebook_mode(connected=True)

## Load the data into spark environment

In [2]:
# Location of the first partition of the Lending Club accepted-loans data.
# The complete dataset sits in the same folder under the name
# "Accepted_Loans_2007-2019Q3.csv"; point the URI below at it to process
# the full file instead of this partition.
_LC_CSV_URI = "file:///C:/Users/Ndivhuwo Mphephu/Desktop/Lending Club Data/LendingClub Data (2007-2019Q3)/LendingClubP1.csv"

# The first CSV line holds the column names; column types are inferred by Spark.
_csv_reader = spark.read
LC_Loan_part = _csv_reader.csv(_LC_CSV_URI, header=True, inferSchema=True)

# Show the inferred schema.
LC_Loan_part.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: strin

### Apply the MapReduce algorithm to check the number of missing values per column


In [3]:
# Count the missing (None) values of every column with a map/reduce pass
def CheckMissingValues(df):
    """Return the number of None cells per column.

    Every row is exploded into (column_name, value) pairs, the pairs whose
    value is None are kept, and they are summed per column name.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A list of (column_name, missing_count) tuples. Columns without any
        None value do not appear in the list. Only ``None`` is counted.
    """
    # Map: one (column, value) pair per cell.
    cells = df.rdd.flatMap(lambda record: record.asDict().items())
    # Keep the empty cells and turn each of them into a count of one.
    null_cells = cells.filter(lambda pair: pair[1] is None)
    ones = null_cells.mapValues(lambda _value: 1)
    # Reduce: add up the ones per column name.
    totals = ones.reduceByKey(lambda left, right: left + right)
    return totals.collect()


CheckMissingValues(LC_Loan_part)

[('desc', 249987),
 ('mths_since_last_record', 205730),
 ('verification_status_joint', 249489),
 ('sec_app_revol_util', 250000),
 ('hardship_reason', 248268),
 ('deferral_term', 248268),
 ('hardship_start_date', 248268),
 ('payment_plan_start_date', 248268),
 ('hardship_loan_status', 248268),
 ('hardship_payoff_balance_amount', 248268),
 ('hardship_last_payment_amount', 248268),
 ('settlement_status', 242838),
 ('settlement_amount', 242838),
 ('emp_title', 14767),
 ('mo_sin_old_il_acct', 7124),
 ('open_il_12m', 228628),
 ('open_rv_12m', 228628),
 ('open_rv_24m', 228628),
 ('dti', 2),
 ('num_rev_accts', 1),
 ('debt_settlement_flag', 1),
 ('sec_app_earliest_cr_line', 250000),
 ('sec_app_inq_last_6mths', 250000),
 ('sec_app_open_acc', 250000),
 ('sec_app_num_rev_accts', 250000),
 ('sec_app_collections_12_mths_ex_med', 250000),
 ('sec_app_mths_since_last_major_derog', 250000),
 ('hardship_amount', 248268),
 ('settlement_date', 242838),
 ('settlement_percentage', 242838),
 ('settlement_term

### Find the feature columns which have more than 50% empty data

In [4]:
# Find the list of columns in which more than 50% of the rows are null.
def findMissingValueCols(df):
    """Return the names of the columns that are null in more than half the rows.

    The null count of every column is computed in a single aggregation
    instead of one ``count()`` job per column, and the total row count is
    computed once rather than on every loop iteration.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A list of column names, in DataFrame column order, whose number of
        null rows is strictly greater than 50% of the total row count.
    """
    columns = df.columns
    if not columns:
        return []

    total_rows = df.count()

    # when(...) without otherwise() yields null for non-matching rows and
    # count() ignores nulls, so each expression counts the null rows of one
    # column.
    null_counts = df.select(
        [F.count(F.when(F.col(name).isNull(), 1)).alias(name) for name in columns]
    ).first()

    threshold = total_rows * 0.5
    # Pair names and counts by position so the lookup does not depend on
    # how the result row resolves field names.
    return [name for name, nulls in zip(columns, null_counts) if nulls > threshold]


In [5]:
# Cleaning up the data

######### 1. Removing all the features which have more than 50% of the data empty ##########
missing_cols = findMissingValueCols(LC_Loan_part)
LC_Loan_Filtered = LC_Loan_part.drop(*missing_cols)

######### 2. Removing unique ID and url columns ##########
LC_Loan_Filtered = LC_Loan_Filtered.drop("id", "member_id", "issue_d", "url")


######### 3. Removing other insignificant columns ##########
# Keeping the state feature instead of zip_code, removing the date fields,
# and removing policy_code, which has only one category ("1").
# NOTE(review): the original notes also describe application_type,
# pymnt_plan and initial_list_status as single-valued and removable, but
# they are not part of this drop list - confirm whether they should be.
LC_Loan_Filtered = LC_Loan_Filtered.drop("emp_title", "title", "zip_code", "earliest_cr_line", "last_pymnt_d",
                                         "next_pymnt_d", "last_credit_pull_d", "policy_code")


######### 4. Missing data imputation for tot_cur_bal and tot_coll_amt ##########
# 90% of the missing data in the "tot_cur_bal" and "tot_coll_amt" columns can
# be filled with 0 since their loan status is "Fully Paid" OR "Charged Off".
# A numeric 0 is used (rather than the string "0") because these are amounts.
for _name in ("tot_cur_bal", "tot_coll_amt"):
    LC_Loan_Filtered = LC_Loan_Filtered.withColumn(
        _name,
        when(col(_name).isNull() & col("loan_status").isin("Fully Paid", "Charged Off"), lit(0))
        .otherwise(col(_name)))

# Imputing the (integer-truncated) mean value for "total_rev_hi_lim"
mean = int(LC_Loan_Filtered.select(avg("total_rev_hi_lim")).take(1)[0][0])
LC_Loan_Filtered = LC_Loan_Filtered.withColumn(
    "total_rev_hi_lim",
    when(col("total_rev_hi_lim").isNull(), lit(mean)).otherwise(col("total_rev_hi_lim")))

######### 5. Removing loan observations which still have missing data. (~ 0.8% records) ##########
LC_Loan_Filtered = LC_Loan_Filtered.dropna(how="any")


######### 6. Adding the label column to the dataframe. 1 - default and 0 - paid/current ##########
LC_Loan_Filtered = LC_Loan_Filtered.withColumn(
    "isDefault",
    when(col("loan_status").isin("Default", "Charged Off", "Late (31-120 days)", "Late (16-30 days)",
                                 "Does not meet the credit policy. Status:Charged Off"), 1)
    .otherwise(0))

######### 7. Changing the feature datatype to numeric (float) ##########
# withColumn on an existing name replaces that column in place, so the
# column order of the dataframe is unchanged by the loop.
_float_cols = ("loan_amnt", "funded_amnt", "funded_amnt_inv", "int_rate", "installment",
               "annual_inc", "dti", "delinq_2yrs", "inq_last_6mths", "open_acc",
               "pub_rec", "revol_bal", "revol_util", "total_acc")
for _name in _float_cols:
    LC_Loan_Filtered = LC_Loan_Filtered.withColumn(_name, LC_Loan_Filtered[_name].cast('float'))

######### 8. Removing the fields which are related to the current loan ##########
LC_Loan_Filtered = LC_Loan_Filtered.drop("out_prncp", "out_prncp_inv", "total_pymnt", "total_pymnt_inv", "total_rec_prncp",
                                         "total_rec_int", "total_rec_late_fee", "recoveries", "collection_recovery_fee",
                                         "last_pymnt_amnt", "collections_12_mths_ex_med", "acc_now_delinq", "tot_coll_amt",
                                         "tot_cur_bal", "total_rev_hi_lim")

In [10]:
LC_Loan_Filtered.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- funded_amnt: float (nullable = true)
 |-- funded_amnt_inv: float (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- installment: float (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: float (nullable = true)
 |-- delinq_2yrs: float (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_range_high: double (nullable = true)
 |-- inq_last_6mths: float (nullable = true)
 |-- mths_since_last_delinq: double (nullable = true)
 |-- open_acc: float (

### Count the number of each loan status in the data

In [11]:
# Count how many loans fall under each distinct loan status in the dataset
def NumberOfLoanStatus(df):
    """Return the number of loans per ``loan_status`` value.

    RDD transformations return a new RDD instead of modifying the receiver.
    The mapped (status, 1) pairs must therefore be kept and counted; the
    previous version discarded them and called ``countByKey`` on the
    unmapped Row RDD.

    Args:
        df: Spark DataFrame that has a ``loan_status`` column.

    Returns:
        A dict-like mapping of loan status to its number of occurrences,
        collected on the driver.
    """
    status_pairs = df.select("loan_status").rdd.map(lambda r: (r['loan_status'], 1))
    return status_pairs.countByKey()


NumberOfLoanStatus(LC_Loan_Filtered)

defaultdict(int, {'Fully Paid': 71038, 'Charged Off': 19162})

These results show that the dominating loan statuses are `Fully Paid`, `Current` and `Charged Off`. It is therefore not necessary to keep all the loan statuses when building the model. Only these three loan statuses are worth keeping; however, the `Current` loans are not interesting to investors, as they need to know about the completed loans and those that are written off. Therefore we filter the data to contain only charged-off and paid-up loans and encode the statuses with boolean values (`True` for charged off and `False` for paid up). 

In [12]:
def FilterLoanStatus(df):
    """Count the loans per status after keeping only completed loans.

    Rows whose ``loan_status`` is 'Fully Paid' or 'Charged Off' are kept and
    the per-status totals of that subset are returned. The input DataFrame
    itself is not reassigned.
    """
    completed_loans = df.where("loan_status == 'Fully Paid' or loan_status == 'Charged Off' ")
    status_counts = NumberOfLoanStatus(completed_loans)
    return status_counts


FilterLoanStatus(LC_Loan_Filtered)

defaultdict(int, {'Fully Paid': 71038, 'Charged Off': 19162})

In [13]:
def LoanStatusFilter(df):
    """Return only the rows whose loan_status is 'Fully Paid' or 'Charged Off'."""
    keep_condition = "loan_status == 'Fully Paid' or loan_status == 'Charged Off' "
    completed = df.filter(keep_condition)
    return completed


# From here on the working dataframe only holds completed loans.
LC_Loan_Filtered = LoanStatusFilter(LC_Loan_Filtered)

### MapReduce algorithm to count the number of occurrences of each value of a string feature in the dataframe

In [14]:
# Map/reduce frequency count of the values found in one dataframe column
def ValueCounts(df, col):
    """Return the values of column ``col`` with their frequencies.

    Args:
        df: Spark DataFrame to read from.
        col: Name of the column whose values are counted. Inside this
            function the parameter hides the ``col`` helper imported from
            ``pyspark.sql.functions``.

    Returns:
        A list of (value, count) tuples collected on the driver, ordered by
        descending count.
    """
    def to_pair(row):
        # Map step: every row contributes one occurrence of its value.
        return (row[col], 1)

    def add(left, right):
        # Reduce step: sum the occurrences of the same value.
        return left + right

    def by_count_descending(pair):
        # Negated count, so that an ascending sort puts the largest first.
        return -pair[1]

    pairs = df.rdd.map(to_pair)
    totals = pairs.reduceByKey(add)
    ranked = totals.sortBy(by_count_descending)
    return ranked.collect()


ValueCounts(LC_Loan_Filtered, 'loan_status')

[('Fully Paid', 71038), ('Charged Off', 19162)]

In [18]:
ValueCounts(LC_Loan_Filtered, 'grade')

[('B', 27870),
 ('C', 26491),
 ('A', 13523),
 ('D', 12636),
 ('E', 7065),
 ('F', 2126),
 ('G', 489)]

In [19]:
ValueCounts(LC_Loan_Filtered, 'sub_grade')

[('C1', 6016),
 ('B4', 5726),
 ('B3', 5721),
 ('C2', 5601),
 ('B5', 5562),
 ('B2', 5495),
 ('C3', 5406),
 ('B1', 5366),
 ('C4', 5337),
 ('A5', 4670),
 ('C5', 4131),
 ('D1', 3533),
 ('A4', 2897),
 ('D2', 2601),
 ('D3', 2393),
 ('D4', 2198),
 ('A3', 2023),
 ('A1', 2000),
 ('A2', 1933),
 ('D5', 1911),
 ('E1', 1877),
 ('E2', 1645),
 ('E3', 1465),
 ('E4', 1180),
 ('E5', 898),
 ('F1', 734),
 ('F2', 523),
 ('F3', 363),
 ('F4', 272),
 ('F5', 234),
 ('G1', 166),
 ('G2', 132),
 ('G3', 82),
 ('G4', 64),
 ('G5', 45)]

In [15]:
######### 9. Extract the feature matrix by dropping loan_status ##########
# NOTE(review): only loan_status is removed here; the isDefault label added
# during cleaning and the _c0 column are still present in Features -
# confirm that this is intended before the columns are used as model inputs.
Features = LC_Loan_Filtered.drop("loan_status")

In [16]:
Features.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- funded_amnt: float (nullable = true)
 |-- funded_amnt_inv: float (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- installment: float (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: float (nullable = true)
 |-- delinq_2yrs: float (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_range_high: double (nullable = true)
 |-- inq_last_6mths: float (nullable = true)
 |-- mths_since_last_delinq: double (nullable = true)
 |-- open_acc: float (nullable = true)
 |-- pub_rec: float (nulla

### Feature selection using random forest

In [None]:
def replace_row(row, repdict):
    """Build a new Row with selected field values translated through lookups.

    Args:
        row: Row whose fields are read through ``asDict()``.
        repdict: Mapping of field name to a lookup table; the table is
            indexed with the field's current value to get its replacement.
            Names that are not fields of ``row`` are ignored.

    Returns:
        A new ``Row`` built from the updated field dictionary.

    Raises:
        KeyError: If a field's current value is missing from its lookup
            table (when that table is a dict).
    """
    fields = row.asDict()
    for name in repdict:
        if name not in fields:
            continue
        lookup = repdict[name]
        fields[name] = lookup[fields[name]]
    return Row(**fields)

def df_replace(df, repdict):
    """Apply ``replace_row`` with ``repdict`` to every record of ``df``.

    Returns:
        The mapped RDD of rows (the result of ``df.rdd.map``), not a
        DataFrame.
    """
    def substitute(record):
        return replace_row(record, repdict)

    return df.rdd.map(substitute)