# Fetching data

In [1]:
!pip install pyspark



In [1]:
import pyspark
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("Fraud").getOrCreate()

In [6]:
df = spark.read.csv("transactiondata.csv", header=True, inferSchema=True)
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

# Data Preprocessing

In [4]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [8]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



# Train/test split

In [12]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

In [13]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4454328 records
Test set length: 1908292 records


In [14]:
train.show(5)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|
|CASH_IN| 13.86|   6868100.18|    6868114.04|      0|
|CASH_IN| 14.54|    3347286.5|    3347301.03|      0|
|CASH_IN| 15.52|   4368030.06|    4368045.59|      0|
|CASH_IN| 17.33|   8964056.72|    8964074.05|      0|
+-------+------+-------------+--------------+-------+
only showing top 5 rows



# Datatypes
##### In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features we want treated as categorical or vice versa. We'll need to carefully identify which columns are numeric and which are categorical.

In [15]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [18]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [x for (x, dataType) in train.dtypes if ((dataType == "double") and (x != "isFraud"))]
print(catCols)
print(numCols)

['type']
['amount', 'oldbalanceOrg', 'newbalanceOrig']


# One hot encoding
###### StringIndexer: Converts a single feature to an index feature
###### OneHotEncoder: Maps a column of category indices to a column of binary vectors

In [19]:
train.agg(F.countDistinct("type")).show()

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



In [20]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372938|
| CASH_IN| 979390|
|CASH_OUT|1565416|
| PAYMENT|1507530|
|   DEBIT|  29054|
+--------+-------+



In [23]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [26]:
string_indexer = [
    StringIndexer(inputCol = x, outputCol = x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [27]:
string_indexer

[StringIndexer_477bdfba1eb1]

In [30]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols = [f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols]
    )
]

In [31]:
one_hot_encoder

[OneHotEncoder_b7f0fe948eed]

# Vector Assembling
###### Vector Assembler: Combines the values of input columns into a single vector

In [35]:
from pyspark.ml.feature import VectorAssembler

In [36]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [38]:
vector_assembler = VectorAssembler(
    inputCols = assemblerInput, outputCol="VectorAssembler_features"
)

In [41]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [42]:
stages

[StringIndexer_477bdfba1eb1,
 OneHotEncoder_b7f0fe948eed,
 VectorAssembler_8e7f278a1112]

In [46]:
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

In [50]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features"
).show(5, truncate=False)

+-------+------+-------------+--------------+--------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                          |
+-------+------+-------------+--------------+--------------------------------------------------+
|CASH_IN|14.4  |1.143460813E7|1.143462253E7 |[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|
|CASH_IN|15.59 |1.64294897E7 |1.642950528E7 |[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]|
|CASH_IN|21.15 |2729078.29   |2729099.44    |[21.15,2729078.29,2729099.44,0.0,0.0,1.0,0.0]     |
|CASH_IN|35.47 |3796691.21   |3796726.68    |[35.47,3796691.21,3796726.68,0.0,0.0,1.0,0.0]     |
|CASH_IN|37.11 |1452790.24   |1452827.35    |[37.11,1452790.24,1452827.35,0.0,0.0,1.0,0.0]     |
+-------+------+-------------+--------------+--------------------------------------------------+
only showing top 5 rows



# Logistic Regression

In [51]:
from pyspark.ml.classification import LogisticRegression

In [52]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label")
)

In [53]:
data.show(5, truncate=False)

+--------------------------------------------------+-----+
|features                                          |label|
+--------------------------------------------------+-----+
|[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|0    |
|[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]|0    |
|[21.15,2729078.29,2729099.44,0.0,0.0,1.0,0.0]     |0    |
|[35.47,3796691.21,3796726.68,0.0,0.0,1.0,0.0]     |0    |
|[37.11,1452790.24,1452827.35,0.0,0.0,1.0,0.0]     |0    |
+--------------------------------------------------+-----+
only showing top 5 rows



In [54]:
model = LogisticRegression().fit(data)

In [55]:
model.summary.areaUnderROC

0.9910301298957835

In [57]:
model.summary.pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9109311740890689|
| 0.3654080389768575| 0.9109311740890689|
|0.47381242387332523| 0.6476137624861266|
|  0.535931790499391| 0.5045871559633027|
| 0.5826228177019894|0.41836734693877553|
| 0.6175395858708892|0.35838831291234685|
| 0.6443361753958587| 0.3137603795966785|
| 0.6776289078359724| 0.2842302452316076|
| 0.6975233455136013| 0.2569548309901286|
| 0.7170117742590337|0.23546666666666666|
| 0.7344701583434835| 0.2175847967284099|
| 0.7458384084449858|0.20124890446976337|
| 0.7600487210718636|0.18829209414604708|
| 0.7722289890377588|0.17683153588694683|
| 0.7864393016646366|0.16741573033707866|
| 0.7978075517661388|0.15867248062015504|
| 0.8079577750710516| 0.1507804212759509|
| 0.8103938286642306| 0.1424493291464459|
| 0.8136419001218027|0.13516794819910968|
| 0.8168899715793747| 0.1286445012787724|
+-------------------+-------------

In [58]:
model = LogisticRegression().fit(data)

In [59]:
model.summary.areaUnderROC

0.9910302156425086

In [60]:
model.summary.pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9109311740890689|
| 0.3654080389768575| 0.9109311740890689|
|0.47381242387332523| 0.6476137624861266|
|  0.535931790499391| 0.5045871559633027|
| 0.5826228177019894|0.41836734693877553|
| 0.6175395858708892|0.35838831291234685|
| 0.6443361753958587| 0.3137603795966785|
| 0.6776289078359724| 0.2842302452316076|
| 0.6975233455136013| 0.2569548309901286|
| 0.7170117742590337|0.23546666666666666|
| 0.7344701583434835| 0.2175847967284099|
| 0.7458384084449858|0.20124890446976337|
| 0.7600487210718636|0.18829209414604708|
| 0.7722289890377588|0.17683153588694683|
| 0.7864393016646366|0.16741573033707866|
| 0.7978075517661388|0.15867248062015504|
| 0.8079577750710516| 0.1507804212759509|
| 0.8103938286642306| 0.1424493291464459|
| 0.8136419001218027|0.13516794819910968|
| 0.8168899715793747| 0.1286445012787724|
+-------------------+-------------