### **PySpark MLlib Logistic Regression - Fraud detection**

Dataset link: https://www.kaggle.com/datasets/ealaxi/paysim1

**Variables**

step - maps a unit of time in the real world. In this case 1 step is 1 hour of time.

type - transaction type: CASH_IN, CASH_OUT, DEBIT, PAYMENT or TRANSFER (spelled with underscores in the data).

amount - amount of the transaction in local currency.

nameOrig - customer who started the transaction

oldbalanceOrg - originator's balance before the transaction

newbalanceOrig - originator's balance after the transaction

nameDest - customer who is the recipient of the transaction

oldbalanceDest - recipient's balance before the transaction. There is no balance information for recipients whose names start with M (merchants).

newbalanceDest - recipient's balance after the transaction. There is no balance information for recipients whose names start with M (merchants).

isFraud - 1 for transactions made by the fraudulent agents inside the simulation, 0 otherwise. In this dataset, the fraudulent agents aim to profit by taking control of customers' accounts and trying to empty the funds by transferring them to another account and then cashing out of the system.

isFlaggedFraud - The business model aims to control massive transfers from one account to another and flags illegal attempts. An illegal attempt in this dataset is an attempt to transfer more than 200,000 in a single transaction.
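The two rules stated in the descriptions above (merchant recipients and the 200,000 flag threshold) can be written as small predicates. This is only a sketch: the helper names are hypothetical, and restricting the flag to TRANSFER rows is an assumption based on the word "transfer" in the description.

```python
# Hypothetical helpers illustrating the variable descriptions above;
# they are not part of the dataset or of Spark.

FLAG_THRESHOLD = 200_000  # "more than 200,000 in a single transaction"


def is_merchant(name: str) -> bool:
    """Recipients whose names start with 'M' are merchants (no balance info)."""
    return name.startswith("M")


def would_be_flagged(tx_type: str, amount: float) -> bool:
    """Assumption: only TRANSFER rows strictly above the threshold are flagged."""
    return tx_type == "TRANSFER" and amount > FLAG_THRESHOLD


print(is_merchant("M1979787155"))                # True
print(would_be_flagged("TRANSFER", 250_000.0))   # True
print(would_be_flagged("PAYMENT", 250_000.0))    # False
```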

**Import libraries, packages, modules**

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.getOrCreate()

import os
from os.path import isfile, join

In [None]:
# Load the dataset (first row is the header; column types are inferred from the data)

df=spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Spark/PySpark_MLib_Fraud_detection_project/PS_20174392719_1491204439457_log.csv', inferSchema=True, header=True)

In [None]:
# Check the schema

df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
# Print 5 rows of the dataset

df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [None]:
# Keep only the useful variables: drop unique identifiers and high-cardinality columns (e.g. nameOrig, nameDest), and keep the variables describing the originator of the transaction, not the recipient.

df=df.select('type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'isFraud')

In [None]:
df.show(5)

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



In [None]:
# Count NaN values per column (note: isnan() does not count nulls)
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+----+------+-------------+--------------+-------+
|type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+----+------+-------------+--------------+-------+
|   0|     0|            0|             0|      0|
+----+------+-------------+--------------+-------+



**Train/Test split**

In [None]:
train, test=df.randomSplit([0.7, 0.3], seed=7)

In [None]:
# Records in the train dataset
train.count()

191757

In [None]:
# Records in the test dataset
test.count()

82300
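`randomSplit` samples rows randomly according to the weights, so the split sizes only approximate 70/30. A quick check using the two counts above:

```python
# Counts copied from the two cells above.
train_count = 191757
test_count = 82300

total = train_count + test_count
train_share = train_count / total

print(total)                  # 274057
print(round(train_share, 4))  # 0.6997
```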

**Check data types**

In [None]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [None]:
# Split the columns into categorical (string) and numerical (double) feature lists, excluding the label

catCols = [x for (x, dataType) in train.dtypes if dataType == 'string']
numCols = [x for (x, dataType) in train.dtypes if dataType == 'double' and x != 'isFraud']

In [None]:
# Check new variables
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']
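Because `train.dtypes` is just a list of `(name, type)` tuples, the selection logic can be checked in plain Python. `isFraud` has type `int`, so the dtype test alone already excludes it; the explicit name check only matters if the label were ever loaded as a double.

```python
# dtypes copied from the train.dtypes output above.
dtypes = [
    ("type", "string"),
    ("amount", "double"),
    ("oldbalanceOrg", "double"),
    ("newbalanceOrig", "double"),
    ("isFraud", "int"),
]

cat_cols = [name for name, dtype in dtypes if dtype == "string"]
num_cols = [name for name, dtype in dtypes if dtype == "double" and name != "isFraud"]

print(cat_cols)  # ['type']
print(num_cols)  # ['amount', 'oldbalanceOrg', 'newbalanceOrig']
```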


**One-Hot Encoding**

In [None]:
# Count the number of distinct values of the 'type' variable.
train.agg(F.countDistinct('type')).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [None]:
# Number of records per transaction type.
train.groupBy('type').count().show()

+--------+-----+
|    type|count|
+--------+-----+
|TRANSFER|16097|
| CASH_IN|40414|
|CASH_OUT|66598|
| PAYMENT|67187|
|   DEBIT| 1461|
+--------+-----+



In [None]:
# Create one StringIndexer per categorical column (here only 'type').
# handleInvalid='skip' drops rows whose category is unseen during fit or null.
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid='skip')
    for x in catCols
]

In [None]:
string_indexer

[StringIndexer_ed3d6a60f501]
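By default `StringIndexer` uses `stringOrderType='frequencyDesc'`: the most frequent category gets index 0.0. Applying that rule to the training counts above reproduces the index seen later for CASH_IN (`type_StringIndexer=2.0`). A plain-Python sketch of the ordering; the alphabetical secondary key is an assumption that only matters for ties, and none occur in these counts.

```python
# Per-type counts copied from train.groupBy('type').count() above.
type_counts = {
    "TRANSFER": 16097,
    "CASH_IN": 40414,
    "CASH_OUT": 66598,
    "PAYMENT": 67187,
    "DEBIT": 1461,
}

# frequencyDesc: most frequent label -> 0.0, next -> 1.0, ...
ordered = sorted(type_counts, key=lambda label: (-type_counts[label], label))
label_to_index = {label: float(i) for i, label in enumerate(ordered)}

print(label_to_index)
# {'PAYMENT': 0.0, 'CASH_OUT': 1.0, 'CASH_IN': 2.0, 'TRANSFER': 3.0, 'DEBIT': 4.0}
```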

In [None]:
# One-hot encode all indexed categorical variables with a single multi-column OneHotEncoder

one_hot_encoder=[
      OneHotEncoder(
          inputCols=[f"{x}_StringIndexer" for x in catCols],
          outputCols=[f"{x}_OneHotEncoder" for x in catCols]
      )
]

In [None]:
one_hot_encoder

[OneHotEncoder_7cb1d9e9d520]
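`OneHotEncoder` drops the last category by default (`dropLast=True`), so the five `type` indices become a 4-element vector and index 4 (DEBIT) is encoded as all zeros. That is why CASH_IN (index 2.0) appears later as `SparseVector(4, {2: 1.0})`. A plain-Python sketch of that encoding; `one_hot_drop_last` is a hypothetical helper, not Spark code.

```python
def one_hot_drop_last(index: int, num_categories: int) -> list:
    """Mimic OneHotEncoder(dropLast=True): the vector has num_categories - 1
    slots, and the last category maps to the all-zero vector."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


print(one_hot_drop_last(2, 5))  # CASH_IN -> [0.0, 0.0, 1.0, 0.0]
print(one_hot_drop_last(4, 5))  # DEBIT   -> [0.0, 0.0, 0.0, 0.0]
```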

**Vector assembling**

Spark ML estimators expect all input features in a single vector column. We build that column with VectorAssembler, which concatenates the numeric columns and the one-hot encoded vector into one feature vector.

In [None]:
# No explicit scaling: the numeric columns span very different ranges, but Spark's
# LogisticRegression standardizes features internally by default (standardization=True).
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [None]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [None]:
# Combine the input columns into one vector column; handleInvalid="skip" drops rows containing null or NaN values.

vector_assembler=VectorAssembler(
    inputCols=assemblerInput, outputCol='VectorAssembler_features', handleInvalid="skip"
)
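VectorAssembler concatenates its input columns in the order given by `inputCols`, expanding vector columns in place. Using the first training row (CASH_IN, amount 5.66), a plain-Python sketch of that concatenation; `assemble` is a hypothetical helper, not Spark's implementation.

```python
def assemble(numeric_values, one_hot):
    """Concatenate numeric features and the one-hot vector, in inputCols order."""
    return [float(v) for v in numeric_values] + list(one_hot)


# amount, oldbalanceOrg, newbalanceOrig, then type_OneHotEncoder for CASH_IN
features = assemble([5.66, 5061561.06, 5061566.72], [0.0, 0.0, 1.0, 0.0])

print(features)       # [5.66, 5061561.06, 5061566.72, 0.0, 0.0, 1.0, 0.0]
print(len(features))  # 7
```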

In [None]:
# Collect the preprocessing stages in the order they must run.

stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]  # wrap the single VectorAssembler in a list before concatenating

In [None]:
stages

[StringIndexer_ed3d6a60f501,
 OneHotEncoder_7cb1d9e9d520,
 VectorAssembler_c084856ee0ab]

In [None]:
%%time

# Fit the pipeline on the training set only, then apply it to both splits.
pipeline = Pipeline().setStages(stages)

pipeline_model = pipeline.fit(train)
train_df = pipeline_model.transform(train)

test_df = pipeline_model.transform(test)

CPU times: user 83.2 ms, sys: 10.5 ms, total: 93.7 ms
Wall time: 4.65 s


In [None]:
train_df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features").show(truncate=False)

+-------+------+-------------+--------------+----------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                            |
+-------+------+-------------+--------------+----------------------------------------------------+
|CASH_IN|5.66  |5061561.06   |5061566.72    |[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]        |
|CASH_IN|38.77 |6210013.72   |6210052.49    |[38.77,6210013.72,6210052.49,0.0,0.0,1.0,0.0]       |
|CASH_IN|57.98 |9021204.76   |9021262.74    |[57.98,9021204.76,9021262.74,0.0,0.0,1.0,0.0]       |
|CASH_IN|71.85 |1.688345136E7|1.688352321E7 |[71.85,1.688345136E7,1.688352321E7,0.0,0.0,1.0,0.0] |
|CASH_IN|85.96 |30214.0      |30299.96      |[85.96,30214.0,30299.96,0.0,0.0,1.0,0.0]            |
|CASH_IN|122.81|60535.02     |60657.83      |[122.81,60535.02,60657.83,0.0,0.0,1.0,0.0]          |
|CASH_IN|205.13|2.476159427E7|2.476179941E7 |[205.13,2.476159427E7,2.476179941E7,0.0,0.0,1.0,0.0]|
|CASH_IN|2

In [None]:
test_df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features").show(truncate=False)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|57.98 |1290788.6    |1290846.57    |[57.98,1290788.6,1290846.57,0.0,0.0,1.0,0.0]       |
|CASH_IN|82.2  |66896.0      |66978.2       |[82.2,66896.0,66978.2,0.0,0.0,1.0,0.0]             |
|CASH_IN|103.88|9419793.26   |9419897.14    |[103.88,9419793.26,9419897.14,0.0,0.0,1.0,0.0]     |
|CASH_IN|135.45|1417271.59   |1417407.04    |[135.45,1417271.59,1417407.04,0.0,0.0,1.0,0.0]     |
|CASH_IN|160.19|5019.0       |5179.19       |[160.19,5019.0,5179.19,0.0,0.0,1.0,0.0]            |
|CASH_IN|208.75|762582.92    |762791.68     |[208.75,762582.92,762791.68,0.0,0.0,1.0,0.0]       |
|CASH_IN|214.49|215.0        |429.49        |[214.49,215.0,429.49,0.0,0.0,1.0,0.0]              |
|CASH_IN|236.51|1004

In [None]:
train_df.head(1)

[Row(type='CASH_IN', amount=5.66, oldbalanceOrg=5061561.06, newbalanceOrig=5061566.72, isFraud=0, type_StringIndexer=2.0, type_OneHotEncoder=SparseVector(4, {2: 1.0}), VectorAssembler_features=DenseVector([5.66, 5061561.06, 5061566.72, 0.0, 0.0, 1.0, 0.0]))]

In [None]:
# describe() only builds a summary DataFrame; call .show() on it to print the statistics
train_df.describe()

DataFrame[summary: string, type: string, amount: string, oldbalanceOrg: string, newbalanceOrig: string, isFraud: string, type_StringIndexer: string]

**Logistic Regression**

In [None]:
# By default, LogisticRegression reads a vector column named 'features' and a label column named 'label' (configurable via featuresCol and labelCol).

data=train_df.select(
    F.col('VectorAssembler_features').alias('features'),
    F.col('isFraud').alias('label')
)

In [None]:
data.show(5, truncate=False)

+---------------------------------------------------+-----+
|features                                           |label|
+---------------------------------------------------+-----+
|[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]       |0    |
|[38.77,6210013.72,6210052.49,0.0,0.0,1.0,0.0]      |0    |
|[57.98,9021204.76,9021262.74,0.0,0.0,1.0,0.0]      |0    |
|[71.85,1.688345136E7,1.688352321E7,0.0,0.0,1.0,0.0]|0    |
|[85.96,30214.0,30299.96,0.0,0.0,1.0,0.0]           |0    |
+---------------------------------------------------+-----+
only showing top 5 rows



In [None]:
# Fit a logistic regression model with default parameters on the training data.

model=LogisticRegression().fit(data)
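For a binary label, logistic regression models P(label = 1) as the sigmoid of a linear score, and with Spark's default threshold of 0.5 the prediction is 1 when that probability exceeds 0.5. A plain-Python sketch with made-up coefficients (in Spark, the fitted values are available as `model.coefficients` and `model.intercept`); `sigmoid` and `predict` are hypothetical helpers.

```python
import math


def sigmoid(z: float) -> float:
    return 1.0 / (1.0 + math.exp(-z))


def predict(features, coefficients, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label)."""
    score = sum(w * x for w, x in zip(coefficients, features)) + intercept
    p = sigmoid(score)
    return p, 1 if p > threshold else 0


# Hypothetical coefficients for a 2-feature example: score = 0.5 - 0.5 + 1.0 = 1.0
p, label = predict([1.0, 2.0], coefficients=[0.5, -0.25], intercept=1.0)
print(round(p, 4), label)  # 0.7311 1
```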

In [None]:
# Area under the ROC curve, computed on the training data
model.summary.areaUnderROC

0.9811424456377097

In [None]:
# Precision-recall curve on the training data: (recall, precision) pairs at different thresholds
model.summary.pr.show()

+------+-------------------+
|recall|          precision|
+------+-------------------+
|   0.0| 0.5714285714285714|
|  0.32| 0.5714285714285714|
| 0.512|0.45714285714285713|
| 0.608| 0.3619047619047619|
| 0.616|              0.275|
| 0.616|               0.22|
| 0.616|0.18333333333333332|
| 0.616|0.15714285714285714|
| 0.616|             0.1375|
| 0.616|0.12222222222222222|
| 0.624|0.11142857142857143|
| 0.632| 0.1025974025974026|
|  0.64|0.09523809523809523|
| 0.648|0.08901098901098901|
| 0.648| 0.0826530612244898|
| 0.656|0.07809523809523809|
| 0.672|              0.075|
| 0.672|0.07058823529411765|
| 0.672|0.06666666666666667|
|  0.68|0.06390977443609022|
+------+-------------------+
only showing top 20 rows
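Each row above is one point on the training-set precision-recall curve, with precision = TP / (TP + FP) and recall = TP / (TP + FN) at some score threshold. In the table, precision drops from about 0.57 to about 0.06 while recall only rises from 0.32 to 0.68. A plain-Python sketch of the two formulas; the confusion-matrix counts are hypothetical, chosen only so the result matches the (0.32, 0.571...) row.

```python
def precision_recall(tp: int, fp: int, fn: int):
    """Precision = TP / (TP + FP); recall = TP / (TP + FN)."""
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Hypothetical counts: 40 true positives, 30 false positives, 85 missed frauds.
p, r = precision_recall(tp=40, fp=30, fn=85)
print(round(p, 4), round(r, 4))  # 0.5714 0.32
```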

