<a href="https://colab.research.google.com/github/Kamran-imaz/login-Page/blob/master/Loan_prediction_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Table of Contents**


---


*   Demonstration of Loan Approval Analysis with Decision Tree
*   Setting up environments

*   File Path and Dataset Loading
*   Data Preprocessing
*   Implementing Logistic Regression
*   Implementing Random Forest
*   Implementing Decision Tree




In [134]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

def train_decision_tree_model(file_path):

    #spark setup creation..
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    data = spark.read.csv(file_path, header=True, inferSchema=True)
    data = data.select([data[col].alias(col.strip()) for col in data.columns])

    # here we are doing the data preprocessing using string indexer and onehot encoding.....
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep').fit(data) for column in ["education", "self_employed", "loan_status"]]
    pipeline = Pipeline(stages=indexers)
    data = pipeline.fit(data).transform(data)

    encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded") for column in ["education", "self_employed"]]
    pipeline = Pipeline(stages=encoders)
    data = pipeline.fit(data).transform(data)

    assembler = VectorAssembler(inputCols=['no_of_dependents','education_encoded','self_employed_encoded',
                                           'income_annum','loan_amount','loan_term','cibil_score','residential_assets_value',
                                           'commercial_assets_value','luxury_assets_value','bank_asset_value'], outputCol='features')
    data = assembler.transform(data)

    # Splitting data into train and test sets
    train_data, test_data = data.randomSplit([0.8,0.7], seed=42)

    # Training
    dt = DecisionTreeClassifier(labelCol='loan_status_index', featuresCol='features')
    model = dt.fit(train_data)

    return model

def predict_loan_probability(model, parameters):
    # Setting of session
    spark = SparkSession.builder.master("local[*]").getOrCreate()

    #schema for prediction DataFrame
    schema = StructType([
        StructField('no_of_dependents', IntegerType(), True),
        StructField('education', StringType(), True),
        StructField('self_employed', StringType(), True),
        StructField('income_annum', IntegerType(), True),
        StructField('loan_amount', IntegerType(), True),
        StructField('loan_term', IntegerType(), True),
        StructField('cibil_score', IntegerType(), True),
        StructField('residential_assets_value', IntegerType(), True),
        StructField('commercial_assets_value', IntegerType(), True),
        StructField('luxury_assets_value', IntegerType(), True),
        StructField('bank_asset_value', IntegerType(), True)
    ])
    # Create a DataFrame with the provided parameters
    data = [parameters]
    predict_df = spark.createDataFrame(data, schema)

    # Apply the same transformations and assembler used during training
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep').fit(predict_df) for column in ["education", "self_employed"]]
    pipeline = Pipeline(stages=indexers)
    predict_df = pipeline.fit(predict_df).transform(predict_df)

    encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded") for column in ["education", "self_employed"]]
    pipeline = Pipeline(stages=encoders)
    predict_df = pipeline.fit(predict_df).transform(predict_df)

    assembler = VectorAssembler(inputCols=['no_of_dependents','education_encoded','self_employed_encoded',
                                           'income_annum','loan_amount','loan_term','cibil_score','residential_assets_value',
                                           'commercial_assets_value','luxury_assets_value','bank_asset_value'], outputCol='features')
    predict_df = assembler.transform(predict_df)

    # Prediction
    predictions = model.transform(predict_df)
    probability = predictions.select('probability').collect()[0][0]

    return probability




In [136]:
# Example data
file_path = '/content/loan_approval_dataset.csv'
trained_model = train_decision_tree_model(file_path)

parameters = (1, 'Graduate', 'No', 50000, 200000, 36, 750, 300000, 500000, 100000, 800000)

probability = predict_loan_probability(trained_model, parameters)
print('Probability of getting loan:', probability[1]*100)

Probability of getting loan: 6.0606060606060606


**Setting up Environments**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!pip install -q findspark

import os
import findspark
from pyspark.sql import SparkSession

# Set up the environment
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
findspark.init()

# Create a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c880a2d855ff61ea1be98c441aa9245c7580bfcdc603962139af1d5b4c263b9c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


**File Path and Dataset Loading**

In [63]:
file_path = '/content/loan_approval_dataset.csv'

In [64]:
data = spark.read.csv(file_path,header=True,inferSchema=True)
data.show()

+-------+-----------------+-------------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|loan_id| no_of_dependents|    education| self_employed| income_annum| loan_amount| loan_term| cibil_score| residential_assets_value| commercial_assets_value| luxury_assets_value| bank_asset_value| loan_status|
+-------+-----------------+-------------+--------------+-------------+------------+----------+------------+-------------------------+------------------------+--------------------+-----------------+------------+
|      1|                2|     Graduate|            No|      9600000|    29900000|        12|         778|                  2400000|                17600000|            22700000|          8000000|    Approved|
|      2|                0| Not Graduate|           Yes|      4100000|    12200000|         8|         417|                  2700000|                 220000

**Data Preprocessing**

In [66]:
#data preprocessing

from pyspark.ml.feature import StringIndexer, OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline
data = data.select([data[col].alias(col.strip()) for col in data.columns])
indexers =[StringIndexer(inputCol=column,outputCol=column+"_index").fit(data) for column in ["education","self_employed", "loan_status"]]
pipeline = Pipeline(stages=indexers)
data = pipeline.fit(data).transform(data)

In [67]:
data.show()

+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+
|loan_id|no_of_dependents|    education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|education_index|self_employed_index|loan_status_index|
+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+
|      1|               2|     Graduate|           No|     9600000|   29900000|       12|        778|                 2400000|               17600000|           22700000|         8000000|   Approved|            0.0|                1.0|   

In [68]:
encoders = [OneHotEncoder(inputCol=column+"_index",outputCol=column+"_encoded") for column in ["education","self_employed"]]
pipeline = Pipeline(stages=encoders)
data = pipeline.fit(data).transform(data)


In [69]:
data.show()

+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+-----------------+---------------------+
|loan_id|no_of_dependents|    education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|education_index|self_employed_index|loan_status_index|education_encoded|self_employed_encoded|
+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+-----------------+---------------------+
|      1|               2|     Graduate|           No|     9600000|   29900000|       12|        778|                 

In [70]:
assembler = VectorAssembler(inputCols=['no_of_dependents','education_encoded','self_employed_encoded','income_annum','loan_amount','loan_term','cibil_score','residential_assets_value','commercial_assets_value','luxury_assets_value','bank_asset_value'],outputCol='features')
data = assembler.transform(data)
data.show()

+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+-----------------+---------------------+--------------------+
|loan_id|no_of_dependents|    education|self_employed|income_annum|loan_amount|loan_term|cibil_score|residential_assets_value|commercial_assets_value|luxury_assets_value|bank_asset_value|loan_status|education_index|self_employed_index|loan_status_index|education_encoded|self_employed_encoded|            features|
+-------+----------------+-------------+-------------+------------+-----------+---------+-----------+------------------------+-----------------------+-------------------+----------------+-----------+---------------+-------------------+-----------------+-----------------+---------------------+--------------------+
|      1|               2|     Graduate|           No| 

**Implementing Logistic Regression**

In [71]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(labelCol='loan_status_index',featuresCol='features')
train_data,test_data = data.randomSplit([0.7,0.3],seed=42)

In [72]:
model = lr.fit(train_data)

In [93]:
predictions = model.transform(test_data)


In [138]:
predictions.select('probability').collect()[0]


Row(probability=DenseVector([0.173, 0.827]))

In [102]:
evaluator = BinaryClassificationEvaluator(labelCol='loan_status_index')
accuracy = evaluator.evaluate(predictions)
print('Accuracy using Logistic Regression is:',accuracy*100)

Accuracy using Logistic Regression is: 96.95360440442722


**Implementing Random Forest**

In [98]:
from pyspark.ml.classification import RandomForestClassifier
rf= RandomForestClassifier(labelCol='loan_status_index',featuresCol='features')


In [99]:
model_rf=rf.fit(train_data)
prediction_rf = model.transform(test_data)

In [101]:
evaluator_lr = BinaryClassificationEvaluator(labelCol='loan_status_index')
accuracy_lr = evaluator_lr.evaluate(prediction_rf)
print("Accuracy of Random Forest is: ", accuracy_lr*100)

Accuracy of Random Forest is:  96.95360440442722


**Implementing Decision Tree**

In [104]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol='loan_status_index',featuresCol='features')

In [105]:
model = dt.fit(train_data)
predictions_dt = model.transform(test_data)

In [107]:
evaluate_dt = BinaryClassificationEvaluator(labelCol='loan_status_index')
accuracy_dt = evaluate_dt.evaluate(predictions_dt)
print("Accuracy of Decision Tree is: ", accuracy_dt*100)

Accuracy of Decision Tree is:  97.38200378505476


**Caluclation of F1 Scores of Logistic Regression, Random Forest and Decision Tree**

In [117]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='loan_status_index',predictionCol='prediction',metricName='f1')
precision = evaluator.evaluate(predictions)
print("Precision of Logistic Regression is: ", precision*100)
precision = evaluator.evaluate(prediction_rf)
print("Precision of Random Forest is: ", precision*100)
precision = evaluator.evaluate(predictions_dt)
print("Precision of Decision Tree is: ", precision*100)

Precision of Logistic Regression is:  91.4191419141914
Precision of Random Forest is:  91.4191419141914
Precision of Decision Tree is:  95.79919474328528
