In [112]:
from pyspark.sql import SparkSession 
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Reuse the active Spark session if there is one; otherwise start a new one.
spark_builder = SparkSession.builder
spark = spark_builder.getOrCreate()


In [113]:
# Echo the session object so the notebook renders its summary.
spark

In [114]:
import os
from os.path import isfile, join

# Absolute path of the current working directory.
loc = os.path.abspath(os.curdir)

# Directory prefix (with trailing slash) that data file names are appended to.
data_loc = loc + "/"



In [115]:
# Load the CSV, treating the first row as the header and letting Spark infer column types.
financial_csv_path = data_loc + "Financial_data.csv"
input_data = spark.read.csv(financial_csv_path, header=True, inferSchema=True)

In [116]:

# Work on a ~10% random sample of the input. The sample is seeded so that
# re-running the notebook draws the same rows; without a seed, the seeded
# randomSplit applied to `df` later would still receive different data on
# every run, making the results non-reproducible.
df = input_data.sample(fraction=0.1, seed=7)

df.printSchema()



root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [117]:
# Preview the first 3 rows of the sampled DataFrame.
df.show(3)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|   DEBIT|  1065.41|C1959239586|       1817.0|        751.59| C515132998|       10330.0|           0.0|      0|             0|
|   1| PAYMENT|  8901.99|C1632497828|      2958.91|           0.0|  M33419717|           0.0|           0.0|      0|             0|
|   1|CASH_OUT|110414.71| C768216420|     26845.41|           0.0|C1509514333|      288800.0|       2415.16|      0|             0|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 3 rows



In [118]:
# 70/30 train/test split; a fixed seed is passed to randomSplit for repeatability.
train, test = df.randomSplit([0.7, 0.3], seed=7)

# Report the size of each split (each count() triggers a Spark job).
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 445012 reords
Test set length: 191204 reords


In [119]:
# Preview the first 2 rows of the training split.
train.show(2)

+----+-------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|CASH_IN|  270.78| C619985571|   4184966.65|    4185237.43| C875917495|        3019.0|           0.0|      0|             0|
|   1|CASH_IN|10844.33|C1828508781|   8623151.43|    8633995.76|C2083117811|       42274.0|      290772.6|      0|             0|
+----+-------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [120]:
# List (column name, type) pairs; used below to pick categorical and numeric columns.
train.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('nameOrig', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('nameDest', 'string'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('isFlaggedFraud', 'int')]

In [121]:
# String columns that are deliberately left out of the categorical features.
excluded_string_cols = ["nameOrig", "nameDest"]

# Categorical features: every string-typed column except the excluded ones.
catcols = [
    col_name
    for col_name, col_type in train.dtypes
    if col_type == "string" and col_name not in excluded_string_cols
]

# Numeric features: every double-typed column (integer columns are not selected).
numcols = [col_name for col_name, col_type in train.dtypes if col_type == "double"]

In [122]:
# Print both feature lists, one per line.
print(catcols, numcols, sep="\n")

['type']
['amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']


In [123]:
# Number of distinct values in the `type` column of the training split.
type_cardinality = F.countDistinct("type")
train.agg(type_cardinality).show()

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



In [124]:
# Row count per value of `type` in the training split.
type_counts = train.groupBy("type").count()
type_counts.show()

+--------+------+
|    type| count|
+--------+------+
|TRANSFER| 37390|
| CASH_IN| 97613|
|CASH_OUT|156502|
| PAYMENT|150632|
|   DEBIT|  2875|
+--------+------+



In [125]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [126]:
# One StringIndexer per categorical column, writing to "<column>_StringIndexer".
# handleInvalid="skip" asks the indexer to filter out rows with invalid labels.
string_indexer = [
    StringIndexer(
        inputCol=cat_col,
        outputCol=f"{cat_col}_StringIndexer",
        handleInvalid="skip",
    )
    for cat_col in catcols
]

In [127]:
# Show the list of indexer stages that was just built.
string_indexer

[StringIndexer_f1ae3725efc8]

In [128]:
# A single OneHotEncoder that handles all indexed categorical columns at once:
# "<column>_StringIndexer" -> "<column>_OneHotEncoder".
indexed_cols = [f"{cat_col}_StringIndexer" for cat_col in catcols]
encoded_cols = [f"{cat_col}_OneHotEncoder" for cat_col in catcols]

one_hot_encoder = [OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)]

In [129]:
# Show the list of encoder stages that was just built.
one_hot_encoder

[OneHotEncoder_170807770483]

Vector Assembler

In [130]:
from pyspark.ml.feature import VectorAssembler

In [131]:
# Columns fed to the assembler: numeric columns first, then the one-hot encoded categoricals.
encoded_catcols = [f"{cat_col}_OneHotEncoder" for cat_col in catcols]
assemblerInput = [*numcols, *encoded_catcols]
assemblerInput

['amount',
 'oldbalanceOrg',
 'newbalanceOrig',
 'oldbalanceDest',
 'newbalanceDest',
 'type_OneHotEncoder']

In [132]:
# Combine all feature columns into one vector column named "VectorAssembler_features".
vector_assembler = (
    VectorAssembler()
    .setInputCols(assemblerInput)
    .setOutputCol("VectorAssembler_features")
)

In [133]:
# Pipeline stages in execution order: index strings, one-hot encode, assemble the feature vector.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]

stages

[StringIndexer_f1ae3725efc8,
 OneHotEncoder_170807770483,
 VectorAssembler_1dadb6f94700]

In [134]:
from pyspark.ml import Pipeline


In [135]:
%%time

pipeline = Pipeline().setStages(stages)

model = pipeline.fit(train)


CPU times: total: 46.9 ms
Wall time: 16.2 s


In [136]:
# Apply the fitted preprocessing pipeline to the test split.
pp_df = model.transform(test)

In [137]:
# Inspect the first 10 assembled feature vectors without truncating them.
pp_df.select("VectorAssembler_features").show(10,truncate=False)

+----------------------------------------------------------------------+
|VectorAssembler_features                                              |
+----------------------------------------------------------------------+
|[18536.86,4031649.33,4050186.19,303464.0,0.0,0.0,0.0,1.0,0.0]         |
|[35902.49,371688.15,407590.65,49003.3,0.0,0.0,0.0,1.0,0.0]            |
|[44809.72,3493979.56,3538789.28,5683588.25,5579568.65,0.0,0.0,1.0,0.0]|
|[57262.72,4374599.01,4431861.74,72917.77,16896.7,0.0,0.0,1.0,0.0]     |
|[62871.06,8367720.25,8430591.3,177707.91,4894.45,0.0,0.0,1.0,0.0]     |
|[65525.9,6129312.4,6194838.3,129627.65,122027.28,0.0,0.0,1.0,0.0]     |
|[72897.92,6666214.43,6739112.35,327169.9,4891090.56,0.0,0.0,1.0,0.0]  |
|[80448.13,4274305.14,4354753.26,124139.21,43691.09,0.0,0.0,1.0,0.0]   |
|[89327.65,943628.44,1032956.09,2063442.28,3940085.21,0.0,0.0,1.0,0.0] |
|[95564.34,5326873.42,5422437.76,97263.78,1699.44,0.0,0.0,1.0,0.0]     |
+--------------------------------------------------

In [138]:
from pyspark.ml.classification import LogisticRegression

In [139]:
# Keep only the two columns the classifier needs, renamed to "features" and "label":
# the assembled vector and the isFraud flag.
features_col = F.col("VectorAssembler_features").alias("features")
label_col = F.col("isFraud").alias("label")

data = pp_df.select(features_col, label_col)

In [140]:
# Preview the first 5 feature/label rows without truncating the vectors.
data.show(5,truncate=False)

+----------------------------------------------------------------------+-----+
|features                                                              |label|
+----------------------------------------------------------------------+-----+
|[18536.86,4031649.33,4050186.19,303464.0,0.0,0.0,0.0,1.0,0.0]         |0    |
|[35902.49,371688.15,407590.65,49003.3,0.0,0.0,0.0,1.0,0.0]            |0    |
|[44809.72,3493979.56,3538789.28,5683588.25,5579568.65,0.0,0.0,1.0,0.0]|0    |
|[57262.72,4374599.01,4431861.74,72917.77,16896.7,0.0,0.0,1.0,0.0]     |0    |
|[62871.06,8367720.25,8430591.3,177707.91,4894.45,0.0,0.0,1.0,0.0]     |0    |
+----------------------------------------------------------------------+-----+
only showing top 5 rows



In [141]:
%%time

model1 = LogisticRegression().fit(data)

CPU times: total: 31.2 ms
Wall time: 42.5 s


In [142]:
# Score the model on `data` (built from the transformed test split) explicitly.
# `model1.summary` only describes the dataset that was passed to fit().
model1.evaluate(data).areaUnderROC

0.9823238841041741

In [143]:
# Precision/recall curve computed on `data` (built from the transformed test split)
# rather than taken from the fit-time summary.
model1.evaluate(data).pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9142857142857143|
|0.38095238095238093| 0.9142857142857143|
| 0.5357142857142857| 0.7180851063829787|
| 0.5674603174603174| 0.5276752767527675|
| 0.5833333333333334| 0.4152542372881356|
| 0.6031746031746031|0.34782608695652173|
|  0.623015873015873| 0.3019230769230769|
| 0.6349206349206349|0.26533996683250416|
| 0.6587301587301587|0.24198250728862974|
| 0.6666666666666666|0.21846553966189858|
| 0.6825396825396826|0.20187793427230047|
| 0.6944444444444444|0.18716577540106952|
| 0.7023809523809523| 0.1738703339882122|
| 0.7182539682539683|0.16439600363306087|
| 0.7301587301587301| 0.1554054054054054|
| 0.7420634920634921|0.14759273875295975|
|  0.746031746031746|0.13925925925925925|
|  0.753968253968254|0.13258897418004187|
| 0.7619047619047619| 0.1266490765171504|
| 0.7738095238095238|0.12195121951219512|
+-------------------+-------------