## Imports

In [87]:
from hops import hdfs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, BooleanType, IntegerType, StructType, StructField
from pyspark.sql.functions import *
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.sql import SQLContext
from pyspark.sql import Row

In [2]:
%%local
import matplotlib.pyplot as plt
from pylab import rcParams
import numpy as np
%matplotlib inline

## Constants

In [67]:
INPUT_DATASET_PATH = hdfs.project_path() + '/transactions/transactions.csv'
OUTPUT_DATASET_PATH = hdfs.project_path() + '/transactions/cleaned_transactions.csv'
OUTPUT_METADATA_PATH = hdfs.project_path() + '/transactions/metadata'

## Read Dataset with Spark

In [4]:
df_raw = spark.read.format("csv").option("header", "true").load(INPUT_DATASET_PATH).repartition(100)
df_raw.persist()

DataFrame[step: string, type: string, amount: string, nameOrig: string, oldbalanceOrg: string, newbalanceOrig: string, nameDest: string, oldbalanceDest: string, newbalanceDest: string, isFraud: string, isFlaggedFraud: string]

## Basic Data Exploration

Columns: 

step - maps a unit of time in the real world. In this case 1 step is 1 hour of time. Total steps 744 (30 days simulation).

type - CASH-IN, CASH-OUT, DEBIT, PAYMENT and TRANSFER.

amount - amount of the transaction in local currency.

nameOrig - customer who started the transaction

oldbalanceOrg - initial balance before the transaction

newbalanceOrig - new balance after the transaction

nameDest - customer who is the recipient of the transaction

oldbalanceDest - initial balance recipient before the transaction. Note that there is not information for customers that start with M (Merchants).

newbalanceDest - new balance recipient after the transaction. Note that there is not information for customers that start with M (Merchants).

isFraud - This is the transactions made by the fraudulent agents inside the simulation. In this specific dataset the fraudulent behavior of the agents aims to profit by taking control or customers accounts and try to empty the funds by transferring to another account and then cashing out of the system.

isFlaggedFraud - The business model aims to control massive transfers from one account to another and flags illegal attempts. An illegal attempt in this dataset is an attempt to transfer more than 200.000 in a single transaction.

In [5]:
df_raw.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)

## Cast Columns to Correct Type

In [6]:
df_raw.columns

['step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrig', 'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud', 'isFlaggedFraud']

In [8]:
def string_to_float(x):
    return float(x)

def is_fraud_to_bool(x):
    x_int = int(x)
    if x_int == 1:
        return True
    else:
        return False

udf_string_to_float = udf(string_to_float, returnType = FloatType())
udf_is_fraud_to_bool = udf(is_fraud_to_bool, returnType = BooleanType())

In [9]:
df_casted = df_raw \
        .withColumn("amount_float", udf_string_to_float("amount")) \
        .withColumn("oldbalanceOrg_float", udf_string_to_float("oldbalanceOrg")) \
        .withColumn("newbalanceOrig_float", udf_string_to_float("newbalanceOrig")) \
        .withColumn("oldbalanceDest_float", udf_string_to_float("oldbalanceDest")) \
        .withColumn("newbalanceDest_float", udf_string_to_float("newbalanceDest")) \
        .withColumn("step_float", udf_string_to_float("step")) \
        .withColumn("isFraud_bool", udf_is_fraud_to_bool("isFraud")) \
        .withColumn("isFlaggedFraud_bool", udf_is_fraud_to_bool("isFlaggedFraud")) \
        .select(
                col("step_float").alias("step"), 
                "type", 
                col("amount_float").alias("amount"), 
                "nameOrig", 
                col("oldbalanceOrg_float").alias("oldbalanceOrg"), 
                col("newbalanceOrig_float").alias("newbalanceOrg"),
                "nameDest",
                col("oldbalanceDest_float").alias("oldbalanceDest"),
                col("newbalanceDest_float").alias("newbalanceDest"),
                col("isFraud_bool").alias("isFraud"),
                col("isFlaggedFraud_bool").alias("isFlaggedFraud")
               )

In [10]:
df_casted.persist()

DataFrame[step: float, type: string, amount: float, nameOrig: string, oldbalanceOrg: float, newbalanceOrg: float, nameDest: string, oldbalanceDest: float, newbalanceDest: float, isFraud: boolean, isFlaggedFraud: boolean]

In [11]:
df_casted.printSchema()

root
 |-- step: float (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: float (nullable = true)
 |-- newbalanceOrg: float (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: float (nullable = true)
 |-- newbalanceDest: float (nullable = true)
 |-- isFraud: boolean (nullable = true)
 |-- isFlaggedFraud: boolean (nullable = true)

## DataFrame Summary Statistics

In [10]:
df_casted.show(5)

+----+--------+--------+-----------+-------------+-------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrg|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+-------------+-----------+--------------+--------------+-------+--------------+
| 1.0| PAYMENT| 9839.64|C1231006815|     170136.0|    160296.36|M1979787155|           0.0|           0.0|  false|         false|
| 1.0| PAYMENT| 1864.28|C1666544295|      21249.0|     19384.72|M2044282225|           0.0|           0.0|  false|         false|
| 1.0|TRANSFER|   181.0|C1305486145|        181.0|          0.0| C553264065|           0.0|           0.0|   true|         false|
| 1.0|CASH_OUT|   181.0| C840083671|        181.0|          0.0|  C38997010|       21182.0|           0.0|   true|         false|
| 1.0| PAYMENT|11668.14|C2048537720|      41554.0|     29885.86|M1230701703|           0.0

In [11]:
df_casted.count()

6362620

In [12]:
df_stats = df_casted.describe()
df_stats.show()

+-------+------------------+--------+------------------+-----------+------------------+------------------+-----------+------------------+------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|     newbalanceOrg|   nameDest|    oldbalanceDest|    newbalanceDest|
+-------+------------------+--------+------------------+-----------+------------------+------------------+-----------+------------------+------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|           6362620|    6362620|           6362620|           6362620|
|   mean|243.39724563151657|    null|179861.90355799918|       null| 833883.1040482766| 855113.6685519267|       null|1100701.6665156619|1224996.3981843132|
| stddev|142.33197104913046|    null|  603858.231677277|       null|2888242.6729989573|2924048.5029103346|       null|3399180.1117577255| 3674128.941013774|
|    min|               1.0| CASH_IN|               0.0|C1

In [13]:
df_stats.select("summary", "step", "amount", "type", "nameOrig").show()

+-------+------------------+------------------+--------+-----------+
|summary|              step|            amount|    type|   nameOrig|
+-------+------------------+------------------+--------+-----------+
|  count|           6362620|           6362620| 6362620|    6362620|
|   mean|243.39724563151657|179861.90355799918|    null|       null|
| stddev|142.33197104913046|  603858.231677277|    null|       null|
|    min|               1.0|               0.0| CASH_IN|C1000000639|
|    max|             743.0|        9.244552E7|TRANSFER| C999999784|
+-------+------------------+------------------+--------+-----------+

In [14]:
df_stats.select("summary", "oldbalanceOrg", "newbalanceOrg", "oldbalanceDest", "newbalanceDest").show()

+-------+------------------+------------------+------------------+------------------+
|summary|     oldbalanceOrg|     newbalanceOrg|    oldbalanceDest|    newbalanceDest|
+-------+------------------+------------------+------------------+------------------+
|  count|           6362620|           6362620|           6362620|           6362620|
|   mean| 833883.1040482766| 855113.6685519267|1100701.6665156619|1224996.3981843132|
| stddev|2888242.6729989573|2924048.5029103346|3399180.1117577255| 3674128.941013774|
|    min|               0.0|               0.0|               0.0|               0.0|
|    max|        5.958504E7|        4.958504E7|      3.56015904E8|      3.56179264E8|
+-------+------------------+------------------+------------------+------------------+

## Covariance/Correlation Statistics

Covariance is a measure of how two variables change with respect to each other. A positive number would mean that there is a tendency that as one variable increases, the other increases as well. A negative number would mean that as one variable increases, the other variable has a tendency to decrease. The sample covariance of two columns of a DataFrame can be calculated as follows

In [15]:
df_casted.stat.cov('oldbalanceDest', 'newbalanceDest')

12196389479605.941

In [16]:
df_casted.stat.cov('oldbalanceDest', 'oldbalanceOrg')

650346160469.154

In [17]:
df_casted.stat.cov('newbalanceDest', 'newbalanceOrg')

449474089593.0458

In [18]:
df_casted.stat.cov('amount', 'oldbalanceDest')

603753263202.9885

In [19]:
df_casted.stat.cov('amount', 'oldbalanceOrg')

-4818002158.3172455

In [20]:
df_casted.stat.cov('amount', 'newbalanceDest')

1019036792579.7117

In [21]:
df_casted.stat.cov('amount', 'newbalanceOrg')

-13880120343.009201

 Correlation is a normalized measure of covariance that is easier to understand, as it provides quantitative measurements of the statistical dependence between two random variables.

In [22]:
df_casted.stat.corr('oldbalanceDest', 'newbalanceDest')

0.9765685054902183

In [23]:
df_casted.stat.corr('oldbalanceDest', 'oldbalanceOrg')

0.06624250134480529

In [24]:
df_casted.stat.corr('newbalanceDest', 'newbalanceOrg')

0.04183749714055719

In [25]:
df_casted.stat.corr('amount', 'oldbalanceDest')

0.2941374500504793

In [26]:
df_casted.stat.corr('amount', 'oldbalanceOrg')

-0.0027624747622870425

In [27]:
df_casted.stat.corr('amount', 'newbalanceDest')

0.45930426711925426

In [28]:
df_casted.stat.corr('amount', 'newbalanceOrg')

-0.007860925283326372

## Register SQL Table View

In [29]:
# Register the DataFrame as a SQL temporary view
df_casted.createOrReplaceTempView("trx")

## Counts

In [30]:
number_of_transactions = df_casted.count()
number_of_frauds = spark.sql("SELECT isFraud FROM trx WHERE isFraud = 'true'").count()
number_of_non_frauds = spark.sql("SELECT isFraud FROM trx WHERE isFraud = 'false'").count()
number_of_flagged_frauds = spark.sql("SELECT isFlaggedFraud FROM trx WHERE isFraud = 'true'").count()
number_of_non_flagged_frauds = spark.sql("SELECT isFlaggedFraud FROM trx WHERE isFraud = 'false'").count()

In [31]:
number_of_transactions

6362620

In [32]:
number_of_frauds

8213

In [33]:
number_of_non_frauds

6354407

In [34]:
assert number_of_frauds + number_of_non_frauds == number_of_transactions

In [35]:
number_of_flagged_frauds

8213

In [36]:
number_of_non_flagged_frauds

6354407

In [37]:
assert number_of_flagged_frauds + number_of_non_flagged_frauds == number_of_transactions

## Data Cleaning

In [46]:
df_cleaned = df_casted.drop("isFlaggedFraud") # Redundant column
df_cleaned = df_cleaned.persist()

In [14]:
orig_account_names = df_cleaned.select("nameOrig").distinct().collect()
dest_account_names = df_cleaned.select("nameDest").distinct().collect()

In [15]:
orig_account_names = list(map(lambda x: x.nameOrig, orig_account_names))
dest_account_names = list(map(lambda x: x.nameDest, dest_account_names))

In [16]:
names = set(orig_account_names + dest_account_names)

In [17]:
number_of_unique_accounts = len(names)

In [18]:
number_of_unique_accounts

9073900

In [19]:
names = list(names)

In [33]:
names_lookup = {k: v for v, k in enumerate(names)}

In [35]:
names_lookup_broadcast = spark.sparkContext.broadcast(names_lookup)

In [36]:
df_cleaned = df_cleaned.persist()

In [47]:
df_cleaned.printSchema()

root
 |-- step: float (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: float (nullable = true)
 |-- newbalanceOrg: float (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: float (nullable = true)
 |-- newbalanceDest: float (nullable = true)
 |-- isFraud: boolean (nullable = true)

In [48]:
transaction_types = {
    "CASH_IN": 0, 
    "CASH_OUT": 1, 
    "DEBIT": 2,
    "PAYMENT": 3,
    "TRANSFER": 4
}

In [49]:
def bool_to_int(x):
    if x:
        return 1
    else:
        return 0

def transaction_type_to_int(x):
    return transaction_types[x]

def account_name_to_int(x):
    return names_lookup_broadcast.value[x]

udf_transaction_type_to_int = udf(transaction_type_to_int, returnType = IntegerType())
udf_bool_to_int = udf(bool_to_int, returnType = IntegerType())
udf_account_name_to_int = udf(account_name_to_int, returnType = IntegerType())

In [50]:
df_cleaned.columns

['step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrg', 'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud']

In [51]:
df_cleaned = df_cleaned \
        .withColumn("isFraud_int", udf_bool_to_int("isFraud")) \
        .withColumn("type_int", udf_transaction_type_to_int("type")) \
        .withColumn("nameOrig_int", udf_account_name_to_int("nameOrig")) \
        .withColumn("nameDest_int", udf_account_name_to_int("nameDest")) \
        .select(
                col("type_int").alias("type"),  
                col("isFraud_int").alias("isFraud"),
                col("nameOrig_int").alias("nameOrig"),
                col("nameDest_int").alias("nameDest"),
                "step",
                "amount",
                "oldbalanceOrg",
                "newbalanceOrg",
                "oldbalanceDest",
                "newbalanceDest"
               )

In [52]:
df_cleaned = df_cleaned.repartition(100)

In [45]:
df_cleaned.show(5)

+----+-------+--------+--------+-----+---------+-------------+-------------+--------------+--------------+
|type|isFraud|nameOrig|nameDest| step|   amount|oldbalanceOrg|newbalanceOrg|oldbalanceDest|newbalanceDest|
+----+-------+--------+--------+-----+---------+-------------+-------------+--------------+--------------+
|   1|      0| 1310881| 7838458| 39.0|118267.24|          0.0|          0.0|     151104.11|     269371.34|
|   1|      0| 1575152| 6357476|161.0| 57609.85|          0.0|          0.0|     101361.11|     158970.95|
|   3|      0| 1936818|  285856|402.0|  7985.94|          0.0|          0.0|           0.0|           0.0|
|   1|      0|  734760|  888668|252.0|245800.06|      48236.0|          0.0|      973947.0|     1146464.5|
|   3|      0|  532527| 9005312|325.0|  3330.45|          0.0|          0.0|           0.0|           0.0|
+----+-------+--------+--------+-----+---------+-------------+-------------+--------------+--------------+
only showing top 5 rows

In [38]:
df_cleaned.printSchema()

root
 |-- type: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- nameOrig: integer (nullable = true)
 |-- nameDest: integer (nullable = true)
 |-- step: float (nullable = true)
 |-- amount: float (nullable = true)
 |-- oldbalanceOrg: float (nullable = true)
 |-- newbalanceOrg: float (nullable = true)
 |-- oldbalanceDest: float (nullable = true)
 |-- newbalanceDest: float (nullable = true)

## Plotting

In [None]:
# Register the DataFrame as a SQL temporary view
df_cleaned.createOrReplaceTempView("trx")

In [None]:
%%help

### Transaction Type Pie Chart

In [None]:
transaction_types_df = spark.sql("SELECT type from trx")

In [None]:
%%spark -o transaction_types_df

In [None]:
%%local
transaction_types_df

## isFraud pie chart

In [None]:
is_fraud_df = spark.sql("SELECT isFraud from trx")

In [None]:
%%spark -o is_fraud_df

In [None]:
%%local
is_fraud_df

### Transaction Amount Histogram

In [None]:
%%sql -o amount -q
SELECT amount from trx

In [None]:
%%local
rcParams['figure.figsize'] = 20, 15
n, bins, patches = plt.hist(amount.values, bins=100, facecolor='green', alpha=0.75)
plt.xlabel('Transaction Amount')
plt.ylabel('Count')
plt.title("Transaction Amount Distribution")
plt.show()

## Save Cleaned DataFrame to HopsFS

In [40]:
df_cleaned.printSchema()

root
 |-- type: integer (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- nameOrig: integer (nullable = true)
 |-- nameDest: integer (nullable = true)
 |-- step: float (nullable = true)
 |-- amount: float (nullable = true)
 |-- oldbalanceOrg: float (nullable = true)
 |-- newbalanceOrg: float (nullable = true)
 |-- oldbalanceDest: float (nullable = true)
 |-- newbalanceDest: float (nullable = true)

In [41]:
df_cleaned.show(5)

+----+-------+--------+--------+----+--------+-------------+-------------+--------------+--------------+
|type|isFraud|nameOrig|nameDest|step|  amount|oldbalanceOrg|newbalanceOrg|oldbalanceDest|newbalanceDest|
+----+-------+--------+--------+----+--------+-------------+-------------+--------------+--------------+
|   3|      0|    null|    null| 1.0| 9839.64|     170136.0|    160296.36|           0.0|           0.0|
|   3|      0|    null|    null| 1.0| 1864.28|      21249.0|     19384.72|           0.0|           0.0|
|   4|      1|    null|    null| 1.0|   181.0|        181.0|          0.0|           0.0|           0.0|
|   1|      1|    null|    null| 1.0|   181.0|        181.0|          0.0|       21182.0|           0.0|
|   3|      0|    null|    null| 1.0|11668.14|      41554.0|     29885.86|           0.0|           0.0|
+----+-------+--------+--------+----+--------+-------------+-------------+--------------+--------------+
only showing top 5 rows

In [53]:
df_cleaned.write.mode('overwrite').option("header", "true").csv(OUTPUT_DATASET_PATH)

## Save Metadata to HopsFS

In [79]:
schema = StructType([StructField("account_name", StringType(), nullable=False)])

In [88]:
names_rdd = spark.sparkContext.parallelize(names).map(lambda x: Row(account_name = str(x)))

In [92]:
names_df = names_rdd.toDF()

In [93]:
names_int_df = names_df.withColumn("account_name_int", udf_account_name_to_int("account_name"))

In [94]:
names_int_df.printSchema()

root
 |-- account_name: string (nullable = true)
 |-- account_name_int: integer (nullable = true)

In [96]:
names_int_df.write.mode('overwrite').option("header", "true").csv(OUTPUT_METADATA_PATH + "/account_names")

In [100]:
list(transaction_types.keys())

['CASH_IN', 'CASH_OUT', 'DEBIT', 'PAYMENT', 'TRANSFER']

In [101]:
list(transaction_types.values())

[0, 1, 2, 3, 4]

In [103]:
transasction_types_int_rdd = spark.sparkContext.parallelize(transaction_types.keys()) \
     .map(lambda x: Row(transaction_type = str(x), transaction_int = transaction_types[str(x)]))

In [106]:
transasction_types_int_df = transasction_types_int_rdd.toDF()

In [107]:
transasction_types_int_df.printSchema()

root
 |-- transaction_int: long (nullable = true)
 |-- transaction_type: string (nullable = true)

In [108]:
transasction_types_int_df.coalesce(1).write.mode('overwrite').option("header", "true").csv(OUTPUT_METADATA_PATH + "/transaction_types")