# Batch Ingestion
**This notebook aggregates raw features into new derived features that are used for Fraud Detection model training/inference.**

---

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Create PySpark Script](#Create-PySpark-Script)
1. [Explore Features](#Explore-Features)
1. [Validate Feature Group for Records](#Validate-Feature-Group-for-Records)
1. [Statistics](#Statistics)

### Background

- This notebook takes the raw credit card transaction data (CSV) generated by [notebook 0](./0_prepare_transactions_dataset.ipynb) and aggregates the raw features into new features (ratios) with a PySpark job. These aggregated features, alongside the original raw features, are used to train a credit card fraud detection model in the next step (see [notebook 3](./3_train_and_deploy_model.ipynb)).

- As part of the Spark job, we also select the latest weekly aggregated features, `num_trans_last_1w` and `avg_amt_last_1w`, for each `cc_num` (credit card number) and write them to the <b>Hopsworks Online Feature Store</b> as a feature group. This feature group (`cc-agg-batch-fg`) was created in [notebook 1](./1_setup.ipynb).

- [Hopsworks Processing](https://hopsworks.ai) lets customers run analytics jobs for data engineering and model evaluation on Hopsworks easily and at scale. It provides a fully managed Spark environment for data processing or feature engineering workloads.

### Setup

#### Imports 

In [82]:
import pandas as pd
import numpy as np
import logging
import random

#### Essentials

### Create PySpark Script
This PySpark script does the following:

1. Aggregates raw features to derive new features (ratios).
2. Saves the aggregated features alongside the original raw features to a CSV file and writes it to S3 for use in the next step (model training).
3. Groups the aggregated features by credit card number and writes a selected subset of them to the Hopsworks Feature Store (Online). <br>
<b>Note: </b> The feature group was created in the previous notebook (`1_setup.ipynb`).
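Step 3 keeps only each card's most recent row and turns its weekly aggregates into feature-store records. The following is a minimal pure-Python sketch of that logic, assuming the aggregated rows are plain dicts; the record layout mirrors the `FeatureName`/`ValueAsString` pairs in the disabled `put_record` code later in this notebook, and the helper names `latest_per_card` and `to_records` (and the sample rows) are hypothetical.

```python
from datetime import datetime


def latest_per_card(rows):
    """Return {cc_num: row} keeping each card's most recent row (first one seen wins a tie)."""
    latest = {}
    for row in rows:
        current = latest.get(row["cc_num"])
        if current is None or row["datetime"] > current["datetime"]:
            latest[row["cc_num"]] = row
    return latest


def to_records(latest):
    """Build one list of FeatureName/ValueAsString pairs per card, ordered by cc_num."""
    records = []
    for cc_num, row in sorted(latest.items()):
        records.append([
            {"FeatureName": "cc_num", "ValueAsString": str(cc_num)},
            {"FeatureName": "num_trans_last_1w", "ValueAsString": str(row["num_trans_last_1w"])},
            # Amounts are rounded to 2 decimals before being stringified, as in the notebook's code.
            {"FeatureName": "avg_amt_last_1w", "ValueAsString": str(round(row["avg_amt_last_1w"], 2))},
        ])
    return records


# Hypothetical aggregated rows: two for card 1, one for card 2.
rows = [
    {"cc_num": 1, "datetime": datetime(2020, 5, 30, 10, 0), "num_trans_last_1w": 3, "avg_amt_last_1w": 20.5},
    {"cc_num": 1, "datetime": datetime(2020, 5, 31, 9, 0), "num_trans_last_1w": 4, "avg_amt_last_1w": 22.25},
    {"cc_num": 2, "datetime": datetime(2020, 5, 31, 8, 0), "num_trans_last_1w": 1, "avg_amt_last_1w": 12.3456},
]
records = to_records(latest_per_card(rows))
```

The Spark code in this notebook does the selection with `dense_rank()` over a window partitioned by `cc_num`, which, unlike this sketch, keeps every row tied for the latest timestamp. The sketch also omits the `trans_time` event-time feature that the disabled code appends to each record.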

In [83]:
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import desc, dense_rank
from pyspark.sql import SparkSession, DataFrame
from argparse import Namespace, ArgumentParser
from pyspark.sql.window import Window
import time
import sys
import os


TOTAL_UNIQUE_USERS = 100 #10000
FEATURE_GROUP = 'cc-agg-batch-fg'
    
schema = StructType([StructField('tid', StringType(), True),
                         StructField('datetime', TimestampType(), True),
                         StructField('cc_num', LongType(), True),
                         StructField('amount', DoubleType(), True),
                         StructField('fraud_label', StringType(), True)])

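# The notebook kernel already provides `spark`; getOrCreate() returns that active session.
spark = SparkSession.builder.getOrCreate()

# Read the raw transactions. The explicit `schema` above is not passed here: Spark applies a
# user-supplied CSV schema by position (the `enforceSchema` option defaults to true), and the
# file's columns (amount, cc_num, datetime, fraud_label, tid) are in a different order than `schema`.
# Schema-based read, kept for reference:
# transactions_df = spark.read.format("csv").option("header", "true").schema(schema).load("hdfs:///Projects/realtime/Resources/transactions.csv")
transactions_df = spark.read.csv(path="hdfs:///Projects/realtime/Resources/transactions.csv", inferSchema=True, header=True, sep=",")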
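The explicit `schema` lists the columns as `tid, datetime, cc_num, amount, fraud_label`, while the inferred schema printed in the next cell shows the file order `amount, cc_num, datetime, fraud_label, tid`. Because Spark's CSV reader applies a user-supplied schema by position by default, passing `schema` would attach the wrong names to the columns. The pure-Python sketch below illustrates the difference between positional and name-based mapping, assuming a hypothetical one-row sample (the `tid-1` value is made up).

```python
import csv
import io

# Hypothetical one-row CSV in the file's column order (as reported by inferSchema).
sample = "amount,cc_num,datetime,fraud_label,tid\n19.9,4653672048903767,2020-03-15 15:06:09,0,tid-1\n"

# Column order declared by the explicit `schema` in this notebook.
schema_names = ["tid", "datetime", "cc_num", "amount", "fraud_label"]

header, values = list(csv.reader(io.StringIO(sample)))

# Positional mapping: the i-th schema name is attached to the i-th CSV column.
by_position = dict(zip(schema_names, values))

# Name-based mapping: each schema name is looked up in the header row.
by_name = {name: values[header.index(name)] for name in schema_names}

print(by_position["amount"])  # 0  (actually the fraud_label value)
print(by_name["amount"])      # 19.9
```

Reordering the `StructField`s to match the file, or relying on `inferSchema` as this notebook does, avoids the mismatch.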

In [84]:
transactions_df.printSchema()

root
 |-- amount: double (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- fraud_label: integer (nullable = true)
 |-- tid: string (nullable = true)

In [85]:
transactions_df.show()

+------+----------------+-------------------+-----------+--------------------+
|amount|          cc_num|           datetime|fraud_label|                 tid|
+------+----------------+-------------------+-----------+--------------------+
|  19.9|4653672048903767|2020-03-15 15:06:09|          0|df9058c1c293e6bd3...|
|  36.6|4170245277417751|2020-03-15 15:10:11|          0|b9902ddccead2a1ca...|
| 26.28|4978667132535671|2020-03-15 15:20:11|          0|2985420e04a29c7e1...|
|822.53|4332149413304557|2020-03-15 15:20:31|          0|e41a745ed61f9d875...|
| 652.2|4676706014866559|2020-03-15 15:26:45|          0|34108c0ff7bb8af9b...|
| 48.34|4978667132535671|2020-03-15 15:36:04|          0|3997b0cb3d7d39f02...|
| 18.55|4444037300542691|2020-03-15 15:41:04|          0|ac80ce12bcc81aa6e...|
| 67.22|4789490563144262|2020-03-15 15:45:31|          0|dd3a6f7ec38653709...|
|971.94|4829328237114208|2020-03-15 16:05:12|          0|553359d0e3e453bce...|
| 89.96|4829328237114208|2020-03-15 16:06:34|       

In [86]:
# For each card (cc_num), w1 and w2 cover all transactions from the last 10 minutes and
# the last week, up to and including the current transaction.
query = """
    SELECT *,
           avg_amt_last_10m / avg_amt_last_1w AS amt_ratio1,
           amount / avg_amt_last_1w AS amt_ratio2,
           num_trans_last_10m / num_trans_last_1w AS count_ratio
    FROM
        (
        SELECT *,
               COUNT(*) OVER w1 AS num_trans_last_10m,
               AVG(amount) OVER w1 AS avg_amt_last_10m,
               COUNT(*) OVER w2 AS num_trans_last_1w,
               AVG(amount) OVER w2 AS avg_amt_last_1w
        FROM transactions_df
        WINDOW
               w1 AS (PARTITION BY cc_num ORDER BY CAST(datetime AS timestamp) RANGE INTERVAL 10 MINUTE PRECEDING),
               w2 AS (PARTITION BY cc_num ORDER BY CAST(datetime AS timestamp) RANGE INTERVAL 1 WEEK PRECEDING)
        )
    """
# Expose the DataFrame to Spark SQL under the name used in the query
# (createOrReplaceTempView replaces the deprecated registerTempTable).
transactions_df.createOrReplaceTempView('transactions_df')
aggregated_features = spark.sql(query)
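Each `RANGE INTERVAL ... PRECEDING` window in the query covers, for every transaction, all rows of the same card whose timestamp lies between the current timestamp minus the interval and the current timestamp, inclusive; rows sharing the current timestamp are included as peers. The sketch below reproduces that frame for a single card in pure Python using binary search and prefix sums, then computes the three ratios. It assumes the card's transactions are already sorted by time, and the names `window_features` and `safe_div` (and the sample data) are hypothetical; it illustrates the query's logic, not how Spark executes it.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime, timedelta
from itertools import accumulate

# Window widths mirroring w1 and w2 in the query.
WINDOWS = {"10m": timedelta(minutes=10), "1w": timedelta(weeks=1)}


def safe_div(a, b):
    # Spark SQL with ANSI mode off returns NULL for division by zero; None plays that role here.
    return None if b == 0 else a / b


def window_features(txns):
    """txns: (timestamp, amount) pairs for ONE card, sorted by timestamp."""
    times = [t for t, _ in txns]
    # prefix[i] is the sum of the first i amounts, so a frame's sum is prefix[hi] - prefix[lo].
    prefix = [0.0] + list(accumulate(amount for _, amount in txns))
    rows = []
    for t, amount in txns:
        row = {"datetime": t, "amount": amount}
        hi = bisect_right(times, t)  # frame end: current row plus peers with the same timestamp
        for name, width in WINDOWS.items():
            lo = bisect_left(times, t - width)  # frame start: first row at or after t - width
            count = hi - lo  # always >= 1 because the current row is in the frame
            row[f"num_trans_last_{name}"] = count
            row[f"avg_amt_last_{name}"] = (prefix[hi] - prefix[lo]) / count
        row["amt_ratio1"] = safe_div(row["avg_amt_last_10m"], row["avg_amt_last_1w"])
        row["amt_ratio2"] = safe_div(amount, row["avg_amt_last_1w"])
        row["count_ratio"] = safe_div(row["num_trans_last_10m"], row["num_trans_last_1w"])
        rows.append(row)
    return rows


# Hypothetical transactions for a single card.
txns = [
    (datetime(2020, 3, 10, 12, 0), 30.0),
    (datetime(2020, 3, 15, 16, 0), 10.0),
    (datetime(2020, 3, 15, 16, 5), 50.0),
]
last = window_features(txns)[-1]
print(last["num_trans_last_10m"], last["num_trans_last_1w"])  # 2 3
print(last["avg_amt_last_10m"], last["avg_amt_last_1w"])      # 30.0 30.0
```

Binary search keeps each frame lookup logarithmic instead of rescanning the card's history for every transaction; Spark applies the same per-partition, per-row frame definition across all cards in parallel.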

In [87]:
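# Disabled: write the aggregated features to S3 as a single CSV file for model training.
# `args` (S3 bucket and key prefix) is not defined in this notebook.
# aggregated_features.coalesce(1) \
#                    .write.format('com.databricks.spark.csv') \
#                    .option('header', True) \
#                    .mode('overwrite') \
#                    .option('sep', ',') \
#                    .save('s3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix))

# group_by_card_number: keep each card's most recent transaction.
# dense_rank() gives rank 1 to every row tied for the latest datetime, so a card can keep more than one row.
from pyspark.sql.functions import col
window = Window.partitionBy('cc_num').orderBy(desc('datetime'))
sorted_df = aggregated_features.withColumn('rank', dense_rank().over(window))
grouped_df = sorted_df.filter(col('rank') == 1).drop('rank')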

# Disabled: SageMaker Feature Store ingestion via put_record; `feature_store_client` is not defined in this notebook.
# TODO: will do this for TD transformation
#
# sliced_df = grouped_df.select('cc_num', 'num_trans_last_1w', 'avg_amt_last_1w')
#
# # process rows
# records = []
# for row in sliced_df.rdd.collect():
#     record = []
#     cc_num, num_trans_last_1w, avg_amt_last_1w = row
#     if cc_num:
#         record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})
#         record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})
#         record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})
#         records.append(record)
#
# # write_to_feature_store
# success, fail = 0, 0
# for record in records:
#     event_time_feature = {
#         'FeatureName': 'trans_time',
#         'ValueAsString': str(int(round(time.time())))
#     }
#     record.append(event_time_feature)
#     response = feature_store_client.put_record(FeatureGroupName=FEATURE_GROUP, Record=record)
#     if response['ResponseMetadata']['HTTPStatusCode'] == 200:
#         success += 1
#     else:
#         fail += 1
# assert success == TOTAL_UNIQUE_USERS
# assert fail == 0


In [88]:
grouped_df.printSchema()

root
 |-- amount: double (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- fraud_label: integer (nullable = true)
 |-- tid: string (nullable = true)
 |-- num_trans_last_10m: long (nullable = false)
 |-- avg_amt_last_10m: double (nullable = true)
 |-- num_trans_last_1w: long (nullable = false)
 |-- avg_amt_last_1w: double (nullable = true)
 |-- amt_ratio1: double (nullable = true)
 |-- amt_ratio2: double (nullable = true)
 |-- count_ratio: double (nullable = true)

In [89]:
grouped_df.select("datetime").show(5)

+-------------------+
|           datetime|
+-------------------+
|2020-05-31 18:31:38|
|2020-05-31 19:57:01|
|2020-05-31 20:45:54|
|2020-05-31 23:43:01|
|2020-05-31 23:37:09|
+-------------------+
only showing top 5 rows

In [90]:
from pyspark.sql import functions as f
# Derive the partition column: the transaction's calendar day as a "yyyy-MM-dd" string.
grouped_df = grouped_df.withColumn("date", f.date_format(f.col("datetime"), "yyyy-MM-dd"))

In [91]:
grouped_df.select("date").show(5)

+----------+
|      date|
+----------+
|2020-05-31|
|2020-05-31|
|2020-05-31|
|2020-05-31|
|2020-05-31|
+----------+
only showing top 5 rows
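The `date` column derived above is later passed to the feature group as its `partition_key`, so rows that share a calendar day are stored in the same partition. A minimal pure-Python sketch of that derivation and grouping follows, assuming hypothetical `(tid, datetime)` pairs and a hypothetical helper `partition_by_date`; `strftime("%Y-%m-%d")` plays the role of the `yyyy-MM-dd` pattern. It ignores time zones, whereas Spark formats timestamps in the session time zone.

```python
from collections import defaultdict
from datetime import datetime


def partition_by_date(rows):
    """Group (tid, datetime) pairs by their yyyy-MM-dd date string."""
    partitions = defaultdict(list)
    for tid, ts in rows:
        # Zero-padded year-month-day, matching the Spark "yyyy-MM-dd" pattern.
        partitions[ts.strftime("%Y-%m-%d")].append(tid)
    return dict(partitions)


rows = [
    ("t1", datetime(2020, 5, 31, 18, 31, 38)),
    ("t2", datetime(2020, 5, 31, 23, 43, 1)),
    ("t3", datetime(2020, 6, 1, 0, 2, 0)),
]
print(partition_by_date(rows))
# {'2020-05-31': ['t1', 't2'], '2020-06-01': ['t3']}
```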

In [92]:
# TODO (davit): investigate this
#grouped_df = grouped_df.repartition("date")

In [93]:
import hsfs

connection = hsfs.connection()
# Get a reference to the project's feature store; pass a feature store name to access a shared feature store.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

In [94]:
transactions_fg = fs.create_feature_group(
    name="transactions_fg",
    description="Transactions Feature Group",
    version=1,
    primary_key=["tid"],
    partition_key=["date"],
    time_travel_format=None,
)
transactions_fg.save(grouped_df)

<hsfs.feature_group.FeatureGroup object at 0x7fe08cc7da50>

### Explore Features 

### Validate Feature Group for Records

### Statistics
