In [1]:
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)# Property used to format output tables better

df = spark.read.csv("./data/data.csv", inferSchema=True, header=True)

In [3]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [4]:
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [5]:
df.count()

6362620

In [6]:
df.select("type").distinct()

type
TRANSFER
CASH_IN
CASH_OUT
PAYMENT
DEBIT


In [7]:
df.select("nameDest").distinct().count()

2722362

In [8]:
df.select("step").distinct().orderBy("step", ascending=False).head(5)

[Row(step=743), Row(step=742), Row(step=741), Row(step=740), Row(step=739)]

In [9]:
df.select("nameOrig").distinct().count()

6353307

In [10]:
df = df.select("step", "type", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "isFraud")
df.head(5)

[Row(step=1, type='PAYMENT', amount=9839.64, oldbalanceOrg=170136.0, newbalanceOrig=160296.36, oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0),
 Row(step=1, type='PAYMENT', amount=1864.28, oldbalanceOrg=21249.0, newbalanceOrig=19384.72, oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0),
 Row(step=1, type='TRANSFER', amount=181.0, oldbalanceOrg=181.0, newbalanceOrig=0.0, oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=1),
 Row(step=1, type='CASH_OUT', amount=181.0, oldbalanceOrg=181.0, newbalanceOrig=0.0, oldbalanceDest=21182.0, newbalanceDest=0.0, isFraud=1),
 Row(step=1, type='PAYMENT', amount=11668.14, oldbalanceOrg=41554.0, newbalanceOrig=29885.86, oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0)]

In [11]:
string_indexer = [StringIndexer(inputCol="type",
                                outputCol="type" + "_StringIndexer",
                                handleInvalid="skip")]
                        
one_hot_encoder = [OneHotEncoder(inputCols=["type_StringIndexer"],
                                 outputCols=["type_OneHotEncoder"])]

assemblerInput = ["step", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "isFraud", "type_OneHotEncoder"]

vector_assembler = VectorAssembler(inputCols=assemblerInput,
                                   outputCol="VectorAssembler_features")
                    
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [12]:
train, test = df.randomSplit([0.7, 0.3], seed=5624)

In [13]:
pipeline = Pipeline().setStages(stages)
pipe_model = pipeline.fit(train)

train_data_pipe = pipe_model.transform(train)
test_data_pipe = pipe_model.transform(test)
train_data_pipe.show(10)

+----+-------+-------+-------------+--------------+--------------+--------------+-------+------------------+------------------+------------------------+
|step|   type| amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+----+-------+-------+-------------+--------------+--------------+--------------+-------+------------------+------------------+------------------------+
|   1|CASH_IN| 270.78|   4184966.65|    4185237.43|        3019.0|           0.0|      0|               2.0|     (4,[2],[1.0])|    (11,[0,1,2,3,4,9]...|
|   1|CASH_IN| 484.57|   5422437.76|    5422922.33|    5638778.53|    5579568.65|      0|               2.0|     (4,[2],[1.0])|    [1.0,484.57,54224...|
|   1|CASH_IN| 783.31|   8150331.93|    8151115.24|       2013.12|       1229.81|      0|               2.0|     (4,[2],[1.0])|    [1.0,783.31,81503...|
|   1|CASH_IN| 863.08|   9290756.54|    9291619.62|       5577.88|        4714.8| 

In [14]:
train_data = train_data_pipe.select(F.col("VectorAssembler_features").alias("features"),
                                    F.col("isFraud").alias("label"))

test_data = test_data_pipe.select(F.col("VectorAssembler_features").alias("features"),
                                  F.col("isFraud").alias("label"))

In [15]:
lr_spark = LogisticRegression().fit(train_data)

print(f"Training Area Under ROC: {lr_spark.summary.areaUnderROC}")
print(f"Training Accuracy: {lr_spark.summary.accuracy}")
pred = lr_spark.evaluate(test_data)

Training Area Under ROC: 0.9996194116684382
Training Accuracy: 1.0


In [16]:
print(f"Test Area Under ROC: {pred.areaUnderROC}")
print(f"Test Accuracy: {pred.accuracy}")

Test Area Under ROC: 0.9995965438127711
Test Accuracy: 1.0


In [17]:
metrics = MulticlassMetrics(pred.predictions.select("prediction", "label").withColumn("label", F.col("prediction").cast(FloatType())).rdd.map(tuple))



In [18]:
cm = metrics.confusionMatrix().toArray()

In [19]:
cm

array([[1906031.,       0.],
       [      0.,    2438.]])