In [1]:
# Spark entry point plus the SQL function (F) and type (T) namespaces used below.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Reuse the active SparkSession if there is one, otherwise create one with default settings.
spark = SparkSession.builder.getOrCreate()

In [2]:
# Bare expression: Jupyter renders the SparkSession's summary as the cell output.
spark

In [9]:
import os
from os.path import isfile, join

# Root directory that holds the data. Defaults to the D: drive this notebook
# was written against; set the DATA_ROOT environment variable to point it
# elsewhere instead of editing a hard-coded absolute path.
loc = os.path.abspath(os.environ.get("DATA_ROOT", "D:\\"))

# Build the folder path with os.path.join instead of literal "\\" separators.
# abspath("D:\\") already ends in a separator, so manual concatenation produced
# a doubled separator, and literal backslashes are not separators on POSIX.
# The trailing "" keeps a terminating separator so a file name can be
# appended directly, e.g. f"{data_loc}some_file.csv".
data_loc = join(loc, "train_data", "")

In [11]:
# Explicit schema for the transactions CSV. Field order must match the file's
# column order; the types are the ones type inference produced for this file.
transactions_schema = T.StructType(
    [
        T.StructField("step", T.IntegerType(), True),
        T.StructField("type", T.StringType(), True),
        T.StructField("amount", T.DoubleType(), True),
        T.StructField("nameOrig", T.StringType(), True),
        T.StructField("oldbalanceOrg", T.DoubleType(), True),
        T.StructField("newbalanceOrig", T.DoubleType(), True),
        T.StructField("nameDest", T.StringType(), True),
        T.StructField("oldbalanceDest", T.DoubleType(), True),
        T.StructField("newbalanceDest", T.DoubleType(), True),
        T.StructField("isFlaggedFraud", T.IntegerType(), True),
        T.StructField("isFraud", T.IntegerType(), True),
    ]
)

# Supplying the schema skips the extra full pass over the (multi-million row)
# file that inferSchema=True needs just to guess column types, and pins the
# types so they cannot drift if the file contents change.
df = spark.read.csv(
    f"{data_loc}Synthetic Financial Data.csv", schema=transactions_schema, header=True
)

In [12]:
# Print column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)
 |-- isFraud: integer (nullable = true)



In [13]:
# Peek at the first few raw rows.
df.show(n=3)

+----+-------+---------+----------+-------------+--------------+-----------+--------------+--------------+--------------+-------+
|step|   type|   amount|  nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFlaggedFraud|isFraud|
+----+-------+---------+----------+-------------+--------------+-----------+--------------+--------------+--------------+-------+
| 619|CASH_IN|386385.08|C421351828|   4669568.85|    5055953.92|C1977099364|     506035.06|     119649.98|             0|      0|
| 164|CASH_IN|212458.78| C83569848|     234635.0|     447093.78|C1690589535|     806037.88|      593579.1|             0|      0|
| 382|PAYMENT|  19967.6|C852995095|       3634.0|           0.0|M1695416333|           0.0|           0.0|             0|      0|
+----+-------+---------+----------+-------------+--------------+-----------+--------------+--------------+--------------+-------+
only showing top 3 rows



In [15]:
# Keep only the transaction type, the amount, the originator's balance before
# and after, and the fraud label; every other column is dropped from here on.
df = df.select(["type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud"])

In [16]:
# Confirm the reduced column set.
df.show(n=4)

+--------+---------+-------------+--------------+-------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+---------+-------------+--------------+-------+
| CASH_IN|386385.08|   4669568.85|    5055953.92|      0|
| CASH_IN|212458.78|     234635.0|     447093.78|      0|
| PAYMENT|  19967.6|       3634.0|           0.0|      0|
|CASH_OUT|527616.51|     180216.0|           0.0|      0|
+--------+---------+-------------+--------------+-------+
only showing top 4 rows



## Train/Test Split

In [17]:
# 70/30 train/test split; the fixed seed makes the split repeatable.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=7)

In [18]:
# Each count() is a full Spark action over its split.
n_train = train.count()
n_test = test.count()
print(f"Train set length: {n_train} records")
print(f"Test set length: {n_test} records")

Train set length: 3561695 records
Test set length: 1528401 records


In [19]:
# Peek at a few rows of the training split.
train.show(n=3)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  5.28|   8752959.06|    8752964.34|      0|
|CASH_IN|  9.02|   2416260.59|    2416269.61|      0|
|CASH_IN|  10.3|      58585.0|       58595.3|      0|
+-------+------+-------------+--------------+-------+
only showing top 3 rows



### Data Types

In [20]:
# (column name, Spark type name) pairs; used to split categorical vs numeric columns.
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [21]:
# Name of the target column; it must never end up in the feature lists.
LABEL_COL = "isFraud"

# Partition feature columns by Spark dtype: string columns are one-hot encoded
# later, double columns go straight into the assembler. Only "double" counts as
# numeric here, so any integer column other than the label is also left out.
catCols = [name for name, dtype in train.dtypes if dtype == "string"]
numCols = [
    name for name, dtype in train.dtypes if dtype == "double" and name != LABEL_COL
]

In [22]:
# Numeric feature columns first, then categorical ones, one list per line.
print(numCols, catCols, sep="\n")

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


## One-Hot Encoding

In [23]:
# Number of distinct transaction types in the training split.
train.agg(F.countDistinct(F.col("type"))).show()

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



In [25]:
# Row count per transaction type in the training split.
train.groupBy(F.col("type")).count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 298452|
| CASH_IN| 783037|
|CASH_OUT|1253864|
| PAYMENT|1203106|
|   DEBIT|  23236|
+--------+-------+



In [26]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [27]:
# One StringIndexer per categorical column, writing <col>_StringIndexer.
# handleInvalid="skip" means rows whose category was not seen while fitting
# (or is null) are silently dropped at transform time, e.g. from the test split.
string_indexer = [
    StringIndexer(
        inputCol=col,
        outputCol=f"{col}_StringIndexer",
        handleInvalid="skip",
    )
    for col in catCols
]

In [28]:
# Inspect the indexer stages that were built.
string_indexer

[StringIndexer_502a5bc9834a]

In [30]:
# A single multi-column OneHotEncoder covers every indexed categorical column,
# mapping <col>_StringIndexer -> <col>_OneHotEncoder. It is wrapped in a list
# so it can be concatenated with the other stage lists.
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[col + "_StringIndexer" for col in catCols],
        outputCols=[col + "_OneHotEncoder" for col in catCols],
    )
]

In [31]:
# Inspect the encoder stage that was built.
one_hot_encoder

[OneHotEncoder_bab30181f733]

## Feature Vector Assembly

In [32]:
from pyspark.ml.feature import VectorAssembler

In [33]:
# Assembler inputs: raw numeric columns followed by the one-hot vectors.
# Concatenation builds a new list, so numCols itself is not modified.
assemblerInput = numCols + [f"{col}_OneHotEncoder" for col in catCols]

In [34]:
# Inspect the ordered list of columns fed into the VectorAssembler.
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [35]:
# Concatenate all assembler inputs into one feature vector column.
vector_assembler = (
    VectorAssembler()
    .setInputCols(assemblerInput)
    .setOutputCol("VectorAssembler_features")
)

In [36]:
# Pipeline stage order: index categoricals, one-hot encode them, then assemble.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]

In [37]:
# Inspect the ordered pipeline stages.
stages

[StringIndexer_502a5bc9834a,
 OneHotEncoder_bab30181f733,
 VectorAssembler_f8c088944786]

In [39]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: total: 0 ns
Wall time: 12.7 s


In [40]:
# Show the source columns next to the assembled feature vector (default 20 rows).
preview_cols = ["type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features"]
pp_df.select(*preview_cols).show(truncate=False)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|9.04  |99971.0      |99980.04      |[9.04,99971.0,99980.04,0.0,0.0,1.0,0.0]            |
|CASH_IN|12.79 |601743.0     |601755.79     |[12.79,601743.0,601755.79,0.0,0.0,1.0,0.0]         |
|CASH_IN|16.3  |2.140511936E7|2.140513566E7 |[16.3,2.140511936E7,2.140513566E7,0.0,0.0,1.0,0.0] |
|CASH_IN|20.34 |991344.41    |991364.74     |[20.34,991344.41,991364.74,0.0,0.0,1.0,0.0]        |
|CASH_IN|25.02 |2.449835711E7|2.449838213E7 |[25.02,2.449835711E7,2.449838213E7,0.0,0.0,1.0,0.0]|
|CASH_IN|26.67 |2117097.76   |2117124.43    |[26.67,2117097.76,2117124.43,0.0,0.0,1.0,0.0]      |
|CASH_IN|36.32 |3206.45      |3242.77       |[36.32,3206.45,3242.77,0.0,0.0,1.0,0.0]            |
|CASH_IN|37.96 |983.

## Logistic Regression

In [41]:
from pyspark.ml.classification import LogisticRegression

In [42]:
# Rename to the default column names Spark ML classifiers expect
# ("features" and "label").
# NOTE(review): pp_df is model.transform(test), i.e. the held-out split. If
# `data` is used to fit the classifier, training happens on the 30% test
# split rather than on `train` — confirm this is intended.
data = pp_df.select(
    pp_df["VectorAssembler_features"].alias("features"),
    pp_df["isFraud"].alias("label"),
)