## PaySim Dataset

##### <b>step</b> - integer - a unit of simulated time; 1 step represents 1 hour. Total steps: 744 (744 hours = 31 days of simulated time).

##### <b>type</b> - string/categorical - type of transaction: CASH_IN, CASH_OUT, DEBIT, PAYMENT and TRANSFER.

##### <b>amount</b> - float - amount of the transaction in local currency.

##### <b>nameOrig</b> - string - customer who initiated the transaction.

##### <b>oldbalanceOrg</b> - float - initial balance of the originator before the transaction.

##### <b>newbalanceOrig</b> - float - new balance of the originator after the transaction.

##### <b>nameDest</b> - string - customer who is the recipient of the transaction.

##### <b>oldbalanceDest</b> - float - initial balance of the recipient before the transaction.

##### <b>newbalanceDest</b> - float - new balance of the recipient after the transaction.

##### <b>fraud</b> - boolean/binary - indicates whether the transaction is fraudulent.
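
Because each step is one simulated hour, a step index can be converted to a day and an hour of day. The sketch below assumes steps are numbered from 1, as the first rows of the data suggest; `step_to_day_hour` is a hypothetical helper, not part of the PaySim tooling. Under this numbering the 744 hourly steps cover days 1 to 31.

```python
HOURS_PER_DAY = 24
TOTAL_STEPS = 744

def step_to_day_hour(step):
    """Map a 1-based hourly step to (day, hour_of_day); days are numbered from 1."""
    if not 1 <= step <= TOTAL_STEPS:
        raise ValueError("step must be between 1 and %d" % TOTAL_STEPS)
    day, hour = divmod(step - 1, HOURS_PER_DAY)
    return day + 1, hour

print(step_to_day_hour(1))    # (1, 0)
print(step_to_day_hour(25))   # (2, 0)
print(step_to_day_hour(744))  # (31, 23)
```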

In [1]:
spark

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log |
|----|---------------------|------|-------|----------|------------|
| 71 | application_1598227897403_0035 | pyspark | idle | Link | Link |

SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f7a6d5d34a8>

In [2]:
import hashlib
from graphframes import *
from pyspark.sql import functions as func

In [3]:
df = spark.read.load("hdfs:///Projects/paysim/paysim_Training_Datasets/PaySim/paysim_month2.parquet") 

In [4]:
def hashnode(x):
    # Shorten an account name to the first 8 hex characters of its SHA-1 digest
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

def extract_node_type(x):
    # Encode the account-name prefix as an integer code
    if x.startswith("C"):
        node_type = 0
    elif x.startswith("B"):
        node_type = 1
    elif x.startswith("M"):
        node_type = 2
    else:
        node_type = 99  # unknown prefix
    return node_type

def extract_fraudster(x):
    # Account names starting with "CF" are flagged as fraudsters
    return 1 if x.startswith("CF") else 0

def action_2_code(x):
    # Encode the transaction type as an integer code
    if x == "CASH_IN":
        action_code = 0
    elif x == "CASH_OUT":
        action_code = 1
    elif x == "DEBIT":
        action_code = 2
    elif x == "PAYMENT":
        action_code = 3
    elif x == "TRANSFER":
        action_code = 4
    elif x == "DEPOSIT":
        action_code = 4  # same code as TRANSFER
    else:
        action_code = 99  # unknown action
    return action_code

# No return type is given, so each UDF produces a string column
hashnode_udf = func.udf(hashnode)
extract_fraudster_udf = func.udf(extract_fraudster)
node_type_udf = func.udf(extract_node_type)
action_2_code_udf = func.udf(action_2_code)
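
`hashnode` keeps only 8 hex characters (32 bits) of the SHA-1 digest, so distinct account names can collide once the number of nodes grows. The sketch below estimates that risk with the standard birthday approximation. `collision_probability` is a hypothetical helper added for illustration, and the account name is made up.

```python
import hashlib
import math

def hashnode(x):
    # Same truncation as in the notebook: first 8 hex characters of SHA-1
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

def collision_probability(n_ids, hex_chars=8):
    """Birthday approximation: chance that at least two of n_ids share a hash."""
    space = 16 ** hex_chars
    return 1.0 - math.exp(-n_ids * (n_ids - 1) / (2.0 * space))

print(len(hashnode("C1234567890")))  # 8
print(round(collision_probability(10000), 3))
print(round(collision_probability(100000), 3))
```

If collisions would matter for the graph, keeping more hex characters (or the full digest) lowers the probability at the cost of longer ids.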

In [5]:
# Add the binary label, hashed node ids, node types and the encoded action
df = (
    df.withColumn("label", func.when(func.col("fraud") == "true", 1).otherwise(0))
      .withColumn("source", hashnode_udf("nameOrig"))
      .withColumn("target", hashnode_udf("nameDest"))
      .withColumn("source_type", node_type_udf("nameOrig"))
      .withColumn("target_type", node_type_udf("nameDest"))
      .withColumn("action", action_2_code_udf("action"))
)

In [6]:
# Calculate the balance change (new minus old) for the originating and destination accounts
df = (
    df.withColumn("orgDiff", df.newBalanceOrig - df.oldBalanceOrig)
      .withColumn("destDiff", df.newBalanceDest - df.oldBalanceDest)
)
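
In the rows shown in this notebook, a transaction with action code 0 (CASH_IN) raises the originator's balance by the transaction amount, so `orgDiff` equals `amount`. A quick plain-Python check of that relationship, using the values from the first row of the `transaction_features.show()` output; `balance_diff` is a hypothetical helper that mirrors the column arithmetic:

```python
import math

def balance_diff(old_balance, new_balance):
    # Same arithmetic as the orgDiff / destDiff columns: new minus old
    return new_balance - old_balance

# Values copied from the first row of transaction_features.show()
amount = 170256.5174409235
old_orig, new_orig = 59.45919938690031, 170315.9766403104

org_diff = balance_diff(old_orig, new_orig)
# Compare with a tolerance because the balances are floating-point values
print(math.isclose(org_diff, amount, rel_tol=1e-9))
```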

In [7]:
# Select the transaction columns and rename them to snake_case
transaction_features = df.select(
    "source", "target", "source_type", "target_type", "step", "action", "amount",
    "oldBalanceOrig", "newBalanceOrig", "oldBalanceDest", "newBalanceDest",
    "label", "orgDiff", "destDiff",
).toDF(
    "source", "target", "source_type", "target_type", "step", "action", "amount",
    "oldbalance_orig", "newbalance_orig", "oldbalance_dest", "newbalance_dest",
    "label", "org_diff", "dest_diff",
)

transaction_features.show()

+--------+--------+-----------+-----------+----+------+------------------+------------------+------------------+---------------+---------------+-----+------------------+---------+
|  source|  target|source_type|target_type|step|action|            amount|   oldbalance_orig|   newbalance_orig|oldbalance_dest|newbalance_dest|label|          org_diff|dest_diff|
+--------+--------+-----------+-----------+----+------+------------------+------------------+------------------+---------------+---------------+-----+------------------+---------+
|4974242a|cd23fd61|          0|          2|   1|     0| 170256.5174409235| 59.45919938690031| 170315.9766403104|            0.0|            0.0|    0| 170256.5174409235|      0.0|
|edffbfea|47288a7f|          0|          2|   1|     0|158752.58478194074|252.22923065781498|159004.81401259854|            0.0|            0.0|    0|158752.58478194074|      0.0|
|edffbfea|314855ff|          0|          2|   1|     0|159285.57816375262|159004.81401259854|318290.

In [8]:
# Collect every account name seen as originator or recipient, de-duplicated
node_features = (
    df.select(func.col("nameOrig").alias("id"), func.col("nameOrig").alias("type"))
      .union(df.select(func.col("nameDest").alias("id"), func.col("nameDest").alias("type")))
      .distinct()
)

# Derive the fraud label and node type from the raw name, then hash the id last
node_features = (
    node_features.withColumn("label", extract_fraudster_udf("id"))
                 .withColumn("type", node_type_udf("type"))
                 .withColumn("id", hashnode_udf("id"))
)

node_features.show()

+--------+----+-----+
|      id|type|label|
+--------+----+-----+
|dc06429a|   0|    0|
|d1832e49|   0|    0|
|f9ec4c6b|   0|    1|
|cb8ace5c|   0|    0|
|09a40820|   0|    0|
|8e0a0c88|   0|    1|
|8f229b69|   0|    1|
|1bbf9e1b|   2|    0|
|e57eaa76|   0|    1|
|6ea6b5d3|   2|    0|
|da5b86ab|   0|    1|
|a1891a55|   0|    1|
|065d3995|   2|    0|
|d540b980|   0|    0|
|a8858658|   0|    1|
|409b5a60|   0|    0|
|2b8b8b5c|   0|    0|
|b9282016|   0|    0|
|b7150334|   0|    0|
|67f027a3|   0|    0|
+--------+----+-----+
only showing top 20 rows
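
The node table is the set of distinct account names seen on either side of a transaction, each mapped to a hashed id, a type code and a fraud flag. The same logic can be sketched in plain Python on a few made-up account names. The names and the `build_nodes` helper are illustrative assumptions, not data from PaySim.

```python
import hashlib

def hashnode(x):
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

def extract_node_type(x):
    # Prefix codes as in the notebook: C -> 0, B -> 1, M -> 2, otherwise 99
    for prefix, code in (("C", 0), ("B", 1), ("M", 2)):
        if x.startswith(prefix):
            return code
    return 99

def extract_fraudster(x):
    return 1 if x.startswith("CF") else 0

def build_nodes(transactions):
    """transactions: iterable of (nameOrig, nameDest) pairs."""
    names = set()
    for orig, dest in transactions:
        names.update((orig, dest))  # union of both sides, de-duplicated
    return [
        {"id": hashnode(n), "type": extract_node_type(n), "label": extract_fraudster(n)}
        for n in sorted(names)
    ]

sample = [("C100", "M200"), ("CF300", "C100"), ("C100", "B400")]
for node in build_nodes(sample):
    print(node)
```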

In [10]:
from hops import featurestore

featurestore.create_featuregroup(
    transaction_features,
    "transaction_features",
    description="transaction features",
    descriptive_statistics=True,
    feature_correlation=True,
    feature_histograms=True,
    cluster_analysis=True,
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=featurestore.get_latest_featuregroup_version("transaction_features") + 1
)


computing descriptive statistics for : transaction_features, version: 3
computing feature correlation for: transaction_features, version: 3
computing feature histograms for: transaction_features, version: 3
computing cluster analysis for: transaction_features, version: 3
Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use paysim_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [11]:
featurestore.create_featuregroup(
    node_features,
    "node_features",
    description="node features",
    descriptive_statistics=True,
    feature_correlation=True,
    feature_histograms=True,
    cluster_analysis=True,
    featurestore=featurestore.project_featurestore(),
    featuregroup_version=featurestore.get_latest_featuregroup_version("node_features") + 1
)


computing descriptive statistics for : node_features, version: 3
computing feature correlation for: node_features, version: 3
Could not compute feature correlation for: node_features, version: 3, set the optional argument feature_correlation=False to skip this step,
 error: The provided spark dataframe does not contain any numeric columns. Cannot compute feature correlation on categorical columns. The numeric datatypes are: ['bigint', 'decimal', 'integer', 'int', 'double', 'long', 'float', 'short'] and the number of numeric datatypes in the dataframe is: 0 ([])
computing feature histograms for: node_features, version: 3
computing cluster analysis for: node_features, version: 3
Could not compute cluster analysis for: node_features, version: 3, set the optional argument cluster_analysis=False to skip this step,
 error: The provided spark dataframe does not contain any numeric columns. Cannot compute cluster analysis with k-means on categorical columns. The numeric datatypes are: ['bigint
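
The errors above arise because every column of `node_features` comes from a UDF created without a return type. `func.udf` defaults to a string return type, so Spark sees no numeric columns. One possible fix, sketched here under the assumption that integer codes are wanted, is to declare `IntegerType()` when creating the UDFs. `make_typed_udfs` is a hypothetical wrapper; it imports PySpark lazily so the plain functions can be checked without a cluster.

```python
def extract_node_type(x):
    # Prefix codes as in the notebook: C -> 0, B -> 1, M -> 2, otherwise 99
    if x.startswith("C"):
        return 0
    if x.startswith("B"):
        return 1
    if x.startswith("M"):
        return 2
    return 99

def extract_fraudster(x):
    return 1 if x.startswith("CF") else 0

def make_typed_udfs():
    """Create UDFs that return integers instead of the default strings."""
    from pyspark.sql import functions as func
    from pyspark.sql.types import IntegerType
    return {
        "node_type": func.udf(extract_node_type, IntegerType()),
        "fraudster": func.udf(extract_fraudster, IntegerType()),
    }

# The plain functions already return Python ints
print(extract_node_type("M123"), extract_fraudster("CF123"))
```

With typed UDFs the `type` and `label` columns would be integer columns, while `id` stays a string. Alternatively, as the log message suggests, the statistics can be skipped by passing `feature_correlation=False` and `cluster_analysis=False`.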

In [12]:
featurestore.create_training_dataset(
    transaction_features, "transaction_features_csv",
    description="transaction features for aml classification",
    featurestore=featurestore.project_featurestore(),
    data_format="csv",
    training_dataset_version=featurestore.get_latest_training_dataset_version("transaction_features_csv") + 1,
    descriptive_statistics=True,
    feature_correlation=True,
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)


computing descriptive statistics for : transaction_features_csv, version: 3
computing feature correlation for: transaction_features_csv, version: 3
computing feature histograms for: transaction_features_csv, version: 3
computing cluster analysis for: transaction_features_csv, version: 3
Training Dataset created successfully

In [13]:
featurestore.create_training_dataset(
    node_features.repartition(1), "node_features_csv",
    description="node features for aml classification",
    featurestore=featurestore.project_featurestore(),
    data_format="csv",
    training_dataset_version=featurestore.get_latest_training_dataset_version("node_features_csv") + 1,
    descriptive_statistics=True,
    feature_correlation=True,
    feature_histograms=True,
    cluster_analysis=True,
    stat_columns=None)


computing descriptive statistics for : node_features_csv, version: 3
computing feature correlation for: node_features_csv, version: 3
Could not compute feature correlation for: node_features_csv, version: 3, set the optional argument feature_correlation=False to skip this step,
 error: The provided spark dataframe does not contain any numeric columns. Cannot compute feature correlation on categorical columns. The numeric datatypes are: ['bigint', 'decimal', 'integer', 'int', 'double', 'long', 'float', 'short'] and the number of numeric datatypes in the dataframe is: 0 ([])
computing feature histograms for: node_features_csv, version: 3
computing cluster analysis for: node_features_csv, version: 3
Could not compute cluster analysis for: node_features_csv, version: 3, set the optional argument cluster_analysis=False to skip this step,
 error: The provided spark dataframe does not contain any numeric columns. Cannot compute cluster analysis with k-means on categorical columns. The numeric