In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [2]:
spark

In [3]:
import os
from os.path import isfile, join

loc = os.path.abspath("")
data_loc = f"{loc}/data/"

In [13]:
df = spark.read.csv(
    f"{data_loc}Data.csv", inferSchema=True, header=True
)

In [14]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [15]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [16]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

In [17]:
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



In [18]:
### Train/test split

In [19]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

In [21]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4451877 records
Test set length: 1910743 records


In [22]:
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



In [24]:
### Dtypes
#In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features we want treated as categorical or vice versa. We’ll need to carefully identify which columns are numeric and which are categorical.

In [25]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [26]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if ((dataType == "double") & (x != "isFraud"))
]

In [27]:
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


In [28]:
train.agg(F.countDistinct("type")).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [48]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372791|
| CASH_IN| 979384|
|CASH_OUT|1565928|
| PAYMENT|1504894|
|   DEBIT|  28880|
+--------+-------+



In [29]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [30]:
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [31]:
string_indexer

[StringIndexer_dd31f098e76e]

In [32]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [33]:
one_hot_encoder

[OneHotEncoder_1bc3c9a45e28]

In [34]:
from pyspark.ml.feature import VectorAssembler

In [35]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [36]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [37]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

In [38]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [39]:
stages

[StringIndexer_dd31f098e76e,
 OneHotEncoder_1bc3c9a45e28,
 VectorAssembler_9ca799ad0e7e]

In [40]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

Wall time: 46.4 s


In [41]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features",
).show(truncate=False)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|4.58  |94241.0      |94245.58      |[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]            |
|CASH_IN|5.44  |0.0          |5.44          |(7,[0,2,5],[5.44,5.44,1.0])                        |
|CASH_IN|6.07  |400680.0     |400686.07     |[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]          |
|CASH_IN|6.76  |11322.0      |11328.76      |[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]            |
|CASH_IN|8.27  |8428410.94   |8428419.21    |[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]       |
|CASH_IN|8.44  |39384.0      |39392.44      |[8.44,39384.0,39392.44,0.0,0.0,1.0,0.0]            |
|CASH_IN|11.13 |4116439.74   |4116450.87    |[11.13,4116439.74,4116450.87,0.0,0.0,1.0,0.0]      |
|CASH_IN|12.79 |6017

In [42]:
from pyspark.ml.classification import LogisticRegression

In [43]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [44]:
data.show(5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]     |0    |
|(7,[0,2,5],[5.44,5.44,1.0])                 |0    |
|[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]   |0    |
|[6.76,11322.0,11328.76,0.0,0.0,1.0,0.0]     |0    |
|[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]|0    |
+--------------------------------------------+-----+
only showing top 5 rows



In [45]:
%%time
model = LogisticRegression().fit(data)

Wall time: 1min 56s


In [46]:
model.summary.areaUnderROC

0.9904808012683561

In [47]:
model.summary.pr.show()


+------------------+--------------------+
|            recall|           precision|
+------------------+--------------------+
|               0.0| 0.10827992288051691|
|0.8308676529388245| 0.10827992288051691|
|0.8936425429828069|0.058529303933378726|
|0.9392243102758896| 0.04108223442582812|
|0.9692123150739704| 0.03182355257975581|
|0.9948020791683326| 0.02614296672235707|
|0.9948020791683326| 0.02178710287575747|
|0.9948020791683326|0.018674472716355174|
|0.9948020791683326|0.016340255612037148|
| 0.995201919232307|0.014530912487594139|
| 0.995201919232307|0.013077836508653755|
|0.9956017592962815|0.011893218954638594|
|0.9956017592962815|0.010901878713315615|
|0.9956017592962815|0.010061906994035592|
|0.9956017592962815|0.009342989006041049|
|0.9956017592962815|0.008720166698769021|
|0.9956017592962815|0.008174977346448318|
|0.9956017592962815|0.007693876415951352|
|0.9960015993602559|0.007269045131694904|
|0.9960015993602559|0.006886523038474...|
+------------------+--------------