
# <font color=#FF6378> <b> Big Data Analysis with Apache Spark </b></font>


## <b> Section: Spark MLlib </b>
***
***

## Objective
***
* Logistic Regression with PySpark


### Logistic Regression in PySpark
***

Logistic regression in PySpark is a supervised machine learning model for classification. It models the probability that a record belongs to a class as a function of the record's features, and assigns the record to a class by comparing that probability with a threshold. Logistic regression is a fundamental classification technique that is relatively fast and easy to compute.
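A minimal plain-Python sketch of how a fitted logistic regression turns features into a class: a weighted sum of the features plus an intercept is passed through the logistic (sigmoid) function to get a probability, which is then compared with a threshold. The weights and intercept below are made-up values for illustration, and the 0.5 threshold mirrors the default `threshold` of Spark's `LogisticRegression`; this is not Spark's implementation.

```python
import math


def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    # Branch on the sign of z so math.exp never receives a large positive argument.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict_proba(features, weights, intercept):
    """Probability that the label is 1, given a feature list and model parameters."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)


def predict(features, weights, intercept, threshold=0.5):
    """Class 1 if the predicted probability exceeds the threshold, else class 0."""
    return 1 if predict_proba(features, weights, intercept) > threshold else 0


# Hypothetical parameters for a two-feature model (not learned from PaySim).
weights, intercept = [2.0, -1.0], 0.0
print(predict_proba([3.0, 1.0], weights, intercept))  # z = 5, probability ~ 0.993
print(predict([3.0, 1.0], weights, intercept))        # 1
print(predict([0.0, 2.0], weights, intercept))        # 0 (z = -2)
```

Spark learns `weights` and `intercept` from the training data; everything after that is this simple arithmetic.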

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
!tar xzf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession   # entry point for DataFrame and SQL functionality
import pyspark.sql.functions as F      # column functions such as F.col and F.countDistinct
from pyspark.ml import Pipeline        # chains feature transformers into one fitted model

In [None]:
spark = SparkSession.builder.master("local[*]").appName("synthetic-paysim").getOrCreate()

This creates a SparkSession that runs locally on all available cores (`local[*]`) and names the app `synthetic-paysim`.

We use a synthetic dataset generated by the PaySim simulator. PaySim uses aggregated data from a private dataset of real mobile-money transactions to generate synthetic transactions that resemble normal operation, and injects fraudulent behaviour so that fraud detection methods can be evaluated.

In [None]:
!wget -q https://www.dropbox.com/s/c99v1df5i4nvbpg/PS_20174392719_1491204439457_log.csv

In [None]:
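# getOrCreate() returns the SparkSession created above if one is already running
spark = SparkSession.builder.appName("synthetic-paysim").getOrCreate()

# create a Spark DataFrame from the input CSV file, inferring column types from the data
df = spark.read.csv("PS_20174392719_1491204439457_log.csv", inferSchema=True, header=True)
df.show(10)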

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [None]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

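We keep only a few columns: the transaction type, the amount, the origin account's balance before and after the transaction, and the fraud label.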

In [None]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

In [None]:
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



Now we split the data into a training set (70%) and a test set (30%); fixing the seed makes the split reproducible.

In [None]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

In [None]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4451490 records
Test set length: 1911130 records
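Spark's `randomSplit` assigns each row to a split independently rather than cutting the data at exact positions, which is why the counts above are close to, but not exactly, 70/30 (4451490 of 6362620 records is about 69.96%). A plain-Python sketch of the idea, assuming one uniform draw per row compared against cumulative weight bounds; it will not reproduce Spark's actual split, which uses its own random number generator per partition.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split using a single uniform draw.

    Weights are normalised to sum to 1, so split sizes are only approximately
    proportional to the weights."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            # Only reachable if floating-point rounding leaves the last bound below u.
            splits[-1].append(row)
    return splits


train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=7)
print(len(train_rows), len(test_rows))  # close to, but usually not exactly, 7000 and 3000
```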


In [None]:
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



Here we treat every string column as a categorical feature. In other datasets some numeric columns should be treated as categorical (or vice versa), so it is worth checking carefully which columns are numeric and which are categorical.

In [None]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [None]:
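# string columns are categorical; double columns are continuous features
# (the label isFraud is excluded explicitly in case its type is ever double)
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if dataType == "double" and x != "isFraud"
]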

In [None]:
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


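Next we one-hot encode the categorical column so that it can be used as numeric input. In Spark this takes two feature transformers.

StringIndexer:
Maps each distinct string value of a column to a numeric index; by default the most frequent value gets index 0.
http://spark.apache.org/docs/latest/ml-features#stringindexer


OneHotEncoder:
Maps each category index to a binary vector with at most one 1.0; by default the last category is dropped and encoded as all zeros.
http://spark.apache.org/docs/latest/ml-features#onehotencoder

For more info: http://spark.apache.org/docs/latest/ml-features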

In [None]:
train.agg(F.countDistinct("type")).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [None]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 373084|
| CASH_IN| 979536|
|CASH_OUT|1566112|
| PAYMENT|1503731|
|   DEBIT|  29027|
+--------+-------+
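These counts determine how the categories are encoded. A plain-Python sketch of what `StringIndexer` (default `stringOrderType="frequencyDesc"`) and `OneHotEncoder` (default `dropLast=True`) compute for the `type` column; the helper functions are hypothetical illustrations, not Spark's implementation. It also explains why the assembled feature vectors in this notebook have 7 entries: 3 numeric columns plus 4 one-hot slots for 5 categories.

```python
def string_index_from_counts(counts):
    """Assign indices by descending frequency (ties broken alphabetically),
    mirroring StringIndexer's default stringOrderType='frequencyDesc'."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list for a category index. With drop_last=True (OneHotEncoder's
    default) the vector has num_categories - 1 slots and the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


# Category counts copied from the groupBy("type").count() output above.
type_counts = {
    "TRANSFER": 373084,
    "CASH_IN": 979536,
    "CASH_OUT": 1566112,
    "PAYMENT": 1503731,
    "DEBIT": 29027,
}
index = string_index_from_counts(type_counts)
print(index)  # {'CASH_OUT': 0.0, 'PAYMENT': 1.0, 'CASH_IN': 2.0, 'TRANSFER': 3.0, 'DEBIT': 4.0}
print(one_hot(index["CASH_IN"], len(index)))  # [0.0, 0.0, 1.0, 0.0]
print(one_hot(index["DEBIT"], len(index)))    # [0.0, 0.0, 0.0, 0.0]
```

The CASH_IN encoding `[0.0, 0.0, 1.0, 0.0]` matches the last four entries of the CASH_IN feature vectors shown later in this notebook.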



In [None]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

In [None]:
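# One StringIndexer per categorical column; handleInvalid="skip" drops rows whose
# value was not seen when the indexer was fit (or is null).
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]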

In [None]:
string_indexer

[StringIndexer_6c98e72780af]

In [None]:
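# A single OneHotEncoder encodes all indexed columns at once; with the default
# dropLast=True the last category becomes an all-zero vector.
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]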

In [None]:
one_hot_encoder

[OneHotEncoder_22f6ae91a9cd]

Now we do Vector Assembly. VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.
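A plain-Python sketch of the concatenation `VectorAssembler` performs for one row, assuming scalars and lists stand in for numeric columns and vector columns. The `assemble` helper is hypothetical, and unlike Spark (which may store the result sparsely) it always returns a dense list. The values are the first row of the transformed test output in this notebook, with CASH_IN one-hot encoded as `[0.0, 0.0, 1.0, 0.0]`.

```python
def assemble(values):
    """Concatenate scalar and vector (list/tuple) inputs into one flat feature list,
    preserving input-column order. Always returns a dense list."""
    features = []
    for value in values:
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features


# amount, oldbalanceOrg, newbalanceOrig, and the one-hot vector for CASH_IN
row = [4.58, 94241.0, 94245.58, [0.0, 0.0, 1.0, 0.0]]
print(assemble(row))  # [4.58, 94241.0, 94245.58, 0.0, 0.0, 1.0, 0.0]
```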

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [None]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

The input columns are the numeric columns plus the one-hot-encoded column, and the output column, `VectorAssembler_features`, holds the combined feature vector.

In [None]:
vector_assembler = VectorAssembler(inputCols=assemblerInput, outputCol="VectorAssembler_features")

In [None]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [None]:
stages

[StringIndexer_6c98e72780af,
 OneHotEncoder_22f6ae91a9cd,
 VectorAssembler_f735bab59364]

In [None]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: user 229 ms, sys: 26.3 ms, total: 255 ms
Wall time: 27.1 s


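The last column shows how our features have been assembled into a single vector. Some rows are printed in dense form `[...]` and others in sparse form `(size, [indices], [values])`; Spark stores each vector in whichever form uses less space.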

In [None]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features",
).show(truncate=False)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|4.58  |94241.0      |94245.58      |[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]            |
|CASH_IN|5.44  |0.0          |5.44          |(7,[0,2,5],[5.44,5.44,1.0])                        |
|CASH_IN|6.07  |400680.0     |400686.07     |[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]          |
|CASH_IN|6.76  |11322.0      |11328.76      |[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]            |
|CASH_IN|8.27  |8428410.94   |8428419.21    |[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]       |
|CASH_IN|8.44  |39384.0      |39392.44      |[8.44,39384.0,39392.44,0.0,0.0,1.0,0.0]            |
|CASH_IN|9.04  |99971.0      |99980.04      |[9.04,99971.0,99980.04,0.0,0.0,1.0,0.0]            |
|CASH_IN|11.13 |4116
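In the output above, the second row's features are printed as `(7,[0,2,5],[5.44,5.44,1.0])`: a vector of size 7 whose only non-zero entries are at positions 0, 2 and 5. A plain-Python sketch of converting between the two notations; the helper names are hypothetical and only illustrate the format.

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark-style sparse notation (size, [indices], [values]) into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def dense_to_sparse(dense):
    """Keep only the non-zero entries, returning (size, indices, values)."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


print(sparse_to_dense(7, [0, 2, 5], [5.44, 5.44, 1.0]))
# [5.44, 0.0, 5.44, 0.0, 0.0, 1.0, 0.0]
```

The dense form is the row's amount (5.44), oldbalanceOrg (0.0), newbalanceOrig (5.44), and the CASH_IN one-hot slots.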

Now we train our model using Logistic Regression. 

In [None]:
from pyspark.ml.classification import LogisticRegression

Before fitting, we rename the assembled vector column to `features` and the target column `isFraud` to `label`. These are the default values of `LogisticRegression`'s `featuresCol` and `labelCol` parameters, so no extra configuration is needed.

In [None]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [None]:
data.show(5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]     |0    |
|(7,[0,2,5],[5.44,5.44,1.0])                 |0    |
|[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]   |0    |
|[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]     |0    |
|[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]|0    |
+--------------------------------------------+-----+
only showing top 5 rows



Now we fit the model on our data. 

In [None]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 491 ms, sys: 68.3 ms, total: 560 ms
Wall time: 1min 17s
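The `fit` call above learns the weights and intercept by minimising the log loss on the data it is given. Spark's `LogisticRegression` does this with a quasi-Newton optimizer (L-BFGS, or OWL-QN when L1 regularization is used) and standardizes features internally by default. The sketch below swaps in plain batch gradient descent on a tiny made-up dataset, with no regularization or standardization, only to show what is being optimised; the function name and learning-rate settings are hypothetical.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def fit_logistic_regression(X, y, lr=0.5, iterations=1000):
    """Batch gradient descent on the mean log loss; returns (weights, intercept)."""
    n, n_features = len(X), len(X[0])
    w, b = [0.0] * n_features, 0.0
    for _ in range(iterations):
        grad_w, grad_b = [0.0] * n_features, 0.0
        for xi, yi in zip(X, y):
            # Gradient of the log loss for one example is (p - y) * x.
            err = sigmoid(b + sum(wj * xj for wj, xj in zip(w, xi))) - yi
            for j in range(n_features):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        w = [wj - lr * gj for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b


# Made-up one-feature data: negative values are class 0, positive values class 1.
X = [[-2.0], [-1.0], [1.0], [2.0]]
y = [0, 0, 1, 1]
w, b = fit_logistic_regression(X, y)
print([1 if sigmoid(b + w[0] * x[0]) > 0.5 else 0 for x in X])  # [0, 0, 1, 1]
```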


In [None]:
model.summary.areaUnderROC

0.9932490712682276

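Thus, we get an area under the ROC curve of about 0.99, which looks excellent. Note that `model.summary` reports metrics on the data the model was fit on, and `data` was built from `pp_df`, the transformed test split; a fair estimate of performance would fit on the training split and score a split the model has not seen.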
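The area under the ROC curve can be read as the probability that a randomly chosen positive example (here, a fraudulent transaction) receives a higher score than a randomly chosen negative one. A plain-Python sketch of that definition on made-up scores, counting ties as half; it compares every positive/negative pair, so it only suits small inputs, and it is not how Spark computes the metric.

```python
def roc_auc(scores, labels):
    """Area under the ROC curve as the probability that a randomly chosen positive
    is scored above a randomly chosen negative (ties count as half)."""
    pos = [s for s, label in zip(scores, labels) if label == 1]
    neg = [s for s, label in zip(scores, labels) if label == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up scores: positives 0.35 and 0.8 beat negatives in 3 of 4 pairs.
print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```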

In [None]:
model.summary.pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9218585005279831|
|0.36089293096320796| 0.9218585005279831|
| 0.4795369987598181| 0.6728538283062645|
| 0.5527077304671352| 0.5345861655337865|
| 0.5990078544853246| 0.4420378279438682|
| 0.6366267052501033| 0.3797780517879161|
| 0.6709384042992972|  0.335885761589404|
| 0.6986357999173212| 0.3013014797646639|
| 0.7172385283174866|   0.27168806764798|
| 0.7391484084332369|0.24961608264693563|
| 0.7614716825134353|0.23198992443324937|
| 0.7759404712691195|0.21532637375243777|
| 0.7924762298470442|0.20191700021065936|
| 0.7945431996692849|0.18712880926881512|
|  0.798677139313766|0.17487328023171614|
| 0.8015708970649028|0.16397463002114165|
| 0.8057048367093841| 0.1546579907951119|
| 0.8081852004960728|0.14612452350698857|
| 0.8119057461761058| 0.1387299569117751|
| 0.8147995039272427|0.13198071514664525|
+-------------------+-------------------+
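Each row of `model.summary.pr` is a (recall, precision) pair at one decision threshold: lowering the threshold catches more fraud (higher recall) at the cost of more false alarms (lower precision), which is the trade-off visible in the table above. Because fraudulent transactions are rare in this data, this curve is often more informative than ROC AUC. A plain-Python sketch computing one such pair on made-up scores; the helper name is hypothetical, and returning precision 1.0 when nothing is predicted positive is a convention chosen here.

```python
def precision_recall_at(scores, labels, threshold):
    """Precision and recall when every score >= threshold is predicted positive."""
    tp = sum(1 for s, label in zip(scores, labels) if s >= threshold and label == 1)
    fp = sum(1 for s, label in zip(scores, labels) if s >= threshold and label == 0)
    fn = sum(1 for s, label in zip(scores, labels) if s < threshold and label == 1)
    precision = tp / (tp + fp) if tp + fp else 1.0  # convention: no predictions -> 1.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


scores = [0.1, 0.4, 0.35, 0.8]
labels = [0, 0, 1, 1]
for t in (0.8, 0.35):
    print(t, precision_recall_at(scores, labels, t))
# 0.8 (1.0, 0.5)
# 0.35 (0.6666666666666666, 1.0)
```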



## Thank You!