In [1]:
%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Reuse the active Spark session if one exists; otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

<IPython.core.display.Javascript object>

In [2]:
# Display the session object as a quick check that Spark is available.
spark

<IPython.core.display.Javascript object>

In [3]:
import os
from os.path import isfile, join

# Resolve the data directory relative to the notebook's working directory.
# `join` (instead of string formatting) uses the platform separator and avoids
# a doubled separator when the working directory is a filesystem root. The
# final "" component keeps the trailing separator, because later cells build
# file paths by appending the file name directly to `data_loc`.
loc = os.path.abspath("")
data_loc = join(loc, "data", "")

<IPython.core.display.Javascript object>

Dataset: https://www.kaggle.com/ntnu-testimon/paysim1/data (Randomly sampled 10% of the dataset)

In [4]:
# Load the sampled transactions CSV: column names come from the header row
# and column types are inferred from the data.
df = spark.read.csv(
    path=f"{data_loc}Synthetic Financial Data.csv",
    header=True,
    inferSchema=True,
)

<IPython.core.display.Javascript object>

In [5]:
# Inspect the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)



<IPython.core.display.Javascript object>

In [6]:
# Preview the first two rows of the raw data.
df.show(2)

+---+----+-------+--------+----------+-------------+--------------+-----------+--------------+--------------+-------+
|_c0|step|   type|  amount|  nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|
+---+----+-------+--------+----------+-------------+--------------+-----------+--------------+--------------+-------+
|  0| 688|CASH_IN|23557.12|C867750533|       8059.0|      31616.12|C1026934669|     169508.66|     145951.53|      0|
|  1| 274|PAYMENT| 6236.13|C601099070|          0.0|           0.0| M701283411|           0.0|           0.0|      0|
+---+----+-------+--------+----------+-------------+--------------+-----------+--------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

In [7]:
# Narrow the frame to the columns used below: the transaction type, three
# numeric columns and the `isFraud` label. All other columns are dropped.
df = df.select(
    [
        "type",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "isFraud",
    ]
)

<IPython.core.display.Javascript object>

In [8]:
# Preview the first two rows after the column selection.
df.show(2)

+-------+--------+-------------+--------------+-------+
|   type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+--------+-------------+--------------+-------+
|CASH_IN|23557.12|       8059.0|      31616.12|      0|
|PAYMENT| 6236.13|          0.0|           0.0|      0|
+-------+--------+-------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

### Train/test split

In [9]:
# 70/30 random split; the fixed seed keeps the split repeatable across runs.
train, test = df.randomSplit(weights=[0.7, 0.3], seed=7)

<IPython.core.display.Javascript object>

In [10]:
# Report how many records landed in each split (each count triggers a Spark job).
print(
    f"Train set length: {train.count()} records",
    f"Test set length: {test.count()} records",
    sep="\n",
)

Train set length: 445207 records
Test set length: 191055 records


<IPython.core.display.Javascript object>

In [11]:
# Preview the first two rows of the training split.
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.91|     316450.0|     316451.91|      0|
|CASH_IN| 93.31|    170084.31|     170177.62|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

### Dtypes
In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features that we want treated as categorical, or vice versa. We'll need to carefully identify which columns are numeric and which are categorical.

In [12]:
# List (column name, Spark dtype) pairs; used below to separate categorical
# columns from numeric ones.
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

<IPython.core.display.Javascript object>

In [13]:
# Partition the feature columns by Spark dtype: string columns are treated as
# categorical, double columns as numeric. The `isFraud` label is excluded from
# the numeric features by name. The two conditions are plain Python booleans,
# so they are combined with logical `and` rather than the bitwise `&`.
catCols = [name for name, dtype in train.dtypes if dtype == "string"]
numCols = [
    name for name, dtype in train.dtypes if dtype == "double" and name != "isFraud"
]

<IPython.core.display.Javascript object>

In [14]:
# Show the numeric feature columns, then the categorical ones.
print(numCols, catCols, sep="\n")

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


<IPython.core.display.Javascript object>

### One hot encoding

StringIndexer:
Encodes a string column of labels as a column of numeric label indices; by default the most frequent label gets index 0.
http://spark.apache.org/docs/latest/ml-features#stringindexer


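OneHotEncoder:
Maps a column of category indices to a column of binary vectors. By default the last category is dropped, so 5 categories yield a 4-element vector.
http://spark.apache.org/docs/latest/ml-features#onehotencoder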

For more info: http://spark.apache.org/docs/latest/ml-features

In [15]:
# Count the distinct values of `type` in the training split.
train.agg(F.countDistinct("type")).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



<IPython.core.display.Javascript object>

In [16]:
# Show how many training rows fall into each `type` category.
train.groupBy("type").count().show()

+--------+------+
|    type| count|
+--------+------+
|TRANSFER| 37320|
| CASH_IN| 98095|
|CASH_OUT|156447|
| PAYMENT|150410|
|   DEBIT|  2935|
+--------+------+



<IPython.core.display.Javascript object>

In [17]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

<IPython.core.display.Javascript object>

In [18]:
# One StringIndexer per categorical column, writing its index to
# "<column>_StringIndexer". With handleInvalid="skip", rows whose value the
# fitted indexer cannot map are filtered out instead of raising an error.
string_indexer = [
    StringIndexer(
        inputCol=col_name,
        outputCol=f"{col_name}_StringIndexer",
        handleInvalid="skip",
    )
    for col_name in catCols
]

<IPython.core.display.Javascript object>

In [19]:
# Display the list of (unfitted) StringIndexer stages.
string_indexer

[StringIndexer_ac18404724d3]

<IPython.core.display.Javascript object>

In [20]:
# A single multi-column OneHotEncoder: every "<column>_StringIndexer" index
# column is encoded into a matching "<column>_OneHotEncoder" vector column.
# It is wrapped in a list so it can be appended to the pipeline stages.
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[col_name + "_StringIndexer" for col_name in catCols],
        outputCols=[col_name + "_OneHotEncoder" for col_name in catCols],
    )
]

<IPython.core.display.Javascript object>

In [21]:
# Display the list holding the (unfitted) OneHotEncoder stage.
one_hot_encoder

[OneHotEncoder_1a81f21e8570]

<IPython.core.display.Javascript object>

### Vector assembling

VectorAssembler:
Combines the values of input columns into a single vector.
http://spark.apache.org/docs/latest/ml-features#vectorassembler


In [22]:
from pyspark.ml.feature import VectorAssembler

<IPython.core.display.Javascript object>

In [23]:
# Columns fed to the VectorAssembler: the numeric columns first, followed by
# the one-hot encoded vector of each categorical column.
assemblerInput = [
    *numCols,
    *(f"{col_name}_OneHotEncoder" for col_name in catCols),
]

<IPython.core.display.Javascript object>

In [24]:
# Display the ordered list of columns that will be assembled into the feature vector.
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

<IPython.core.display.Javascript object>

In [25]:
# Concatenate all assembler input columns into one vector column named
# "VectorAssembler_features".
vector_assembler = VectorAssembler(
    outputCol="VectorAssembler_features",
    inputCols=assemblerInput,
)

<IPython.core.display.Javascript object>

In [26]:
# Pipeline order matters: index the strings, one-hot encode the indices, then
# assemble everything into a single feature vector.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]


<IPython.core.display.Javascript object>

In [27]:
# Display the pipeline stages in execution order.
stages

[StringIndexer_ac18404724d3,
 OneHotEncoder_1a81f21e8570,
 VectorAssembler_274ddcafb552]

<IPython.core.display.Javascript object>

In [28]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: user 15.1 ms, sys: 3.45 ms, total: 18.5 ms
Wall time: 927 ms


<IPython.core.display.Javascript object>

In [29]:
# Compare the original columns with the assembled feature vector, without
# truncating the cell contents.
pp_df.select(
    [
        "type",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "VectorAssembler_features",
    ]
).show(truncate=False)

+-------+-------+-------------+--------------+----------------------------------------------------+
|type   |amount |oldbalanceOrg|newbalanceOrig|VectorAssembler_features                            |
+-------+-------+-------------+--------------+----------------------------------------------------+
|CASH_IN|155.86 |2593748.84   |2593904.7     |[155.86,2593748.84,2593904.7,0.0,0.0,1.0,0.0]       |
|CASH_IN|273.95 |77437.29     |77711.24      |[273.95,77437.29,77711.24,0.0,0.0,1.0,0.0]          |
|CASH_IN|301.68 |339375.21    |339676.89     |[301.68,339375.21,339676.89,0.0,0.0,1.0,0.0]        |
|CASH_IN|342.82 |2747733.77   |2748076.59    |[342.82,2747733.77,2748076.59,0.0,0.0,1.0,0.0]      |
|CASH_IN|374.39 |705640.24    |706014.63     |[374.39,705640.24,706014.63,0.0,0.0,1.0,0.0]        |
|CASH_IN|418.19 |2.536384556E7|2.536426374E7 |[418.19,2.536384556E7,2.536426374E7,0.0,0.0,1.0,0.0]|
|CASH_IN|474.41 |136.0        |610.41        |[474.41,136.0,610.41,0.0,0.0,1.0,0.0]               |


<IPython.core.display.Javascript object>

### Logistic Regression

In [30]:
from pyspark.ml.classification import LogisticRegression

<IPython.core.display.Javascript object>

In [31]:
# Rename the assembled vector and the fraud flag to "features" and "label",
# the column names LogisticRegression uses by default.
data = pp_df.select(
    pp_df["VectorAssembler_features"].alias("features"),
    pp_df["isFraud"].alias("label"),
)

<IPython.core.display.Javascript object>

In [32]:
# Preview the first five (features, label) rows without truncation.
data.show(5, truncate=False)

+----------------------------------------------+-----+
|features                                      |label|
+----------------------------------------------+-----+
|[155.86,2593748.84,2593904.7,0.0,0.0,1.0,0.0] |0    |
|[273.95,77437.29,77711.24,0.0,0.0,1.0,0.0]    |0    |
|[301.68,339375.21,339676.89,0.0,0.0,1.0,0.0]  |0    |
|[342.82,2747733.77,2748076.59,0.0,0.0,1.0,0.0]|0    |
|[374.39,705640.24,706014.63,0.0,0.0,1.0,0.0]  |0    |
+----------------------------------------------+-----+
only showing top 5 rows



<IPython.core.display.Javascript object>

In [33]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 7.46 ms, sys: 1.99 ms, total: 9.45 ms
Wall time: 4.21 s


<IPython.core.display.Javascript object>

In [34]:
# Area under the ROC curve from the model's training summary, i.e. measured
# on the same data that was passed to `fit`.
model.summary.areaUnderROC

0.9896242329869464

<IPython.core.display.Javascript object>

In [35]:
# Precision/recall curve points from the model's training summary.
model.summary.pr.show()

+------------------+--------------------+
|            recall|           precision|
+------------------+--------------------+
|               0.0| 0.10701298701298702|
|0.8340080971659919| 0.10701298701298702|
|0.8906882591093117|  0.0573962953300287|
|0.9311740890688259|  0.0400627068454973|
|0.9757085020242915|0.031507386586481896|
|0.9919028340080972|0.025632977610378742|
|0.9919028340080972|0.021367521367521368|
|0.9919028340080972| 0.01831912666367579|
|0.9919028340080972|0.016741834085007515|
|0.9919028340080972|0.014810784669326563|
|0.9919028340080972|0.013279132791327914|
|0.9919028340080972|0.012034581000098242|
|0.9919028340080972|0.011002829298962591|
|0.9919028340080972|0.010134436401240952|
|0.9919028340080972| 0.00939273117619997|
|0.9919028340080972|0.009355787222667736|
|0.9919028340080972|0.008720412884854957|
|0.9919028340080972|0.008165850081658502|
|0.9919028340080972|0.007677362747555779|
|0.9919028340080972| 0.00724401998758168|
+------------------+--------------

<IPython.core.display.Javascript object>