# Batch Ingestion
**This notebook aggregates raw features into new derived features that are used for Fraud Detection model training and inference.**

---

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Create PySpark Script](#Create-PySpark-Script)
1. [Run SageMaker Processing Job](#Run-SageMaker-Processing-Job)
1. [Explore Aggregated Features](#Explore-Aggregated-Features)
1. [Validate Feature Group for Records](#Validate-Feature-Group-for-Records)

### Background

- This notebook takes the raw credit card transaction data (CSV) generated by [notebook 0](./0_prepare_transactions_dataset.ipynb) and aggregates the raw features into new features (ratios) with a <b>SageMaker Processing</b> PySpark job. These aggregated features, alongside the original raw features, are used in the next step to train a credit card fraud detection model (see [notebook 3](./3_train_and_deploy_model.ipynb)).

- As part of the Spark job, we also select the latest weekly aggregated features, `num_trans_last_1w` and `avg_amt_last_1w`, for each `cc_num` (credit card number) and write them to the <b>SageMaker Online Feature Store</b> as a feature group. This feature group (`cc-agg-batch-fg`) was created in [notebook 1](./1_setup.ipynb).

- [Amazon SageMaker Processing](https://aws.amazon.com/about-aws/whats-new/2020/09/amazon-sagemaker-processing-now-supports-built-in-spark-containers-for-big-data-processing/) lets customers run analytics jobs for data engineering and model evaluation on Amazon SageMaker easily and at scale. It provides a fully managed Spark environment for data processing or feature engineering workloads.

<img src="./images/batch_ingestion.png" />
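The "latest weekly aggregates per card" selection described above can be illustrated locally with pandas. This is only a sketch under assumptions: the toy values are made up, and `tail(1)` keeps exactly one row per card, whereas a rank-based selection in Spark may keep ties.

```python
import pandas as pd

# Toy aggregated rows; column names follow the notebook, values are made up.
agg = pd.DataFrame({
    'cc_num': [111, 111, 222],
    'datetime': pd.to_datetime(['2020-01-01 10:00', '2020-01-05 09:30', '2020-01-02 08:00']),
    'num_trans_last_1w': [1, 2, 1],
    'avg_amt_last_1w': [50.0, 75.0, 20.0],
})

# Keep the most recent row per card ...
latest = (agg.sort_values('datetime')
             .groupby('cc_num')
             .tail(1)
             .sort_values('cc_num')
             .reset_index(drop=True))

# ... and only the columns that go into the online feature group.
latest = latest[['cc_num', 'num_trans_last_1w', 'avg_amt_last_1w']]
print(latest)
```

Each card ends up with a single row, which matches the one-record-per-identifier model of an online feature group.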

### Setup

#### Imports 

In [5]:
from sagemaker.spark.processing import PySparkProcessor
import pandas as pd
import numpy as np
import sagemaker
import logging
import random
import boto3

In [6]:
print(f'Using SageMaker version: {sagemaker.__version__}')

Using SageMaker version: 2.48.1


#### Setup Logger

In [7]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

In [8]:
logger.info('[Batch Aggregation using SageMaker PySpark Processing Job]')

[Batch Aggregation using SageMaker PySpark Processing Job]


#### Essentials

In [9]:
sagemaker_role = sagemaker.get_execution_role()
BUCKET = 'chime-fs-demo'
INPUT_KEY_PREFIX = 'raw'
OUTPUT_KEY_PREFIX = 'aggregated'
LOCAL_DIR = './data'

### Create PySpark Script
This PySpark script does the following:

1. Aggregates raw features to derive new features (ratios).
2. Saves the aggregated features alongside the original raw features as a CSV file in S3; this file is used in the next step for model training.
3. Groups the aggregated features by credit card number and picks selected aggregated features to write to SageMaker Feature Store (Online). <br>
<b>Note: </b> The feature group was created in the previous notebook (`1_setup.ipynb`).
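To build intuition for step 1, the trailing-window aggregation can be approximated on a small in-memory table with pandas. This is a sketch, not the Spark job itself: the toy transactions are invented, and `closed='both'` is an assumption chosen so that the window includes its left edge, as a SQL `RANGE ... PRECEDING` frame does.

```python
import pandas as pd

# Toy transactions; values are made up for illustration.
tx = pd.DataFrame({
    'cc_num': [1, 1, 1, 2],
    'datetime': pd.to_datetime(['2020-01-01 00:00:00', '2020-01-01 00:05:00',
                                '2020-01-03 00:00:00', '2020-01-01 00:00:00']),
    'amount': [10.0, 30.0, 20.0, 5.0],
}).sort_values(['cc_num', 'datetime']).reset_index(drop=True)


def add_window_features(group: pd.DataFrame) -> pd.DataFrame:
    """Add count and mean over trailing 10-minute and 1-week windows for one card."""
    amounts = group.set_index('datetime')['amount']
    out = group.copy()
    for offset, suffix in (('10min', '10m'), ('7D', '1w')):
        # closed='both' keeps rows that sit exactly on the window's left edge
        rolled = amounts.rolling(offset, closed='both')
        out[f'num_trans_last_{suffix}'] = rolled.count().to_numpy()
        out[f'avg_amt_last_{suffix}'] = rolled.mean().to_numpy()
    return out


features = pd.concat([add_window_features(g) for _, g in tx.groupby('cc_num')])

# Ratio features, defined the same way as in the Spark SQL query.
features['amt_ratio1'] = features['avg_amt_last_10m'] / features['avg_amt_last_1w']
features['amt_ratio2'] = features['amount'] / features['avg_amt_last_1w']
features['count_ratio'] = features['num_trans_last_10m'] / features['num_trans_last_1w']
print(features)
```

A ratio well above 1 means the current or very recent spend is high relative to the card's weekly average, which is the signal the fraud model is meant to pick up.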

In [3]:
%%writefile batch_aggregation_train.py
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType
from pyspark.sql.functions import desc, dense_rank
from pyspark.sql import SparkSession, DataFrame
from argparse import Namespace, ArgumentParser
from pyspark.sql.window import Window
import logging
import boto3
import time
import sys
import os


TOTAL_UNIQUE_USERS = 10000
FEATURE_GROUP = 'cc-agg-batch-chime-fg'
TRAIN_FEATURE_GROUP = 'cc-train-chime-fg'

logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


feature_store_client = boto3.client(service_name='sagemaker-featurestore-runtime')


def parse_args() -> Namespace:
    parser = ArgumentParser(description='Spark Job Input and Output Args')
    parser.add_argument('--s3_input_bucket', type=str, help='S3 Input Bucket')
    parser.add_argument('--s3_input_key_prefix', type=str, help='S3 Input Key Prefix')
    parser.add_argument('--s3_output_bucket', type=str, help='S3 Output Bucket')
    parser.add_argument('--s3_output_key_prefix', type=str, help='S3 Output Key Prefix')
    args = parser.parse_args()
    return args
    

def define_schema() -> StructType:
    schema = StructType([StructField('tid', StringType(), True),
                         StructField('datetime', TimestampType(), True),
                         StructField('cc_num', LongType(), True),
                         StructField('amount', DoubleType(), True),
                         StructField('fraud_label', StringType(), True)])
    return schema


def aggregate_features(args: Namespace, schema: StructType, spark: SparkSession) -> DataFrame:
    logger.info('[Read Raw Transactions Data as Spark DataFrame]')
    transactions_df = spark.read.csv(f's3a://{os.path.join(args.s3_input_bucket, args.s3_input_key_prefix)}', \
                                     header=False, \
                                     schema=schema)
    logger.info('[Aggregate Transactions to Derive New Features using Spark SQL]')
    query = """
    SELECT *, \
           avg_amt_last_10m/avg_amt_last_1w AS amt_ratio1, \
           amount/avg_amt_last_1w AS amt_ratio2, \
           num_trans_last_10m/num_trans_last_1w AS count_ratio \
    FROM \
        ( \
        SELECT *, \
               COUNT(*) OVER w1 as num_trans_last_10m, \
               AVG(amount) OVER w1 as avg_amt_last_10m, \
               COUNT(*) OVER w2 as num_trans_last_1w, \
               AVG(amount) OVER w2 as avg_amt_last_1w \
        FROM transactions_df \
        WINDOW \
               w1 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 10 MINUTE PRECEDING), \
               w2 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 1 WEEK PRECEDING) \
        ) 
    """
    # registerTempTable is deprecated; createOrReplaceTempView is its replacement
    transactions_df.createOrReplaceTempView('transactions_df')
    aggregated_features = spark.sql(query)
    return aggregated_features


def write_to_s3(args: Namespace, aggregated_features: DataFrame) -> None:
    logger.info('[Write Aggregated Features to S3]')
    aggregated_features.coalesce(1) \
                       .write.format('com.databricks.spark.csv') \
                       .option('header', True) \
                       .mode('overwrite') \
                       .option('sep', ',') \
                       .save('s3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix))
    
def group_by_card_number(aggregated_features: DataFrame) -> DataFrame: 
    logger.info('[Group Aggregated Features by Card Number]')
    window = Window.partitionBy('cc_num').orderBy(desc('datetime'))
    sorted_df = aggregated_features.withColumn('rank', dense_rank().over(window))
    grouped_df = sorted_df.filter(sorted_df.rank == 1).drop(sorted_df.rank)
    sliced_df = grouped_df.select('cc_num', 'num_trans_last_1w', 'avg_amt_last_1w')
    return sliced_df


def transform_row(sliced_df: DataFrame) -> list:
    logger.info('[Transform Spark DataFrame Row to SageMaker Feature Store Record]')
    records = []
    for row in sliced_df.rdd.collect():
        record = []
        cc_num, num_trans_last_1w, avg_amt_last_1w = row
        if cc_num:
            record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})
            record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})
            record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})
            records.append(record)
    return records


def write_to_feature_store(records: list) -> None:
    logger.info('[Write Grouped Features to SageMaker Feature Store]')
    success, fail = 0, 0
    for record in records:
        event_time_feature = {
                'FeatureName': 'trans_time',
                'ValueAsString': str(int(round(time.time())))
            }
        record.append(event_time_feature)
        response = feature_store_client.put_record(FeatureGroupName=FEATURE_GROUP, Record=record)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            success += 1
        else:
            fail += 1
    logger.info('Success = {}'.format(success))
    logger.info('Fail = {}'.format(fail))
    assert success == TOTAL_UNIQUE_USERS
    assert fail == 0

def transform_row_train(df: DataFrame) -> list:
    logger.info('[Transform Spark DataFrame Row to SageMaker Feature Store Record]')
    records = []
    for row in df.rdd.collect():
        record = []
        # Unpack in DataFrame column order: schema columns, window features, then ratios
        (tid, datetime, cc_num, amount, fraud_label,
         num_trans_last_10m, avg_amt_last_10m, num_trans_last_1w, avg_amt_last_1w,
         amt_ratio1, amt_ratio2, count_ratio) = row
        if tid:
            record.append({'ValueAsString': str(tid), 'FeatureName': 'tid'})
            record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})
            record.append({'ValueAsString': str(round(amount, 2)), 'FeatureName': 'amount'})
            record.append({'ValueAsString': str(fraud_label), 'FeatureName': 'fraud_label'})
            record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})
            record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})
            record.append({'ValueAsString': str(datetime), 'FeatureName': 'datetime'})
            record.append({'ValueAsString': str(num_trans_last_10m), 'FeatureName': 'num_trans_last_10m'})
            record.append({'ValueAsString': str(round(avg_amt_last_10m, 2)), 'FeatureName': 'avg_amt_last_10m'})
            record.append({'ValueAsString': str(round(amt_ratio1, 2)), 'FeatureName': 'amt_ratio1'})
            record.append({'ValueAsString': str(round(amt_ratio2, 2)), 'FeatureName': 'amt_ratio2'})
            record.append({'ValueAsString': str(round(count_ratio, 2)), 'FeatureName': 'count_ratio'})
            records.append(record)
    return records

def write_to_train_feature_store(records: list) -> None:
    logger.info('[Write Grouped Features to SageMaker Feature Store]')
    success, fail = 0, 0
    for record in records:
        event_time_feature = {
                'FeatureName': 'datetime',
                'ValueAsString': str(int(round(time.time())))
            }
        record.append(event_time_feature)
        response = feature_store_client.put_record(FeatureGroupName='cc-train-chime-fg', Record=record)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            success += 1
        else:
            fail += 1
    logger.info('Success = {}'.format(success))
    logger.info('Fail = {}'.format(fail))
    # One record per transaction here, so compare against the number of records sent
    assert success == len(records)
    assert fail == 0
    
def run_spark_job():
    spark = SparkSession.builder.appName('PySparkJob').getOrCreate()
    args = parse_args()
    schema = define_schema()
    aggregated_features = aggregate_features(args, schema, spark)
    
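    # Steps 2 and 3 of this script: write the CSV to S3, then load the latest
    # weekly aggregates per card into the online feature group.
    write_to_s3(args, aggregated_features)
    sliced_df = group_by_card_number(aggregated_features)
    records = transform_row(sliced_df)
    write_to_feature_store(records)

    # Optional: also load every aggregated row into the training feature group.
    # records = transform_row_train(aggregated_features)
    # write_to_train_feature_store(records)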

    
    
if __name__ == '__main__':
    run_spark_job()

Overwriting batch_aggregation_train.py
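The script reads its S3 locations from four command-line flags. The sketch below mirrors that parser so the flag-to-path mapping can be checked locally without Spark. `required=True` and the bucket name `my-bucket` are assumptions added for illustration; the script itself does not mark the flags as required.

```python
from argparse import ArgumentParser

FLAGS = ('s3_input_bucket', 's3_input_key_prefix',
         's3_output_bucket', 's3_output_key_prefix')


def build_parser() -> ArgumentParser:
    """Parser with the same four flags the PySpark script expects."""
    parser = ArgumentParser(description='Spark Job Input and Output Args')
    for name in FLAGS:
        # required=True is an assumption: it fails fast if a flag is missing
        parser.add_argument(f'--{name}', type=str, required=True)
    return parser


# Hypothetical values, passed as a flat list of flag/value pairs.
argv = ['--s3_input_bucket', 'my-bucket',
        '--s3_input_key_prefix', 'raw',
        '--s3_output_bucket', 'my-bucket',
        '--s3_output_key_prefix', 'aggregated']
args = build_parser().parse_args(argv)

# Build the s3a:// URIs with explicit '/' so the result does not depend on the OS.
input_uri = f's3a://{args.s3_input_bucket}/{args.s3_input_key_prefix}'
output_uri = f's3a://{args.s3_output_bucket}/{args.s3_output_key_prefix}'
print(input_uri, output_uri)
```

The flat flag/value list has the same shape as the `arguments` list given to the processing job.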


### Run SageMaker Processing Job

In [20]:
spark_processor = PySparkProcessor(base_job_name='chime-fs-demo-train', 
                                   framework_version='2.4', # spark version
                                   role=sagemaker_role, 
                                   instance_count=1, 
                                   instance_type='ml.m5.12xlarge', 
                                   env={'AWS_DEFAULT_REGION': boto3.Session().region_name},
                                   max_runtime_in_seconds=1200)

In [21]:
%%time

spark_processor.run(submit_app='batch_aggregation_train.py', 
                    arguments=['--s3_input_bucket', BUCKET, 
                               '--s3_input_key_prefix', INPUT_KEY_PREFIX, 
                               '--s3_output_bucket', BUCKET, 
                               '--s3_output_key_prefix', OUTPUT_KEY_PREFIX],
                    spark_event_logs_s3_uri='s3://{}/logs'.format(BUCKET),
                    logs=False)

Creating processing-job with name chime-fs-demo-train-2023-04-03-01-07-19-973



Job Name:  chime-fs-demo-train-2023-04-03-01-07-19-973
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-461312420708/chime-fs-demo-train-2023-04-03-01-07-19-973/input/code/batch_aggregation_train.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://chime-fs-demo/logs', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]
.............................................................................................*

UnexpectedStatusException: Error for Processing job chime-fs-demo-train-2023-04-03-01-07-19-973: Failed. Reason: AlgorithmError: See job logs for more information
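The exception above only points at the job logs. One way to get a little more detail is to ask SageMaker for the job description, which may include a `FailureReason` and an `ExitMessage`. The helper below is a sketch: `describe_failure` is a hypothetical name, and the stub client only stands in for `boto3.client('sagemaker')` so the example runs without AWS access.

```python
def describe_failure(sm_client, job_name: str) -> str:
    """Summarise the status and failure details of a processing job."""
    desc = sm_client.describe_processing_job(ProcessingJobName=job_name)
    parts = [f"status={desc['ProcessingJobStatus']}"]
    # Both fields are optional in the response, so read them defensively.
    for key in ('FailureReason', 'ExitMessage'):
        if desc.get(key):
            parts.append(f'{key}={desc[key]}')
    return '; '.join(parts)


class _StubSageMakerClient:
    """Stand-in for boto3.client('sagemaker'), used only for this illustration."""

    def describe_processing_job(self, ProcessingJobName):
        return {
            'ProcessingJobName': ProcessingJobName,
            'ProcessingJobStatus': 'Failed',
            'FailureReason': 'AlgorithmError: See job logs for more information',
        }


summary = describe_failure(_StubSageMakerClient(), 'example-job')
print(summary)
```

With a real client, pass the job name printed by the run cell. Re-running the job with `logs=True` should also stream the container logs into the notebook, which is usually the quickest way to see the underlying Spark error.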

### Explore Aggregated Features 
<p>The SageMaker Processing job above creates the aggregated features alongside the raw features and writes them to S3.
Let us verify this output using the code below and prepare it for model training in the next step.</p>


Copy the results CSV from S3 to a local directory.

In [13]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv

rm: cannot remove ‘./data/aggregated/part*.csv’: No such file or directory


In [14]:
!aws s3 cp s3://{BUCKET}/{OUTPUT_KEY_PREFIX}/ {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/ --recursive --exclude '_SUCCESS'

download: s3://chime-fs-demo/aggregated/part-00000-1bc78c44-62fe-4425-8ac9-9b9a367da961-c000.csv to data/aggregated/part-00000-1bc78c44-62fe-4425-8ac9-9b9a367da961-c000.csv


In [15]:
!mv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv 

In [16]:
agg_features = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv')
agg_features.dropna(inplace=True)
agg_features['cc_num'] = agg_features['cc_num'].astype(np.int64)
agg_features['fraud_label'] = agg_features['fraud_label'].astype(np.int64)
agg_features.head()

Unnamed: 0,tid,datetime,cc_num,amount,fraud_label,num_trans_last_10m,avg_amt_last_10m,num_trans_last_1w,avg_amt_last_1w,amt_ratio1,amt_ratio2,count_ratio
0,d621c8d794262ad5e8ad804cb4517395,2020-01-01T20:52:20.000Z,4006080197832643,8911.09,0,1,8911.09,1,8911.09,1.0,1.0,1.0
1,daa28b6f0e729f48563ee7ea945f910c,2020-01-02T00:51:07.000Z,4006080197832643,65.15,0,1,65.15,2,4488.12,0.014516,0.014516,0.5
2,c4f86514d36cf92be555c122f2faae57,2020-01-02T01:08:17.000Z,4006080197832643,4865.93,0,1,4865.93,3,4614.056667,1.054588,1.054588,0.333333
3,0758642b10c11900f42b880c9cb1527b,2020-01-02T09:50:42.000Z,4006080197832643,11.92,0,1,11.92,4,3463.5225,0.003442,0.003442,0.25
4,79dbe8dbe6d193de62cc4adad8fc6d4f,2020-01-03T00:05:41.000Z,4006080197832643,626.46,0,1,626.46,5,2896.11,0.216311,0.216311,0.2
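As a quick sanity check, the ratio columns can be recomputed from the window features and compared with the stored values. The sketch below uses the first three rows shown above; `ratios_consistent` is a hypothetical helper, and the tolerance is an assumption that allows for the six-decimal rounding in the display.

```python
import numpy as np
import pandas as pd

# First three rows of the output above (only the numeric columns needed here).
sample = pd.DataFrame({
    'amount': [8911.09, 65.15, 4865.93],
    'num_trans_last_10m': [1, 1, 1],
    'avg_amt_last_10m': [8911.09, 65.15, 4865.93],
    'num_trans_last_1w': [1, 2, 3],
    'avg_amt_last_1w': [8911.09, 4488.12, 4614.056667],
    'amt_ratio1': [1.0, 0.014516, 1.054588],
    'amt_ratio2': [1.0, 0.014516, 1.054588],
    'count_ratio': [1.0, 0.5, 0.333333],
})


def ratios_consistent(df: pd.DataFrame, atol: float = 1e-5) -> bool:
    """Check that each ratio column equals the quotient it was derived from."""
    checks = [
        np.isclose(df['amt_ratio1'], df['avg_amt_last_10m'] / df['avg_amt_last_1w'], atol=atol),
        np.isclose(df['amt_ratio2'], df['amount'] / df['avg_amt_last_1w'], atol=atol),
        np.isclose(df['count_ratio'], df['num_trans_last_10m'] / df['num_trans_last_1w'], atol=atol),
    ]
    return bool(np.all(checks))


print(ratios_consistent(sample))
```

In these rows `amt_ratio1` and `amt_ratio2` coincide because each transaction is the only one in its 10-minute window, so the 10-minute average equals the transaction amount.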


In [17]:
agg_features.to_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', index=False)

Remove the intermediate `part.csv` file

In [18]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv

In [1]:
def transform_row(df) -> list:
    logger.info('[Transform Spark DataFrame Row to SageMaker Feature Store Record]')
    records = []
    for index, row in df.iterrows():
        record = []
        # Unpack in CSV column order: raw columns, window features, then ratios
        (tid, datetime, cc_num, amount, fraud_label,
         num_trans_last_10m, avg_amt_last_10m, num_trans_last_1w, avg_amt_last_1w,
         amt_ratio1, amt_ratio2, count_ratio) = row
        if tid:
            record.append({'ValueAsString': str(tid), 'FeatureName': 'tid'})
            record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})
            record.append({'ValueAsString': str(round(amount, 2)), 'FeatureName': 'amount'})
            record.append({'ValueAsString': str(fraud_label), 'FeatureName': 'fraud_label'})
            record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})
            record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})
            record.append({'ValueAsString': str(datetime), 'FeatureName': 'datetime'})
            record.append({'ValueAsString': str(num_trans_last_10m), 'FeatureName': 'num_trans_last_10m'})
            record.append({'ValueAsString': str(round(avg_amt_last_10m, 2)), 'FeatureName': 'avg_amt_last_10m'})
            record.append({'ValueAsString': str(round(amt_ratio1, 2)), 'FeatureName': 'amt_ratio1'})
            record.append({'ValueAsString': str(round(amt_ratio2, 2)), 'FeatureName': 'amt_ratio2'})
            record.append({'ValueAsString': str(round(count_ratio, 2)), 'FeatureName': 'count_ratio'})
            records.append(record)
    return records

def write_to_feature_store(records: list) -> None:
    logger.info('[Write Grouped Features to SageMaker Feature Store]')
    success, fail = 0, 0
    for record in records:
        event_time_feature = {
                'FeatureName': 'datetime',
                'ValueAsString': str(int(round(time.time())))
            }
        record.append(event_time_feature)
        response = feature_store_client.put_record(FeatureGroupName='cc-train-chime-fg', Record=record)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            success += 1
        else:
            fail += 1
    logger.info('Success = {}'.format(success))
    logger.info('Fail = {}'.format(fail))
    # TOTAL_UNIQUE_USERS is not defined in the notebook; compare against the records sent
    assert success == len(records)
    assert fail == 0

import pandas as pd

df = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv')
df.iloc[1]
# records = transform_row(df)
# write_to_feature_store(records)

NameError: name 'LOCAL_DIR' is not defined
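The record format used by Feature Store is a list of `FeatureName`/`ValueAsString` pairs, so the per-column `append` calls can be written generically. The following is a sketch under assumptions: `row_to_record` and `put_records` are hypothetical helpers, the stub client replaces the real `sagemaker-featurestore-runtime` client, and failures are counted by catching exceptions because boto3 clients raise on error responses instead of returning a non-200 status.

```python
import time
import pandas as pd


def row_to_record(row: dict, event_time_feature: str) -> list:
    """Turn one row (column name -> value) into a Feature Store record."""
    record = [{'FeatureName': str(name), 'ValueAsString': str(value)}
              for name, value in row.items()]
    # Event time as epoch seconds, as in the notebook
    record.append({'FeatureName': event_time_feature,
                   'ValueAsString': str(int(round(time.time())))})
    return record


def put_records(client, feature_group: str, records: list) -> tuple:
    """Send records one by one; return (success_count, failure_count)."""
    success, fail = 0, 0
    for record in records:
        try:
            client.put_record(FeatureGroupName=feature_group, Record=record)
            success += 1
        except Exception:  # broad on purpose in this sketch
            fail += 1
    return success, fail


class _StubFeatureStoreClient:
    """Stand-in for the runtime client; it only remembers what was sent."""

    def __init__(self):
        self.calls = []

    def put_record(self, FeatureGroupName, Record):
        self.calls.append((FeatureGroupName, Record))


df = pd.DataFrame({'cc_num': [4987034976018745, 4647964443566763],
                   'num_trans_last_1w': [30, 24],
                   'avg_amt_last_1w': [363.93, 437.18]})

# to_dict('records') keeps each column's own type; iterrows() would upcast
# an all-numeric row to float and turn the card number into '...745.0'.
records = [row_to_record(row, 'trans_time') for row in df.to_dict('records')]
stub = _StubFeatureStoreClient()
success, fail = put_records(stub, 'cc-agg-batch-fg', records)
print(success, fail)
```

Swapping the stub for `boto3.client('sagemaker-featurestore-runtime')` would send the same payloads to a real feature group.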

### Validate Feature Group for Records
Let's randomly pick N credit card numbers from `processing_output.csv` and verify that records exist in the feature group `cc-agg-batch-fg` for these card numbers.

In [23]:
N = 3 # number of random records to validate
FEATURE_GROUP = 'cc-agg-batch-fg'

In [24]:
processing_out_df = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv')
processing_out_df.head()

Unnamed: 0,tid,datetime,cc_num,amount,fraud_label,num_trans_last_10m,avg_amt_last_10m,num_trans_last_1w,avg_amt_last_1w,amt_ratio1,amt_ratio2,count_ratio
0,afc4fe351c04c5b81de2437524fe7771,2020-01-01T12:54:54.000Z,4006080197832643,657.47,0,1,657.47,1,657.47,1.0,1.0,1.0
1,7fc373846810b3d5b70edcad77b5c944,2020-01-01T22:52:37.000Z,4006080197832643,11.17,0,1,11.17,2,334.32,0.033411,0.033411,0.5
2,eb37b0ec5dae2e6e49a9949a47b0ab4a,2020-01-01T23:47:32.000Z,4006080197832643,69.62,0,1,69.62,3,246.086667,0.282908,0.282908,0.333333
3,6d24d9b143ff8035f802f131ebfe993f,2020-01-02T03:45:45.000Z,4006080197832643,1187.41,0,1,1187.41,4,481.4175,2.466487,2.466487,0.25
4,ea52ff23a868b6a3ed2f45afc1c7f406,2020-01-02T05:06:46.000Z,4006080197832643,981.31,0,1,981.31,5,581.396,1.687851,1.687851,0.2


In [25]:
cc_nums = random.sample(processing_out_df['cc_num'].tolist(), N)
cc_nums

[4987034976018745, 4647964443566763, 4445477846392608]
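Because `processing_output.csv` holds one row per transaction, sampling directly from the `cc_num` column can return the same card more than once. A possible variant, sketched here with made-up card numbers and a seeded generator for repeatability, samples from the distinct card numbers instead.

```python
import random
import pandas as pd

# Toy frame with repeated cards, as in a per-transaction file (values are made up).
toy_df = pd.DataFrame({'cc_num': [111, 111, 222, 333, 333, 333]})
n_cards = 2

rng = random.Random(0)  # fixed seed only so the sketch is repeatable
unique_cards = toy_df['cc_num'].drop_duplicates().tolist()
picked = rng.sample(unique_cards, n_cards)
print(picked)
```

With distinct cards, each lookup in the validation loop checks a different record.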

Using the SageMaker Feature Store runtime client, we can verify that records exist in the feature group for the picked `cc_nums`.

In [26]:
feature_store_client = boto3.Session().client(service_name='sagemaker-featurestore-runtime')

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [27]:
success, fail = 0, 0
for cc_num in cc_nums:
    response = feature_store_client.get_record(FeatureGroupName=FEATURE_GROUP, 
                                               RecordIdentifierValueAsString=str(cc_num))
    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Record' in response.keys():
        success += 1
        print(response['Record'])
    else:
        print(response)
        fail += 1
assert success == N

[{'FeatureName': 'cc_num', 'ValueAsString': '4987034976018745'}, {'FeatureName': 'num_trans_last_1w', 'ValueAsString': '30'}, {'FeatureName': 'avg_amt_last_1w', 'ValueAsString': '363.93'}, {'FeatureName': 'trans_time', 'ValueAsString': '1679691906'}]
[{'FeatureName': 'cc_num', 'ValueAsString': '4647964443566763'}, {'FeatureName': 'num_trans_last_1w', 'ValueAsString': '24'}, {'FeatureName': 'avg_amt_last_1w', 'ValueAsString': '437.18'}, {'FeatureName': 'trans_time', 'ValueAsString': '1679691946'}]
[{'FeatureName': 'cc_num', 'ValueAsString': '4445477846392608'}, {'FeatureName': 'num_trans_last_1w', 'ValueAsString': '23'}, {'FeatureName': 'avg_amt_last_1w', 'ValueAsString': '367.41'}, {'FeatureName': 'trans_time', 'ValueAsString': '1679691975'}]
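Each returned record is a list of name/value pairs with every value encoded as a string. For downstream use it is often handier to flatten it into a dictionary and restore the types. The sketch below uses the first record printed above; `record_to_dict` is a hypothetical helper, and reading `trans_time` as epoch seconds in UTC is an assumption based on how the script writes it.

```python
from datetime import datetime, timezone


def record_to_dict(record: list) -> dict:
    """Flatten a Feature Store record into {feature name: string value}."""
    return {feature['FeatureName']: feature['ValueAsString'] for feature in record}


# First record from the output above.
record = [
    {'FeatureName': 'cc_num', 'ValueAsString': '4987034976018745'},
    {'FeatureName': 'num_trans_last_1w', 'ValueAsString': '30'},
    {'FeatureName': 'avg_amt_last_1w', 'ValueAsString': '363.93'},
    {'FeatureName': 'trans_time', 'ValueAsString': '1679691906'},
]

raw = record_to_dict(record)
typed = {
    'cc_num': int(raw['cc_num']),
    'num_trans_last_1w': int(raw['num_trans_last_1w']),
    'avg_amt_last_1w': float(raw['avg_amt_last_1w']),
    # Assumed to be epoch seconds, matching int(round(time.time())) in the script
    'trans_time': datetime.fromtimestamp(int(raw['trans_time']), tz=timezone.utc),
}
print(typed)
```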
