## Logistic Regression

### Aim:
The project aims to make use of Python and Spark to extract insights from the data.

Secondly, to learn how to use ML Pipeline which provides a uniform set of high-level APIs on top of DataFrames.

And in the end, to predict whether the loan applicant can replay the loan or not using logistic regression.

### Attributes in the dataset:
Loan id, Gender, Married, Dependents, Education, Self Employed, Applicant income, Coapplicant income, Loan Amount,Credit History, Property_Area, Loan_Status


In [382]:
import findspark
findspark.init("/home/meghdad/spark-2.4.5-bin-hadoop2.7")

In [383]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("loan_prediction").getOrCreate()

In [386]:
df=spark.read.csv("loan_prediction.csv",header=True,inferSchema=True)

## number of rows and columns
print(df.count(),len(df.columns))

## variable types
df.printSchema()

614 13
root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: double (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Loan_Status: string (nullable = true)



In [387]:
from pyspark.sql.functions import when
## create integer target variable 
df=df.withColumn("target",when(df["Loan_Status"]=="Y",1).otherwise(0))
df.groupBy("target").count().show()

+------+-----+
|target|count|
+------+-----+
|     1|  422|
|     0|  192|
+------+-----+



In [389]:
## delete loan_ID and Loan_Status 
df=df.drop("Loan_ID","Loan_Status")

for item in df.head(1)[0]:
    print(item)

Male
No
0
Graduate
No
5849
0.0
None
360
1
Urban
1


In [390]:
## missing values
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()


+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+------+
|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|target|
+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+------+
|    13|      3|        15|        0|           32|              0|                0|        22|              14|            50|            0|     0|
+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+------+



In [391]:
## seperating string and numeric columns
string_cols=[item[0] for item in df.dtypes if item[1].startswith("string") ]
numeric_cols=[item[0] for item in df.dtypes if item[1].startswith("int" or "double")]

## exclude target variable 
numeric_cols=numeric_cols[0:-1]

In [392]:
## filling missing values with numeric column's mean
for item in numeric_cols:
    mean_col=df.groupBy().mean(item).collect()[0][0]
    print(mean_col)
    df=df.na.fill(mean_col,subset=[item])

5403.459283387622
146.41216216216216
342.0
0.8421985815602837


In [393]:
## filling missing values with string column's mode
for item in string_cols:
    df_mode=df.groupBy(item).count()
    mode_col=df_mode.orderBy(df_mode["Count"].desc()).collect()[0][0]
    print(mode_col)
    df=df.na.fill(mode_col,subset=[item])

Male
Yes
0
Graduate
No
Semiurban


### Setting Up DataFrame for Machine Learning

In [394]:
from pyspark.ml.feature import (StringIndexer,
                                OneHotEncoder,
                                VectorAssembler)

In [398]:
## the index of string values multiple columns 
indexers=[StringIndexer(inputCol=col,
                        outputCol="{0}_Index".format(col)) 
          for col in string_cols]

## the encode of indexed values multiple columns
encoders=[OneHotEncoder(inputCol=indexer.getOutputCol(),
                       outputCol="{0}_Vec".format(indexer.getOutputCol())) 
         for indexer in indexers]

## combine numeric and encoded string columns
assb_cols=numeric_cols+[encoder.getOutputCol() for encoder in encoders]

## input columns for vector assembler
for item in assb_cols:
    print(item)

assembler=VectorAssembler(inputCols=assb_cols,outputCol="features")

ApplicantIncome
LoanAmount
Loan_Amount_Term
Credit_History
Gender_Index_Vec
Married_Index_Vec
Dependents_Index_Vec
Education_Index_Vec
Self_Employed_Index_Vec
Property_Area_Index_Vec


### Test Train Split

In [432]:
train_data, test_data = df.randomSplit([0.7,0.3])

### Fit the model

In [433]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [510]:
log_reg=LogisticRegression(featuresCol="features",labelCol="target")

pipeline=Pipeline(stages=indexers+encoders+[assembler,log_reg])

In [512]:
fit_model=pipeline.fit(train_data)
predictions=fit_model.transform(test_data)

DataFrame[Gender: string, Married: string, Dependents: string, Education: string, Self_Employed: string, ApplicantIncome: int, CoapplicantIncome: double, LoanAmount: int, Loan_Amount_Term: int, Credit_History: int, Property_Area: string, target: int, Gender_Index: double, Married_Index: double, Dependents_Index: double, Education_Index: double, Self_Employed_Index: double, Property_Area_Index: double, Gender_Index_Vec: vector, Married_Index_Vec: vector, Dependents_Index_Vec: vector, Education_Index_Vec: vector, Self_Employed_Index_Vec: vector, Property_Area_Index_Vec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [436]:
predictions.select("prediction","target").show(5)

+----------+------+
|prediction|target|
+----------+------+
|       1.0|     1|
|       0.0|     0|
|       0.0|     1|
|       1.0|     1|
|       1.0|     1|
+----------+------+
only showing top 5 rows



### Confusion matrix

In [437]:
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
## need to cast to float type and order by prediction else won't work
preds_and_labels=predictions.select("prediction","target").withColumn("label",col("target").cast(FloatType())).orderBy("prediction")
## select only prediction and label columns
preds_and_labels=preds_and_labels.select("prediction","label")
metrics=MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

[[ 24.  18.]
 [  8. 113.]]


### Evaluators

In [533]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [534]:
my_eval=BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target")
AUC=my_eval.evaluate(predictions)
print("AUC is :", AUC)

AUC is : 0.7526564344746163


### Predict on new data
In case of prediction on new data the pipeline should not include classifier.

In [546]:
pipeline=Pipeline(stages=indexers+encoders+[assembler])

In [547]:
final_data=pipeline.fit(df).transform(df)
final_data.select("features").show(10)

+--------------------+
|            features|
+--------------------+
|[5849.0,146.0,360...|
|[4583.0,128.0,360...|
|[3000.0,66.0,360....|
|[2583.0,120.0,360...|
|[6000.0,141.0,360...|
|[5417.0,267.0,360...|
|[2333.0,95.0,360....|
|[3036.0,158.0,360...|
|[4006.0,168.0,360...|
|[12841.0,349.0,36...|
+--------------------+
only showing top 10 rows



In [548]:
log_reg=LogisticRegression(featuresCol="features",labelCol="target")
final_model=log_reg.fit(final_data)

In [557]:
unlabeled=spark.read.csv("unlabeled_loan.csv",header=True,inferSchema=True)

print("No. of rows: ",unlabeled.count(),", No. of columns: ",len(unlabeled.columns))

No. of rows:  367 , No. of columns:  12


In [558]:
data=pipeline.fit(unlabeled).transform(unlabeled)
data.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Self_Employed: string (nullable = true)
 |-- ApplicantIncome: integer (nullable = true)
 |-- CoapplicantIncome: integer (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- Loan_Amount_Term: integer (nullable = true)
 |-- Credit_History: integer (nullable = true)
 |-- Property_Area: string (nullable = true)
 |-- Gender_Index: double (nullable = false)
 |-- Married_Index: double (nullable = false)
 |-- Dependents_Index: double (nullable = false)
 |-- Education_Index: double (nullable = false)
 |-- Self_Employed_Index: double (nullable = false)
 |-- Property_Area_Index: double (nullable = false)
 |-- Gender_Index_Vec: vector (nullable = true)
 |-- Married_Index_Vec: vector (nullable = true)
 |-- Dependents_Index_Vec: vector (nullable = true)
 |-- Education_Index_Vec: ve

In [577]:
unlabeled_predictions=final_model.transform(data)