### Spark Setup

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()

from pyspark import SparkConf
SparkConf().set("spark.executor.memory", "13g")

<pyspark.conf.SparkConf at 0x7f2ec880c160>

In [4]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window

import pandas as pd
import numpy as np
import zipfile
from google.colab import files


In [3]:
# Run this cell and select the kaggle.json file downloaded from the Kaggle account settings page.
files.upload()
# Install the Kaggle API client/.
!pip install -q kaggle
# The Kaggle API client expects this file to be in ~/.kaggle so move it there.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
# Avoid Warnings
!chmod 600 ~/.kaggle/kaggle.json
#  Copy API command from kaggle and paste it to download desired dataset
!kaggle datasets download -d ntnu-testimon/paysim1
# open and extract files 
zip_ref = zipfile.ZipFile('paysim1.zip', 'r')
zip_ref.extractall('files')
zip_ref.close()
# list the files present in zip
!ls files

paysim1.zip: Skipping, found more recently modified local copy (use --force to force download)
PS_20174392719_1491204439457_log.csv


In [17]:
# Read data to spark dataframe
# set inferSchema = True, to get the input schema automatically from data.
df = spark.read.csv('/content/files/PS_20174392719_1491204439457_log.csv', header=True, inferSchema = True)
# columnar storage of data for efficient complex data processing.
# df.repartition(1).write.partitionBy('type').mode("overwrite").parquet("data/df.parquet")
# df = spark.read.option("mergeSchema", "true").parquet("data/df.parquet")

In [18]:
df= df.sort('step')    

In [19]:
df = df.withColumnRenamed('nameOrig','Payee_Account')
df = df.withColumnRenamed('nameDest','Beneficiary_Account')
df = df.withColumnRenamed('oldbalanceOrg','Old_Bal_Orig')
df = df.withColumnRenamed('newbalanceOrig','New_Bal_Orig')
df = df.withColumnRenamed('oldbalanceDest','Old_Bal_Dest')
df = df.withColumnRenamed('newbalanceDest','New_Bal_Dest')


In [None]:
# creating consecutive Trxn_Id
df = df.withColumn('count',F.monotonically_increasing_id() ).withColumn('Trxn_Id',F.row_number().over(Window.orderBy('count')) ).drop('count')


In [23]:
df.show()

+----+--------+---------+-------------+------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+-----------+-------------+-------+
|step|    type|   amount|Payee_Account|Old_Bal_Orig|New_Bal_Orig|Beneficiary_Account|Old_Bal_Dest|New_Bal_Dest|isFraud|isFlaggedFraud|day_of_month|hour_of_day|day_of_week|week_of_month|Trxn_Id|
+----+--------+---------+-------------+------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+-----------+-------------+-------+
|   1| PAYMENT|  9839.64|  C1231006815|    170136.0|   160296.36|        M1979787155|         0.0|         0.0|      0|             0|           1|          1|        1.0|            1|      1|
|   1| PAYMENT|  1864.28|  C1666544295|     21249.0|    19384.72|        M2044282225|         0.0|         0.0|      0|             0|           1|          1|        1.0|            1|      2|
|   1|TRANSFER|    181.0|  C13

## Time-based Features

In [25]:
df = df.withColumn('day_of_month', F.when( (F.col('step')%24) !=0, ( ( F.col('step')/24) +1)).otherwise(( F.col('step')/24)).cast(T.IntegerType()))
df = df.withColumn('hour_of_day',F.when((F.col('step')%24)!=0,  (F.col('step')%24)).otherwise(24).cast(T.IntegerType()))
df = df.withColumn('day_of_week',F.when((F.col('day_of_month')%7)!=0,(F.col('day_of_month')%7)).otherwise(7.0))
df = df.withColumn('week_of_month',F.when( (F.col('day_of_month')%7) !=0, ( ( F.col('day_of_month')/7) +1)).otherwise( F.col('day_of_month')/7).cast(T.IntegerType()))

Periodic Mean and Standard Deviation

In [16]:
# not implemented

## Customer Spending Patterns
Number and amount of transactions for the last N hours for an account 


In [31]:

df = df.withColumn("Count_PA_12HR", F.count('Trxn_Id').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-12, Window.currentRow)))
df = df.withColumn("Count_PA_24HR", F.count('Trxn_Id').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-24, Window.currentRow)))


In [32]:

df = df.withColumn("Sum_Amount_PA_12HR", F.sum('amount').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-12, Window.currentRow)))
df = df.withColumn("Sum_Amount_PA_24HR", F.sum('amount').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-24, Window.currentRow)))


Number and amount of transactions for the last N hours for a transaction type


In [33]:

df = df.withColumn("Count_Type_12HR", F.count('trxn_Id').over(Window.partitionBy("type").orderBy("step").rangeBetween(-12, Window.currentRow)))
df = df.withColumn("Count_Type_24HR", F.count('trxn_Id').over(Window.partitionBy("type").orderBy("step").rangeBetween(-24, Window.currentRow)))


In [34]:

df = df.withColumn("Max_Amount_Type_12HR", F.max('amount').over(Window.partitionBy("type").orderBy("step").rangeBetween(-12, Window.currentRow)))
df = df.withColumn("Max_Amount_Type_24HR", F.max('amount').over(Window.partitionBy("type").orderBy("step").rangeBetween(-24, Window.currentRow)))


Frequency of the beneficiary account for each payee account with larger transaction amount  in the last N days (increase the risk factor for that transaction) 


In [35]:

df = df.withColumn("Count_BA_12HR", F.count('Beneficiary_Account').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-12, Window.currentRow)))
df = df.withColumn("Count_BA_24HR", F.count('Beneficiary_Account').over(Window.partitionBy("Payee_Account").orderBy("step").rangeBetween(-24, Window.currentRow)))


## Frequency-Based Features

 Maximum amount for each account for the last 30 days

In [36]:
df = df.withColumn("max_amt_30_days", F.max('amount').over(Window.partitionBy("Payee_Account").orderBy("day_of_month").rangeBetween(-30, Window.currentRow)))

In [25]:
# df.filter("Payee_Account='C1154436417'").show()

+----+--------+---------+-------------+------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+-----------+-------------+-------------+-------------+------------------+------------------+---------------+---------------+--------------------+--------------------+-------------+-------------+---------------+
|step|    type|   amount|Payee_Account|Old_Bal_Orig|New_Bal_Orig|Beneficiary_Account|Old_Bal_Dest|New_Bal_Dest|isFraud|isFlaggedFraud|day_of_month|hour_of_day|day_of_week|week_of_month|Count_PA_12HR|Count_PA_24HR|Sum_Amount_PA_12HR|Sum_Amount_PA_24HR|Count_Type_12HR|Count_Type_24HR|Max_Amount_Type_12HR|Max_Amount_Type_24HR|Count_BA_12HR|Count_BA_24HR|max_amt_30_days|
+----+--------+---------+-------------+------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+-----------+-------------+-------------+-------------+------------------+------------------+---------

% difference of the transaction amount with the last maximum amount.

In [37]:
df = df.withColumn("diff_with_max", F.col('amount')-F.col('max_amt_30_days'))
df = df.withColumn("perc_diff_with_max", F.when(F.col('diff_with_max')>0,F.col('diff_with_max')/F.col('max_amt_30_days')*100).otherwise(0))

Frequency of the beneficiary account for each payee account  in the last N days (increase the risk factor for that transaction) 


In [38]:
df = df.withColumn("Freq_Benef_Acc", F.count('Payee_Account').over(Window.partitionBy("Beneficiary_Account").orderBy("day_of_month").rangeBetween(-30, Window.currentRow)) )

In [16]:
# df.filter("Beneficiary_Account='C1001354766'").show()

+----+--------+---------+-------------+-------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+-----------+-------------+-------------+-------------+------------------+------------------+---------------+---------------+--------------------+--------------------+-------------+-------------+---------------+-------------+------------------+--------------+
|step|    type|   amount|Payee_Account| Old_Bal_Orig|New_Bal_Orig|Beneficiary_Account|Old_Bal_Dest|New_Bal_Dest|isFraud|isFlaggedFraud|day_of_month|hour_of_day|day_of_week|week_of_month|Count_PA_12HR|Count_PA_24HR|Sum_Amount_PA_12HR|Sum_Amount_PA_24HR|Count_Type_12HR|Count_Type_24HR|Max_Amount_Type_12HR|Max_Amount_Type_24HR|Count_BA_12HR|Count_BA_24HR|max_amt_30_days|diff_with_max|perc_diff_with_max|Freq_Benef_Acc|
+----+--------+---------+-------------+-------------+------------+-------------------+------------+------------+-------+--------------+------------+-----------+--

Flag large transaction  amount which empties the account balance to zero


In [39]:
THRESHOLD = 200000
df = df.withColumn('Large_Empty',F.when( ((F.col('amount')>THRESHOLD) & (F.col('New_Bal_Orig')==0)) , 1).otherwise(0))

Average amount spent over a week during past month  on the same beneficiary as this transaction


In [18]:
# not implemented yet

Total amount spent on the same day up to this transaction


In [40]:
df = df.withColumn("Amount_Spent_On_Day", F.sum('amount').over(Window.partitionBy("Payee_Account","day_of_week").orderBy("step").rangeBetween(Window.unboundedPreceding, Window.currentRow)))

In [None]:
df.filter("Payee_Account='C935362246'").show()

Total number of transactions on the same day up to this transaction


In [None]:
df = df.withColumn("Count_Trxns_On_Day", F.count('Trxn_Id').over(Window.partitionBy("Payee_Account","day_of_week").orderBy("step").rangeBetween(Window.unboundedPreceding, Window.currentRow)))