# Installing Spark


In [None]:
# Refresh the apt package index, then install a headless Java 8 JDK (the JDK that JAVA_HOME
# is pointed at in a later cell). Standard output is discarded to keep the cell quiet.
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
# Unpack the downloaded Spark distribution into the current working directory.
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
# findspark is imported by a later cell, before pyspark is first imported.
!pip install -q findspark

In [None]:
import os

# Tell the JVM and Spark tooling where Java 8 and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
    }
)

In [None]:
import findspark
# Must run before the first `pyspark` import; presumably uses the SPARK_HOME set in the
# previous cell to locate the unpacked distribution — verify against the findspark docs.
findspark.init()

In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session with master "local[*]" and the web UI configured on port 4050.
spark = SparkSession.builder.master("local[*]").config('spark.ui.port', '4050').getOrCreate()
# Bare expression: display the session object as the cell output.
spark

In [None]:
# Low-level SparkContext; the RDD-based cells below call `sc` directly.
sc = spark.sparkContext
from pyspark.sql import SQLContext
# NOTE(review): `sqlContext` is not referenced by any later cell visible in this notebook, and
# SQLContext is deprecated since Spark 3.0 in favour of SparkSession — consider removing it.
sqlContext = SQLContext(sc)

# Loading Data

In [None]:
# Download the loan dataset from Google Drive by file id; it is saved as loan_new.csv.
!gdown  --id 1ndXGlbvZ9afiqmhLBoHXt3E_HAU29Luu

Downloading...
From: https://drive.google.com/uc?id=1ndXGlbvZ9afiqmhLBoHXt3E_HAU29Luu
To: /content/loan_new.csv
  0% 0.00/33.8k [00:00<?, ?B/s]100% 33.8k/33.8k [00:00<00:00, 12.5MB/s]


In [None]:
# Read the CSV as an RDD of raw text lines (one string per line, header line included).
df = sc.textFile("loan_new.csv")

In [None]:
# Despite its name, `df` is an RDD of strings, not a DataFrame.
type(df)

pyspark.rdd.RDD

In [None]:
# Preview the first five raw lines; the first one is the CSV header.
df.take(5)

[',Loan_ID,Gender,Married,Dependents,Education,Self_Employed,ApplicantIncome,CoapplicantIncome,LoanAmount,Loan_Amount_Term,Credit_History,Property_Area,Loan_Status',
 '0,LP001002,1,0,0,0,0,5849,0.0,146.41216216216216,360.0,1.0,2,1',
 '1,LP001003,1,1,1,0,0,4583,1508.0,128.0,360.0,1.0,0,0',
 '2,LP001005,1,1,0,0,1,3000,0.0,66.0,360.0,1.0,2,1',
 '3,LP001006,1,1,0,1,0,2583,2358.0,120.0,360.0,1.0,2,1']

In [None]:
# Number of lines in the file, header line included.
df.count()

615

In [None]:
# Derive the header-less RDD from a fresh read of the file rather than from `df` itself.
# With `df = df.filter(...)`, running this cell a second time would take the first *data* row
# as the "header" and silently drop it from every downstream computation.
raw_lines = sc.textFile("loan_new.csv")
header = raw_lines.first()  # the CSV header line
df = raw_lines.filter(lambda row: row != header)  # keep every line except the header

In [None]:
# Check that the header line is gone: the first five entries should all be data rows.
df.take(5)

['0,LP001002,1,0,0,0,0,5849,0.0,146.41216216216216,360.0,1.0,2,1',
 '1,LP001003,1,1,1,0,0,4583,1508.0,128.0,360.0,1.0,0,0',
 '2,LP001005,1,1,0,0,1,3000,0.0,66.0,360.0,1.0,2,1',
 '3,LP001006,1,1,0,1,0,2583,2358.0,120.0,360.0,1.0,2,1',
 '4,LP001008,1,0,0,0,0,6000,0.0,141.0,360.0,1.0,2,1']

# Exploratory Data Analysis

In [None]:
import numpy as np

def parse_interaction(line):
    """Convert one CSV record into a NumPy vector of its numeric columns.

    Column positions in a record (from the file header): 0 row index, 1 Loan_ID, 2 Gender,
    3 Married, 4 Dependents, 5 Education, 6 Self_Employed, 7 ApplicantIncome,
    8 CoapplicantIncome, 9 LoanAmount, 10 Loan_Amount_Term, 11 Credit_History,
    12 Property_Area, 13 Loan_Status.

    The returned vector is [Dependents, ApplicantIncome, CoapplicantIncome, LoanAmount,
    Loan_Amount_Term], matching the per-column reports printed in the cells that follow.
    The previous exclusion list dropped column 4 and kept column 3, so the statistics
    reported as "Dependent" were really those of the binary Married flag.

    Args:
        line: one comma-separated record (no header line).

    Returns:
        numpy.ndarray of floats, one entry per retained column.

    Raises:
        ValueError: if a retained field cannot be parsed as a float.
    """
    fields = line.split(",")
    # Identifier, categorical and label columns that are excluded from the numeric summary.
    symbolic_indexes = {0, 1, 2, 3, 5, 6, 11, 12, 13}
    return np.array(
        [float(value) for position, value in enumerate(fields) if position not in symbolic_indexes]
    )

# Map every record to its numeric vector (lazy), then preview the first five vectors.
vector_data = df.map(parse_interaction)
vector_data.take(5)

[array([   0.        , 5849.        ,    0.        ,  146.41216216,
         360.        ]),
 array([1.000e+00, 4.583e+03, 1.508e+03, 1.280e+02, 3.600e+02]),
 array([1.0e+00, 3.0e+03, 0.0e+00, 6.6e+01, 3.6e+02]),
 array([1.000e+00, 2.583e+03, 2.358e+03, 1.200e+02, 3.600e+02]),
 array([   0., 6000.,    0.,  141.,  360.])]

In [None]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 


# Column-wise summary object over the numeric vectors; the cells below query its
# mean(), variance(), max(), min(), count() and numNonzeros().
summary = Statistics.colStats(vector_data)

In [None]:
# Per-column means, in the same column order as the vectors in `vector_data`.
summary.mean()

array([6.53094463e-01, 5.40345928e+03, 1.62124580e+03, 1.46412162e+02,
       3.42410423e+02])

In [None]:
# Print the summary statistics of feature column 0 of `vector_data`.
column = 0
print(" Dependent Statistics:")
for caption, value in (
    (" Mean: ", round(summary.mean()[column], 3)),
    (" St. deviation: ", round(sqrt(summary.variance()[column]), 3)),
    (" Max value: ", round(summary.max()[column], 3)),
    (" Min value: ", round(summary.min()[column], 3)),
    (" Total value count: ", summary.count()),
    (" Number of non-zero values: ", summary.numNonzeros()[column]),
):
    print(caption, value)

 Dependent Statistics:
 Mean:  0.653
 St. deviation:  0.476
 Max value:  1.0
 Min value:  0.0
 Total value count:  614
 Number of non-zero values:  401.0


In [None]:
# Print the summary statistics of feature column 1 of `vector_data`.
column = 1
print(" ApplicantIncome Statistics:")
for caption, value in (
    (" Mean: ", round(summary.mean()[column], 3)),
    (" St. deviation: ", round(sqrt(summary.variance()[column]), 3)),
    (" Max value: ", round(summary.max()[column], 3)),
    (" Min value: ", round(summary.min()[column], 3)),
    (" Total value count: ", summary.count()),
    (" Number of non-zero values: ", summary.numNonzeros()[column]),
):
    print(caption, value)

 ApplicantIncome Statistics:
 Mean:  5403.459
 St. deviation:  6109.042
 Max value:  81000.0
 Min value:  150.0
 Total value count:  614
 Number of non-zero values:  614.0


In [None]:
# Print the summary statistics of feature column 2 of `vector_data`.
column = 2
print(" Co-ApplicantIncome Statistics:")
for caption, value in (
    (" Mean: ", round(summary.mean()[column], 3)),
    (" St. deviation: ", round(sqrt(summary.variance()[column]), 3)),
    (" Max value: ", round(summary.max()[column], 3)),
    (" Min value: ", round(summary.min()[column], 3)),
    (" Total value count: ", summary.count()),
    (" Number of non-zero values: ", summary.numNonzeros()[column]),
):
    print(caption, value)

 Co-ApplicantIncome Statistics:
 Mean:  1621.246
 St. deviation:  2926.248
 Max value:  41667.0
 Min value:  0.0
 Total value count:  614
 Number of non-zero values:  341.0


In [None]:
# Print the summary statistics of feature column 3 of `vector_data`.
column = 3
print(" LoanAmount Statistics:")
for caption, value in (
    (" Mean: ", round(summary.mean()[column], 3)),
    (" St. deviation: ", round(sqrt(summary.variance()[column]), 3)),
    (" Max value: ", round(summary.max()[column], 3)),
    (" Min value: ", round(summary.min()[column], 3)),
    (" Total value count: ", summary.count()),
    (" Number of non-zero values: ", summary.numNonzeros()[column]),
):
    print(caption, value)

 LoanAmount Statistics:
 Mean:  146.412
 St. deviation:  84.037
 Max value:  700.0
 Min value:  9.0
 Total value count:  614
 Number of non-zero values:  614.0


In [None]:
# Print the summary statistics of feature column 4 of `vector_data`.
column = 4
print(" LoanAmountTerm Statistics:")
for caption, value in (
    (" Mean: ", round(summary.mean()[column], 3)),
    (" St. deviation: ", round(sqrt(summary.variance()[column]), 3)),
    (" Max value: ", round(summary.max()[column], 3)),
    (" Min value: ", round(summary.min()[column], 3)),
    (" Total value count: ", summary.count()),
    (" Number of non-zero values: ", summary.numNonzeros()[column]),
):
    print(caption, value)

 LoanAmountTerm Statistics:
 Mean:  342.41
 St. deviation:  64.429
 Max value:  480.0
 Min value:  12.0
 Total value count:  614
 Number of non-zero values:  614.0


# Splitting the data

In [None]:
# Random ~70/30 train/test split. The seed is fixed so that the partition, and every metric
# computed from it below, is the same after a kernel restart; without a seed each run
# draws a different split.
df_train, df_test = df.randomSplit([0.70, 0.30], seed=42)

In [None]:
# Preview the first ten raw records that landed in the training split.
df_train.take(10)

['0,LP001002,1,0,0,0,0,5849,0.0,146.41216216216216,360.0,1.0,2,1',
 '2,LP001005,1,1,0,0,1,3000,0.0,66.0,360.0,1.0,2,1',
 '5,LP001011,1,1,2,0,1,5417,4196.0,267.0,360.0,1.0,2,1',
 '6,LP001013,1,1,0,1,0,2333,1516.0,95.0,360.0,1.0,2,1',
 '8,LP001018,1,1,2,0,0,4006,1526.0,168.0,360.0,1.0,2,1',
 '10,LP001024,1,1,2,0,0,3200,700.0,70.0,360.0,1.0,2,1',
 '11,LP001027,1,1,2,0,0,2500,1840.0,109.0,360.0,1.0,2,1',
 '13,LP001029,1,0,0,0,0,1853,2840.0,114.0,360.0,1.0,0,0',
 '15,LP001032,1,0,0,0,0,4950,0.0,125.0,360.0,1.0,2,1',
 '16,LP001034,1,0,1,1,0,3596,0.0,100.0,240.0,1.0,2,1']

In [None]:
# Size of the training split.
df_train.count()

391

In [None]:
# Size of the test split.
df_test.count()

223

# Preparing labeled feature vectors (categorical columns are kept as numeric codes)

In [None]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    """Turn one CSV record into an MLlib ``LabeledPoint``.

    This definition replaces the earlier exploratory helper of the same name in the
    notebook namespace.

    Features are fields 2..12 of the record (Gender through Property_Area according to
    the file header), each cast to float. The label comes from field 13 (Loan_Status):
    0.0 when that field is numerically zero, 1.0 otherwise. The label is compared
    numerically after stripping whitespace, so '0', ' 0' and '0.0' all map to 0.0; an
    exact string match against '0' would silently label the latter two as 1.0.

    Args:
        line: one comma-separated record (no header line) with at least 14 fields.

    Returns:
        LabeledPoint(label, features) with 11 float features.

    Raises:
        IndexError: if the record has fewer than 14 fields.
        ValueError: if a feature field cannot be parsed as a float.
    """
    fields = line.split(",")
    feature_fields = fields[2:13]
    label_text = fields[13].strip()
    try:
        is_zero_label = float(label_text) == 0.0
    except ValueError:
        # Non-numeric label text keeps the historical default label of 1.0.
        is_zero_label = False
    Loan_Status = 0.0 if is_zero_label else 1.0
    return LabeledPoint(Loan_Status, array([float(x) for x in feature_fields]))

In [None]:
# Parse every training record into a LabeledPoint (label + float feature vector).
training_data = df_train.map(parse_interaction)

In [None]:
# Raw training records, shown for comparison with the parsed LabeledPoints in the next cell.
df_train.take(5)

['0,LP001002,1,0,0,0,0,5849,0.0,146.41216216216216,360.0,1.0,2,1',
 '2,LP001005,1,1,0,0,1,3000,0.0,66.0,360.0,1.0,2,1',
 '5,LP001011,1,1,2,0,1,5417,4196.0,267.0,360.0,1.0,2,1',
 '6,LP001013,1,1,0,1,0,2333,1516.0,95.0,360.0,1.0,2,1',
 '8,LP001018,1,1,2,0,0,4006,1526.0,168.0,360.0,1.0,2,1']

In [None]:
# The same first five training records after parsing into LabeledPoints.
training_data.take(5)

[LabeledPoint(1.0, [1.0,0.0,0.0,0.0,0.0,5849.0,0.0,146.41216216216216,360.0,1.0,2.0]),
 LabeledPoint(1.0, [1.0,1.0,0.0,0.0,1.0,3000.0,0.0,66.0,360.0,1.0,2.0]),
 LabeledPoint(1.0, [1.0,1.0,2.0,0.0,1.0,5417.0,4196.0,267.0,360.0,1.0,2.0]),
 LabeledPoint(1.0, [1.0,1.0,0.0,1.0,0.0,2333.0,1516.0,95.0,360.0,1.0,2.0]),
 LabeledPoint(1.0, [1.0,1.0,2.0,0.0,0.0,4006.0,1526.0,168.0,360.0,1.0,2.0])]

In [None]:
# Parse the held-out records with the same function used for the training split.
test_data = df_test.map(parse_interaction)

In [None]:
# Preview the first five parsed test points.
test_data.take(5)

[LabeledPoint(0.0, [1.0,1.0,1.0,0.0,0.0,4583.0,1508.0,128.0,360.0,1.0,0.0]),
 LabeledPoint(1.0, [1.0,1.0,0.0,1.0,0.0,2583.0,2358.0,120.0,360.0,1.0,2.0]),
 LabeledPoint(1.0, [1.0,0.0,0.0,0.0,0.0,6000.0,0.0,141.0,360.0,1.0,2.0]),
 LabeledPoint(0.0, [1.0,1.0,3.0,0.0,0.0,3036.0,2504.0,158.0,360.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,1.0,1.0,0.0,0.0,12841.0,10968.0,349.0,360.0,1.0,1.0])]

# Training the Logistic Regression Model

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Measure wall-clock time around the train() call only.
t0 = time()
# Fit a logistic regression classifier with L-BFGS on the labeled training points,
# using the library's default settings (no arguments other than the data are passed).
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print ("Classifier trained in ",round(tt,3), " seconds")

Classifier trained in  4.028  seconds


# Evaluating the Model

In [None]:
# Pair each test point's true label with the class predicted by the trained model.
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

In [None]:
# Preview the first five (true label, predicted class) pairs.
labels_and_preds.take(5)

[(0.0, 1), (1.0, 1), (1.0, 1), (0.0, 0), (0.0, 1)]

In [None]:
# Accuracy = number of test points whose predicted class equals the true label, divided by
# the total number of test points. The timer wraps the two count() actions, so the reported
# time is for those counts.
t0 = time()
test_accuracy = labels_and_preds.filter(lambda x : x[0] == x[1]).count() / float(test_data.count())
tt = time() - t0

print ("Prediction made in ",round(tt,3)," seconds. Test accuracy is ",round(test_accuracy,3))

Prediction made in  0.59  seconds. Test accuracy is  0.807
