In [1]:
#Kaggle link to used dataset- https://www.kaggle.com/ntnu-testimon/paysim1

%reload_ext nb_black
import pandas as pd
import findspark

findspark.init()
import pyspark

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()
spark

<IPython.core.display.Javascript object>

In [2]:
import os
from os.path import isfile, join

loc = os.path.abspath("")
data_loc = f"{loc}/data"

<IPython.core.display.Javascript object>

In [3]:
fin = spark.read.csv(
    r"C:/Users/mahaj/Downloads/Finance.csv", inferSchema=True, header=True
)
fin.show(truncate=False)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|type    |amount   |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|1   |PAYMENT |9839.64  |C1231006815|170136.0     |160296.36     |M1979787155|0.0           |0.0           |0      |0             |
|1   |PAYMENT |1864.28  |C1666544295|21249.0      |19384.72      |M2044282225|0.0           |0.0           |0      |0             |
|1   |TRANSFER|181.0    |C1305486145|181.0        |0.0           |C553264065 |0.0           |0.0           |1      |0             |
|1   |CASH_OUT|181.0    |C840083671 |181.0        |0.0           |C38997010  |21182.0       |0.0           |1      |0             |
|1   |PAYMENT |11668.14 |C2048537720|41554.0      |29885.86      |M123070170

<IPython.core.display.Javascript object>

In [4]:
fin = fin.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")
fin.show(truncate=False)

+--------+---------+-------------+--------------+-------+
|type    |amount   |oldbalanceOrg|newbalanceOrig|isFraud|
+--------+---------+-------------+--------------+-------+
|PAYMENT |9839.64  |170136.0     |160296.36     |0      |
|PAYMENT |1864.28  |21249.0      |19384.72      |0      |
|TRANSFER|181.0    |181.0        |0.0           |1      |
|CASH_OUT|181.0    |181.0        |0.0           |1      |
|PAYMENT |11668.14 |41554.0      |29885.86      |0      |
|PAYMENT |7817.71  |53860.0      |46042.29      |0      |
|PAYMENT |7107.77  |183195.0     |176087.23     |0      |
|PAYMENT |7861.64  |176087.23    |168225.59     |0      |
|PAYMENT |4024.36  |2671.0       |0.0           |0      |
|DEBIT   |5337.77  |41720.0      |36382.23      |0      |
|DEBIT   |9644.94  |4465.0       |0.0           |0      |
|PAYMENT |3099.97  |20771.0      |17671.03      |0      |
|PAYMENT |2560.74  |5070.0       |2509.26       |0      |
|PAYMENT |11633.76 |10127.0      |0.0           |0      |
|PAYMENT |4098

<IPython.core.display.Javascript object>

In [5]:
type(fin)

pyspark.sql.dataframe.DataFrame

<IPython.core.display.Javascript object>

# Train and Test Splitting

In [6]:
train, test = fin.randomSplit(
    [0.7, 0.3], seed=7
)  # no sequencing in dataset so the random splitting is done

<IPython.core.display.Javascript object>

In [7]:
print(f"Train set length is: {train.count()}")
print(f"Test set length is: {test.count()}")

train.show(n=3, truncate=False)

Train set length is: 4452488
Test set length is: 1910132
+-------+------+-------------+--------------+-------+
|type   |amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|5.44  |0.0          |5.44          |0      |
|CASH_IN|5.66  |5061561.06   |5061566.72    |0      |
|CASH_IN|9.02  |2416260.59   |2416269.61    |0      |
+-------+------+-------------+--------------+-------+
only showing top 3 rows



<IPython.core.display.Javascript object>

In [8]:
train.dtypes  # checking type of features

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

<IPython.core.display.Javascript object>

In [9]:
# we add features with categorical values in categorical list & similar for numerical values through list comprehension.
categorical = [x for (x, dataType) in train.dtypes if dataType == "string"]
numerical = [
    x for (x, dataType) in train.dtypes if ((dataType == "double") & (x != "isFraud"))
]

print(categorical)
print(numerical)

['type']
['amount', 'oldbalanceOrg', 'newbalanceOrig']


<IPython.core.display.Javascript object>

In [10]:
train.agg(F.countDistinct("type")).show()  # used to check number of categories

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



<IPython.core.display.Javascript object>

In [11]:
train.groupBy("type").count().show(
    truncate=False
)  # grouping of type column to show total number of values each type has.

+--------+-------+
|type    |count  |
+--------+-------+
|TRANSFER|373168 |
|CASH_IN |978910 |
|CASH_OUT|1566234|
|PAYMENT |1505050|
|DEBIT   |29126  |
+--------+-------+



<IPython.core.display.Javascript object>

# One_hot_encoding and string_indexing

In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

<IPython.core.display.Javascript object>

In [13]:
str_indexer= [StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip") for x in categorical] #list comprehension for each column in categoricl column


<IPython.core.display.Javascript object>

In [14]:
str_indexer

[StringIndexer_15181af3d473]

<IPython.core.display.Javascript object>

In [15]:
one_hot_enc = [
    OneHotEncoder(
        inputCols=[
            f"{x}_StringIndexer" for x in categorical
        ],  # input categorical vars as binary vectors in inputCols & output them in outputCols
        outputCols=[f"{x}_OneHotEncoder" for x in categorical],
    )
]

<IPython.core.display.Javascript object>

In [16]:
one_hot_enc

[OneHotEncoder_580482c2992c]

<IPython.core.display.Javascript object>

# Vector assembling 

In [17]:
from pyspark.ml.feature import (
    VectorAssembler,
)  # used to combine values of input columns into one vector.

<IPython.core.display.Javascript object>

In [18]:
assemblerin = [x for x in numerical]
assemblerin = assemblerin + [f"{x}_OneHotEncoder" for x in categorical]
assemblerin

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

<IPython.core.display.Javascript object>

In [19]:
vec_assembler = VectorAssembler(  # collects all input, output columns into one vector
    inputCols=assemblerin,
    outputCol="VectorAssembler_features",  # VectorAssembler.features is a column containing all input features as vec type
)

<IPython.core.display.Javascript object>

In [20]:
# creating pipeline skeleton
# pipeline is used for creating a priority list of preprocessing instructions.

steps = []
steps = steps + str_indexer
steps = steps + one_hot_enc
steps = steps + [vec_assembler]

steps

[StringIndexer_15181af3d473,
 OneHotEncoder_580482c2992c,
 VectorAssembler_5a55ab537920]

<IPython.core.display.Javascript object>

In [21]:
from pyspark.ml import Pipeline

pl = Pipeline().setStages(steps)
model = pl.fit(
    train
)  # fitting the model onto whole training dataset and then applying it to test dataset
plout = model.transform(test)

<IPython.core.display.Javascript object>

In [22]:
plout.select(
    "type",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "VectorAssembler_features",
).show(truncate=False)


+-------+------+-------------+--------------+--------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                          |
+-------+------+-------------+--------------+--------------------------------------------------+
|CASH_IN|8.44  |39384.0      |39392.44      |[8.44,39384.0,39392.44,0.0,0.0,1.0,0.0]           |
|CASH_IN|12.79 |601743.0     |601755.79     |[12.79,601743.0,601755.79,0.0,0.0,1.0,0.0]        |
|CASH_IN|14.4  |1.143460813E7|1.143462253E7 |[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|
|CASH_IN|15.52 |4368030.06   |4368045.59    |[15.52,4368030.06,4368045.59,0.0,0.0,1.0,0.0]     |
|CASH_IN|15.59 |1.64294897E7 |1.642950528E7 |[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]|
|CASH_IN|18.53 |951352.78    |951371.31     |[18.53,951352.78,951371.31,0.0,0.0,1.0,0.0]       |
|CASH_IN|21.15 |2729078.29   |2729099.44    |[21.15,2729078.29,2729099.44,0.0,0.0,1.0,0.0]     |
|CASH_IN|22.67 |405940.0     |

<IPython.core.display.Javascript object>

# Train Logistic Regression Model 

In [27]:
from pyspark.ml.classification import LogisticRegression

<IPython.core.display.Javascript object>

In [28]:
available_data = plout.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias(
        "label"
    ),  # renaming VectorAssembler_features to features & isFraud to lable, this renaming is mandatory in spark
)

<IPython.core.display.Javascript object>

In [29]:
available_data.show(n=10, truncate=False)

+--------------------------------------------------+-----+
|features                                          |label|
+--------------------------------------------------+-----+
|[8.44,39384.0,39392.44,0.0,0.0,1.0,0.0]           |0    |
|[12.79,601743.0,601755.79,0.0,0.0,1.0,0.0]        |0    |
|[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|0    |
|[15.52,4368030.06,4368045.59,0.0,0.0,1.0,0.0]     |0    |
|[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]|0    |
|[18.53,951352.78,951371.31,0.0,0.0,1.0,0.0]       |0    |
|[21.15,2729078.29,2729099.44,0.0,0.0,1.0,0.0]     |0    |
|[22.67,405940.0,405962.67,0.0,0.0,1.0,0.0]        |0    |
|[25.01,3077402.04,3077427.04,0.0,0.0,1.0,0.0]     |0    |
|[34.05,90282.0,90316.05,0.0,0.0,1.0,0.0]          |0    |
+--------------------------------------------------+-----+
only showing top 10 rows



<IPython.core.display.Javascript object>

In [31]:
model= LogisticRegression().fit(available_data) #applying Log_Reg model to our data 


<IPython.core.display.Javascript object>

In [32]:
model.summary.areaUnderROC #Roc is the curve of the graph that shows binary performance of classification models by
                           #plotting tru positive & false positive values

0.9936997607170124

<IPython.core.display.Javascript object>

In [34]:
model.summary.pr.show()  # recall= TP/(TP+FN), precision= TP/(TP+FP)

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9299691040164778|
|0.37562396006655574| 0.9299691040164778|
| 0.4937603993344426| 0.6755833807626637|
| 0.5603161397670549| 0.5296893432953205|
| 0.6027454242928453|0.43526584559927906|
| 0.6335274542429284| 0.3701093560145808|
| 0.6655574043261231| 0.3264639869414405|
| 0.6909317803660566|0.29206963249516443|
| 0.7167221297836939|0.26618260466553373|
| 0.7391846921797005|0.24479955916792948|
| 0.7591514143094842|0.22684897451833436|
| 0.7778702163061564| 0.2117540482391575|
| 0.7915973377703827|0.19787875636893001|
| 0.8107321131447587| 0.1873498029414592|
| 0.8157237936772047|0.17526141746358032|
| 0.8190515806988353| 0.1644258872651357|
| 0.8219633943427621| 0.1548467988402163|
| 0.8244592346089851|0.14630545508230605|
| 0.8290349417637272| 0.1390400446490861|
| 0.8306988352745425|0.13207671957671957|
+-------------------+-------------

<IPython.core.display.Javascript object>

In [None]:
# Ashutosh Mahajan, M.Sc. Web and Data Science, University of Koblenz-Landau ,Germany.