In [33]:
# The data set is from https://www.kaggle.com/ealaxi/paysim1

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create (or reuse, via getOrCreate) the Spark session for this notebook.
spark = SparkSession.builder.appName('FraudDetection').getOrCreate()

In [3]:
# Declare the schema up front instead of using inferSchema=True. Inference costs an
# extra full pass over this multi-million-row CSV. Pinning the types also keeps the
# dtype-based column selection further down stable.
# The types below match what inferSchema produced for this file (see the printSchema output).
df = spark.read.csv(
    'data.csv',
    header=True,
    schema=(
        'step INT, type STRING, amount DOUBLE, nameOrig STRING, '
        'oldbalanceOrg DOUBLE, newbalanceOrig DOUBLE, nameDest STRING, '
        'oldbalanceDest DOUBLE, newbalanceDest DOUBLE, isFraud INT, isFlaggedFraud INT'
    ),
)

In [4]:
# Show the DataFrame's column names and types.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [5]:
# Preview the first 5 rows of the raw data.
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [6]:
# Keep only the modelling columns (type, amount, oldbalanceOrg, newbalanceOrig) plus
# the isFraud label. Every other column (step, nameOrig, nameDest, ...) is dropped.
df_feature = df.select(
    ['type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'isFraud']
)

In [7]:
# Preview the first 2 rows of the selected feature/label columns.
df_feature.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



# Train / Test / Validation Split

In [8]:
# This is a very large dataset, so only half of it is used in this project.

In [9]:
# Keep one half (df1) for the rest of the notebook; df2 is not used below.
# The fixed seed makes the split repeatable (given the same input partitioning).
df1, df2 = df_feature.randomSplit(weights=[0.5, 0.5], seed=7)

In [10]:
# Split the retained half 70% / 20% / 10% into train / test / validation.
# NOTE(review): `valid` is not used later in this notebook.
train, test, valid = df1.randomSplit(weights=[0.7, 0.2, 0.1], seed=7)

In [11]:
# Report the row count of each split; every .count() runs a separate Spark job.
for split_name, split_df in (("Train", train), ("Test", test), ("Valid", valid)):
    print(f"{split_name} set length: {split_df.count()} records")

Train set length: 2224874 records
Test set length: 636417 records
Valid set length: 318033 records


In [12]:
# (column name, Spark SQL type) pairs for the training split. The next cell uses them
# to separate categorical columns from numeric ones.
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [13]:
# Split the training columns by Spark SQL type: string columns are categorical and
# double columns are numeric features. The label (isFraud) is excluded explicitly so
# it cannot leak into the features even if its type changes.
catCols = [name for name, dtype in train.dtypes if dtype == 'string']
numCols = [name for name, dtype in train.dtypes if dtype == 'double' and name != 'isFraud']

In [14]:
# Categorical (string-typed) columns to be one-hot encoded.
catCols

['type']

In [15]:
# Numeric (double-typed) feature columns, with the isFraud label excluded.
numCols

['amount', 'oldbalanceOrg', 'newbalanceOrig']

# One-Hot Encoding
Encode the categorical columns as one-hot vectors (StringIndexer followed by OneHotEncoder).

In [16]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [17]:
# Row counts per category value in the training split, grouped by all categorical
# columns together.
train.groupBy(*catCols).count().show()

+--------+------+
|    type| count|
+--------+------+
|TRANSFER|186585|
| CASH_IN|488942|
|CASH_OUT|782656|
| PAYMENT|752146|
|   DEBIT| 14545|
+--------+------+



In [18]:
# One StringIndexer per categorical column. Each writes a numeric index column named
# "<col>_StringIndexer". Because handleInvalid="skip", rows with invalid values
# (per the Spark docs: unseen labels or nulls) are dropped at transform time.
string_indexer = [
    StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_StringIndexer", handleInvalid="skip")
    for cat_col in catCols
]

In [19]:
# Inspect the StringIndexer stage(s): one per categorical column.
string_indexer

[StringIndexer_6d9a4bafecb3]

In [21]:
# A single OneHotEncoder encodes every indexed categorical column at once and writes
# "<col>_OneHotEncoder" vector columns. It is wrapped in a list so it can be
# concatenated with the other stage lists when the pipeline is built.
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[cat_col + "_StringIndexer" for cat_col in catCols],
        outputCols=[cat_col + "_OneHotEncoder" for cat_col in catCols],
    )
]

In [22]:
# Inspect the OneHotEncoder stage: a single encoder handles all categorical columns.
one_hot_encoder

[OneHotEncoder_e21ed1855904]

# Vector Assembler

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
# Numeric columns first, then the one-hot vectors. This order fixes the layout of the
# assembled feature vector.
assemblerInput = list(numCols) + [cat_col + "_OneHotEncoder" for cat_col in catCols]

In [25]:
# Columns that VectorAssembler concatenates into "features": numeric columns first,
# then the one-hot vectors.
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [26]:
# Concatenate every assembler input into a single vector column named "features".
vector_assembler = VectorAssembler(outputCol="features", inputCols=assemblerInput)

In [27]:
# Inspect the assembler stage.
vector_assembler

VectorAssembler_7e6ae1ee65f8

# Building the pipeline

In [35]:
# Stage order matters: index the strings, one-hot encode the indices, then assemble.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]

In [36]:
# Pipeline stage order: StringIndexer(s) -> OneHotEncoder -> VectorAssembler.
stages

[StringIndexer_6d9a4bafecb3,
 OneHotEncoder_e21ed1855904,
 VectorAssembler_7e6ae1ee65f8]

In [37]:
from pyspark.ml import Pipeline

In [60]:
%%time
pipleline = Pipeline().setStages(stages)
model = pipleline.fit(train)
pp_df = model.transform(train)

Wall time: 5.15 s


In [45]:
# Compare the raw input columns with the assembled feature vectors (shown dense or
# sparse, depending on how many entries are zero).
(
    pp_df
    .select('type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'features')
    .show(n=5, truncate=False)
)

+-------+------+-------------+--------------+---------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|features                                           |
+-------+------+-------------+--------------+---------------------------------------------------+
|CASH_IN|5.66  |5061561.06   |5061566.72    |[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]       |
|CASH_IN|14.4  |1.143460813E7|1.143462253E7 |[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0] |
|CASH_IN|16.89 |0.0          |16.89         |(7,[0,2,5],[16.89,16.89,1.0])                      |
|CASH_IN|17.53 |1111294.85   |1111312.37    |[17.53,1111294.85,1111312.37,0.0,0.0,1.0,0.0]      |
|CASH_IN|23.36 |2.442828608E7|2.442830944E7 |[23.36,2.442828608E7,2.442830944E7,0.0,0.0,1.0,0.0]|
+-------+------+-------------+--------------+---------------------------------------------------+
only showing top 5 rows



# Logistic Regression

In [47]:
from pyspark.ml.classification import LogisticRegression

In [61]:
# Training input for the classifier: the assembled features plus isFraud renamed to
# "label", the default label column name of LogisticRegression.
data = pp_df.select('features', col('isFraud').alias('label'))

In [62]:
%%time
lr_model = LogisticRegression().fit(data)

Wall time: 18.6 s


In [63]:
# NOTE: this is the AUC on the *training* data (the summary of the fit above), not a
# held-out estimate. Evaluate the test-set predictions for an unbiased number.
lr_model.summary.areaUnderROC

0.992288259415677

In [64]:
# Apply the preprocessing pipeline fitted on train to the test split (no refitting).
pp_test = model.transform(dataset=test)

In [65]:
# Same feature/label selection as for the training data, applied to the test split.
test_data = pp_test.select('features', col('isFraud').alias('label'))

In [68]:
# Score the test split; this adds the rawPrediction, probability and prediction columns.
prediction = lr_model.transform(dataset=test_data)

In [69]:
# Columns after scoring: the inputs plus rawPrediction, probability and prediction.
prediction.columns

['features', 'label', 'rawPrediction', 'probability', 'prediction']

In [81]:
# If fraud is rare (the majority-class baseline printed below shows how rare), plain
# accuracy mostly reflects the majority class. A model that never predicts fraud would
# already score close to 1.0. So accuracy is reported next to that baseline, the
# precision/recall of the fraud class, and threshold-free AUC metrics, all computed
# on the held-out test predictions.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# One pass over the predictions to build the confusion counts {(label, prediction): n}.
confusion = {
    (row['label'], row['prediction']): row['count']
    for row in prediction.groupBy('label', 'prediction').count().collect()
}
total = sum(confusion.values())
if total == 0:
    raise ValueError("prediction DataFrame is empty; cannot compute metrics")

accuracy = sum(n for (label, pred), n in confusion.items() if label == pred) / float(total)
print("Accuracy : ", accuracy)

n_fraud = sum(n for (label, pred), n in confusion.items() if label == 1)
print("Majority-class baseline accuracy : ", max(n_fraud, total - n_fraud) / float(total))

tp = sum(n for (label, pred), n in confusion.items() if label == 1 and pred == 1)
fp = sum(n for (label, pred), n in confusion.items() if label != 1 and pred == 1)
fn = sum(n for (label, pred), n in confusion.items() if label == 1 and pred != 1)
print("Fraud precision : ", tp / float(tp + fp) if tp + fp else 0.0)
print("Fraud recall : ", tp / float(tp + fn) if tp + fn else 0.0)

# Threshold-free metrics computed from rawPrediction/label (the evaluator's defaults).
for metric in ("areaUnderROC", "areaUnderPR"):
    print(f"Test {metric} : ", BinaryClassificationEvaluator(metricName=metric).evaluate(prediction))

Accuracy :  0.9991609275050792
