In [72]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

In [73]:
spark = SparkSession.builder.appName('loan_prediction').getOrCreate() #initializing the session

In [74]:
df = spark.read.csv('train.csv', header = True, inferSchema=True) #Reading CSV File
df.show(5)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      NULL|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [75]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



### Data Analysis

In [76]:
df.dtypes

[('Loan_ID', 'string'),
 ('Gender', 'string'),
 ('Married', 'string'),
 ('Dependents', 'string'),
 ('Education', 'string'),
 ('Self_Employed', 'string'),
 ('ApplicantIncome', 'int'),
 ('CoapplicantIncome', 'double'),
 ('LoanAmount', 'int'),
 ('Loan_Amount_Term', 'int'),
 ('Credit_History', 'int'),
 ('Property_Area', 'string'),
 ('Loan_Status', 'string')]

In [77]:
df.groupBy('Loan_Status').count().show() #display count based on Loan_Status

+-----------+-----+
|Loan_Status|count|
+-----------+-----+
|          Y|  422|
|          N|  192|
+-----------+-----+



In [78]:
df.select("Credit_History", "Loan_Status").groupBy('Loan_Status').agg(F.avg('Credit_History')).show() #Checking Average Credit History on Loan Status

+-----------+-------------------+
|Loan_Status|avg(Credit_History)|
+-----------+-------------------+
|          Y| 0.9818181818181818|
|          N| 0.5418994413407822|
+-----------+-------------------+



In [79]:
df.select('Gender','Loan_Status').groupBy('Loan_Status','Gender').count().show()

+-----------+------+-----+
|Loan_Status|Gender|count|
+-----------+------+-----+
|          N|Female|   37|
|          Y|  NULL|    8|
|          Y|Female|   75|
|          N|  NULL|    5|
|          Y|  Male|  339|
|          N|  Male|  150|
+-----------+------+-----+



### Perform SQL Operations

In [80]:
import pyspark.sql as sparksql

In [81]:
df.createOrReplaceTempView('table')

In [82]:
spark.sql("Select * from table limit 5").show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      NULL|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [83]:
spark.sql('Select Loan_ID from table where Credit_History = 1').show()

+--------+
| Loan_ID|
+--------+
|LP001002|
|LP001003|
|LP001005|
|LP001006|
|LP001008|
|LP001011|
|LP001013|
|LP001018|
|LP001020|
|LP001024|
|LP001027|
|LP001028|
|LP001029|
|LP001030|
|LP001032|
|LP001038|
|LP001041|
|LP001046|
|LP001066|
|LP001068|
+--------+
only showing top 20 rows



In [84]:
spark.sql('Select count(Loan_ID) from table where Credit_History = 1').show()

+--------------+
|count(Loan_ID)|
+--------------+
|           475|
+--------------+



### Data Preprocessing

In [85]:
df.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df.columns]).show() #checking null values in each column

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|    13|      3|        15|        0|           32|              0|                0|        22|              14|            50|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [86]:
#get mean value of a column
mean_loanAmount = df.select(F.mean(df['LoanAmount'])).collect()[0][0]
mean_loanAmount

146.41216216216216

In [87]:
df = df.na.fill(mean_loanAmount,['LoanAmount'])

In [88]:
#filling null values for numerical columns and categorical columns
numerical_cols = ['LoanAmount','Loan_Amount_Term']
categorical_cols =['Gender','Married','Dependents','Self_Employed','Credit_History']


In [89]:
for col in numerical_cols:
    df_mean = df.select(F.mean(df[col])).collect()[0][0]
    df = df.na.fill(df_mean,[col])

In [90]:
for col in categorical_cols:
    df_mode = df.groupBy(col).count().orderBy("count", ascending = False).first()[0]
    df = df.na.fill(df_mode,[col])

In [91]:
df.show()

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y

In [92]:
df.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df.columns]).show() #checking null values in each column

+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|      0|     0|      0|         0|        0|            0|              0|                0|         0|               0|             0|            0|          0|
+-------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+



In [93]:
df = df.withColumn('TotalIncome', F.col('ApplicantIncome')+F.col('CoapplicantIncome')) #Creating a new column with combined income of applicant and coapplicant
df.show(10)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          Y|     5849.0|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|     6091.0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        

In [94]:
df = df.withColumn('Loan_Status', F.when(df['Loan_Status']=='Y', 1).otherwise(0)) #changing loan status to numerical value
df.show(10)

+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|       146|             360|             1|        Urban|          1|     5849.0|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          0|     6091.0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|        

### Feature Engineering

In [95]:
df.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Married: string (nullable = false)
 |-- Dependents: string (nullable = false)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = false)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: integer (nullable = false)
 |-- TotalIncome: double (nullable = true)



In [96]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [97]:
numerical_columns = ['LoanAmount','Loan_Amount_Term','ApplicantIncome','CoapplicantIncome','TotalIncome']
categorical_columns =['Gender','Married','Dependents','Self_Employed','Credit_History','Property_Area']


#index the string columns
indexers = [StringIndexer(inputCol=col, outputCol="{0}_index".format(col)) for col in categorical_columns]

#encode the indexed values
encoders = [OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers]

input_cols = [encoder.getOutputCol() for encoder in encoders] + numerical_columns

#vectorize the encoded values
assembler = VectorAssembler(inputCols=input_cols, outputCol="feature")

In [98]:
#create the pipeline to transform the data
pipeline = Pipeline(stages = indexers + encoders + [assembler])

In [99]:
data_model = pipeline.fit(df)

In [100]:
transformed_df = data_model.transform(df)

In [101]:
transformed_df.show(1)

+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----------+------------+-------------+----------------+-------------------+--------------------+-------------------+--------------------+---------------------+------------------------+---------------------------+----------------------------+---------------------------+--------------------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|TotalIncome|Gender_index|Married_index|Dependents_index|Self_Employed_index|Credit_History_index|Property_Area_index|Gender_index_encoded|Married_index_encoded|Dependents_index_encoded|Self_Employed_index_encoded|Credit_History_index_encoded|Property_Area_index_encoded|             feature|
+--------+------+-------+----------+---------+-------------+---------------+--------------

In [102]:
transformed_df = transformed_df.select(['feature','Loan_Status'])

In [103]:
train_data,test_data = transformed_df.randomSplit([0.9,0.1], seed = 42) 

### Model Training and Testing

In [104]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [105]:
#Logistic Regression
lr = LogisticRegression(featuresCol= 'feature', labelCol='Loan_Status')
lr_model = lr.fit(train_data)

In [106]:
lr_predict = lr_model.transform(test_data)
lr_predict.show(5)

+--------------------+-----------+--------------------+--------------------+----------+
|             feature|Loan_Status|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(20,[0,2,4,8,10,1...|          1|[-2.1342024710992...|[0.10581669665579...|       1.0|
|(20,[0,2,4,8,10,1...|          1|[-2.1263775416765...|[0.10655937471684...|       1.0|
|(20,[0,2,4,8,10,1...|          1|[-1.6643699192816...|[0.15917625909110...|       1.0|
|(20,[0,2,4,8,10,1...|          1|[-1.4283229574072...|[0.19336012139024...|       1.0|
|(20,[0,2,4,8,10,1...|          1|[-1.4948772407310...|[0.18319080851177...|       1.0|
+--------------------+-----------+--------------------+--------------------+----------+
only showing top 5 rows



In [107]:
auc = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(lr_predict)))

AUC: 0.7226890756302521


In [108]:
#RandomForest Classifier
rf = RandomForestClassifier(featuresCol='feature', labelCol='Loan_Status')
rf_model = rf.fit(train_data)

In [110]:
rf_predict = rf_model.transform(test_data)
auc = BinaryClassificationEvaluator().setLabelCol('Loan_Status')
print('AUC:', str(auc.evaluate(rf_predict)))

AUC: 0.7710084033613446
