# Batch Ingestion
**This notebook aggregates raw features into new derived features that is used for Fraud Detection model training/inference.**

---

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Create PySpark Processing Script](#Create-PySpark-Processing-Script)
1. [Run SageMaker Processing Job](#Run-SageMaker-Processing-Job)
1. [Explore Aggregated Features](#Explore-Aggregated-Features)
1. [Validate Feature Group for Records](#Validate-Feature-Group-for-Records)

**Recommended settings to run this notebook in SageMaker Studio:**

- Image: Data Science
- Kernel: Python3
- Instance type: <font color='blue'>ml.m5.large (2 vCPU + 8 GiB)</font>

### Background

- This notebook takes raw credit card transactions data (csv) generated by 
[notebook 0](./0_prepare_transactions_dataset.ipynb) and aggregates the raw features to create new features (ratios) via <b>SageMaker Processing</b> PySpark Job. These aggregated features alongside the raw original features will be leveraged in the training phase of a Credit Card Fraud Detection model in the next step (see notebook [notebook 3](./3_train_and_deploy_model.ipynb)).

- As part of the Spark job, we also select the latest weekly aggregated features - `num_trans_last_1w` and `avg_amt_last_1w` grouped by `cc_num` (credit card number) and populate these features into the <b>SageMaker Online Feature Store</b> as a feature group. This feature group (`cc-agg-batch-fg`) was created in notebook [notebook 1](./1_setup.ipynb).

- [Amazon SageMaker Processing](https://aws.amazon.com/about-aws/whats-new/2020/09/amazon-sagemaker-processing-now-supports-built-in-spark-containers-for-big-data-processing/) lets customers run analytics jobs for data engineering and model evaluation on Amazon SageMaker easily and at scale. It provides a fully managed Spark environment for data processing or feature engineering workloads.

![SegmentLocal](images/batch_ingestion.png "connection")

### Setup

#### Imports 

In [1]:
from sagemaker.spark.processing import PySparkProcessor
import pandas as pd
import numpy as np
import sagemaker
import logging
import random
import boto3
import time



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
print(f'Using SageMaker version: {sagemaker.__version__}')

Using SageMaker version: 2.240.0


In [3]:
%store -r

#### Setup Logger

In [4]:
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

In [5]:
logger.info('[Batch Aggregation using SageMaker PySpark Processing Job]')

[Batch Aggregation using SageMaker PySpark Processing Job]


#### Essentials

In [6]:
sagemaker_role = sagemaker.get_execution_role()
BUCKET = sagemaker.Session().default_bucket()
INPUT_KEY_PREFIX = 'raw_clicks'
OUTPUT_KEY_PREFIX = 'aggregated_clicks'
LOCAL_DIR = '/home/sagemaker-user/cautious-parakeet/notebooks/data'

### Create PySpark Script
This PySpark script does the following:

1. Aggregates raw features to derive new features (ratios).
2. Saves the aggregated features alongside the original raw features into a CSV file and writes it to S3 - will be used in the next step for model training.
3. Groups the aggregated features by credit card number and picks selected aggregated features to write to SageMaker Feature Store (Online). <br>
<b>Note: </b> The feature group was created in the previous notebook (`1_setup.ipynb`)

In [7]:
!pwd

/home/sagemaker-user/cautious-parakeet/notebooks


In [8]:
!aws s3 cp s3://{BUCKET}/{INPUT_KEY_PREFIX}/clickstream.csv .

download: s3://sagemaker-ap-southeast-1-850995562355/raw_clicks/clickstream.csv to ./clickstream.csv


In [9]:
print(f"s3://{BUCKET}/{INPUT_KEY_PREFIX}/clickstream.csv")

s3://sagemaker-ap-southeast-1-850995562355/raw_clicks/clickstream.csv


In [10]:
clickstread_df = pd.read_csv(f"./clickstream.csv")
# set(clickstread_df["event_type"])
# Index(['Unnamed: 0', 'event_id', 'timestamp', 'customer_id', 'session_id',
#        'event_type', 'product_id', 'product_category', 'price',
#        'order_in_session', 'purchased_items', 'total_amount',
#        'interaction_value', 'cumsum_interactions'],
#       dtype='object')
print(clickstread_df.columns)
print(f"TOTALLENGTH: {len(clickstread_df)}")
clickstread_df.head()

Index(['event_id', 'timestamp', 'customer_id', 'session_id', 'event_type',
       'product_id', 'product_category', 'price', 'order_in_session',
       'purchased_items', 'total_amount', 'interaction_value',
       'cumsum_interactions'],
      dtype='object')
TOTALLENGTH: 274


Unnamed: 0,event_id,timestamp,customer_id,session_id,event_type,product_id,product_category,price,order_in_session,purchased_items,total_amount,interaction_value,cumsum_interactions
0,9268038f-dbef-4e0c-80e2-a5d7866c6000,2025-03-01 02:01:52,25,0816bcd6-6298-4ab1-8cd8-a2a30bd4de10,page_view,453.0,books,192.33,1,,,4,4
1,bf953c32-9485-46e3-8fba-229a57e6e588,2025-03-01 02:01:56,25,0816bcd6-6298-4ab1-8cd8-a2a30bd4de10,click,453.0,books,38.01,2,,,1,5
2,e6bd47c5-1485-45b6-b347-56ddda312547,2025-03-01 02:02:03,25,0816bcd6-6298-4ab1-8cd8-a2a30bd4de10,page_view,453.0,books,457.72,3,,,4,9
3,38dd0f35-4b17-4255-99a8-b1370e7fd726,2025-03-01 02:02:09,25,0816bcd6-6298-4ab1-8cd8-a2a30bd4de10,page_view,396.0,home,175.48,4,,,4,13
4,04f79f4e-0bbc-4c1d-a6e5-276162623e98,2025-03-01 02:02:13,25,0816bcd6-6298-4ab1-8cd8-a2a30bd4de10,click,396.0,home,199.26,5,,,1,14


In [11]:
%%writefile batch_aggregation_user_behavior.py
from pyspark.sql.types import (StructField, StructType, StringType, DoubleType, 
                               TimestampType, LongType)
from pyspark.sql.functions import desc, dense_rank, col, when, count, avg, lower, trim
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
from argparse import ArgumentParser, Namespace
import logging
import boto3
import time
import sys
import os

# Total unique users expected (for validation)
TOTAL_UNIQUE_USERS = 10000
# Replace with your actual feature group name for batch aggregated user behavior features
# aggregate_feature_group_name: cc-agg-fg
# aggregate_batch_feature_group_name: cc-agg-batch-fg
FEATURE_GROUP = 'cc-agg-batch-fg'

logger = logging.getLogger('sagemaker')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

feature_store_client = boto3.client(service_name='sagemaker-featurestore-runtime')

def parse_args() -> Namespace:
    parser = ArgumentParser(description='Spark Job Input and Output Args for User Behavior Aggregation')
    parser.add_argument('--s3_input_bucket', type=str, help='S3 Input Bucket')
    parser.add_argument('--s3_input_key_prefix', type=str, help='S3 Input Key Prefix')
    parser.add_argument('--s3_output_bucket', type=str, help='S3 Output Bucket')
    parser.add_argument('--s3_output_key_prefix', type=str, help='S3 Output Key Prefix')
    args = parser.parse_args()
    return args

def define_schema() -> StructType:
    # Define schema based on your sample CSV:
    # ['event_id', 'timestamp', 'customer_id', 'session_id', 'event_type',
    #    'product_id', 'product_category', 'price', 'order_in_session',
    #    'purchased_items', 'total_amount', 'interaction_value',
    #    'cumsum_interactions']%%!
    
    schema = StructType([
        # StructField('index', StringType(), True),  # if present as the first column
        StructField('event_id', StringType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('customer_id', LongType(), True),
        StructField('session_id', StringType(), True),
        StructField('event_type', StringType(), True),
        StructField('product_id', DoubleType(), True),
        StructField('product_category', StringType(), True),
        StructField('price', DoubleType(), True),
        StructField('order_in_session', LongType(), True),
        # These fields may be null if not applicable; adjust types as needed:
        StructField('purchased_items', LongType(), True),
        StructField('total_amount', DoubleType(), True),
        StructField('interaction_value', LongType(), True),
        StructField('cumsum_interactions', LongType(), True)
    ])
    return schema

def aggregate_features(args: Namespace, schema: StructType, spark: SparkSession) -> DataFrame:
    logger.info('[Loggs][Read User Behavior Data as Spark DataFrame]')
    input_path = f's3a://{os.path.join(args.s3_input_bucket, args.s3_input_key_prefix)}'
    events_df = spark.read.csv(input_path, header=True, schema=schema)

    events_df = spark.read.csv(input_path, header=True, schema=schema)
    distinct_events = events_df.select('event_type').distinct().collect()
    events_df.printSchema()
    logger.info(f"[Loggs] Total rows in input: {events_df.count()} --- {distinct_events}")
    
    logger.info('[Loggs][Filter and Aggregate Order Data]')
    # We assume that order events are indicated by event_type = 'order'
    # If your data marks orders in another way (e.g., non-null total_amount), adjust the filter accordingly.
    # orders_df = events_df.filter(col('event_type') == 'purchase')
    orders_df = events_df.filter(lower(trim(col('event_type'))) == 'purchase')

    logger.info(f"[Loggs]Order events count: {orders_df.count()}")
    
    # Define windows:
    # For batch aggregates, we compute aggregates over the past 1 week.
    window_1w = Window.partitionBy('customer_id')\
                      .orderBy(col('timestamp').cast("long"))\
                      .rangeBetween(-7 * 24 * 3600, 0)
    
    aggregated_df = orders_df.withColumn('total_orders_last_1w', count('*').over(window_1w)) \
                              .withColumn('avg_order_value_last_1w', avg(col('total_amount')).over(window_1w))
    
    # Optionally, you can compute additional features. For example, if you want to include a real-time like metric
    # computed over a shorter interval (e.g., last 5 minutes) for clicks:
    clicks_df = events_df.filter(col('event_type') == 'click')

    clicks_df = events_df.filter(col('event_type') == 'click')
    logger.info(f"[Loggs]Click events count: {clicks_df.count()}")

    window_5m = Window.partitionBy('customer_id')\
                      .orderBy(col('timestamp').cast("long"))\
                      .rangeBetween(-5 * 60, 0)
    clicks_agg = clicks_df.withColumn('clicks_last_5m', count('*').over(window_5m))
    
    # Join the aggregates by customer_id. Depending on your needs, you may join batch and streaming features,
    # or process them in separate feature groups.
    agg_joined = aggregated_df.join(clicks_agg.select('customer_id', 'clicks_last_5m'),
                                    on='customer_id', how='left')
    
    # Remove duplicates by taking the latest record per customer.
    window_latest = Window.partitionBy('customer_id').orderBy(desc('timestamp'))
    sorted_df = agg_joined.withColumn('rank', dense_rank().over(window_latest))
    grouped_df = sorted_df.filter(col('rank') == 1).drop('rank')
    logger.info(f"[Loggs]aggregate_features - grouped_df count: {grouped_df.count()}")
    # Select the fields that match your feature group schema
    # Here, we assume the batch feature group expects: user_id, total_orders_last_1w, avg_order_value_last_1w, clicks_last_5m, event_time
    final_df = grouped_df.select(
        col('customer_id').alias('user_id'),
        col('total_orders_last_1w'),
        col('avg_order_value_last_1w'),
        col('clicks_last_5m'),
        col('timestamp').alias('event_time')
    )
    return final_df

def write_to_s3(args: Namespace, aggregated_features: DataFrame) -> None:
    logger.info(f'[Loggs][Write Aggregated Features to S3] {aggregated_features.count()}')
    output_path = 's3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix)
    aggregated_features.coalesce(1) \
                       .write.format('csv') \
                       .option('header', True) \
                       .mode('overwrite') \
                       .save(output_path)
    
def group_by_user(aggregated_features: DataFrame) -> DataFrame: 
    logger.info('[Loggs][Group Aggregated Features by User]')
    window = Window.partitionBy('user_id').orderBy(desc('event_time'))
    sorted_df = aggregated_features.withColumn('rank', dense_rank().over(window))
    logger.info(f'[Loggs]sorted_df count {sorted_df.count()}')
    grouped_df = sorted_df.filter(col('rank') == 1).drop('rank')
    logger.info(f'[Loggs]grouped_df count {grouped_df.count()}')
    # Select only the columns to be ingested into the feature store
    sliced_df = grouped_df.select('user_id', 'total_orders_last_1w', 'avg_order_value_last_1w', 'clicks_last_5m')
    logger.info(f'[Loggs]sliced_df count {sliced_df.count()}')
    return sliced_df

def transform_row(sliced_df: DataFrame) -> list:
    logger.info('[Loggs][Transform Spark DataFrame Rows to Feature Store Records]')
    records = []
    for row in sliced_df.rdd.collect():
        record = []
        user_id, total_orders_last_1w, avg_order_value_last_1w, clicks_last_5m = row
        if user_id is not None:
            record.append({'FeatureName': 'user_id', 'ValueAsString': str(user_id)})
            record.append({'FeatureName': 'total_orders_last_1w', 'ValueAsString': str(total_orders_last_1w)})
            record.append({'FeatureName': 'avg_order_value_last_1w', 'ValueAsString': str(round(avg_order_value_last_1w, 2) if avg_order_value_last_1w else 0.0)})
            # record.append({'FeatureName': 'clicks_last_5m', 'ValueAsString': str(clicks_last_5m if clicks_last_5m else 0)})
            records.append(record)
    return records

def write_to_feature_store(records: list) -> None:
    logger.info(f'[Loggs][Write Grouped Features to SageMaker Feature Store] records count- {len(records)}')
    success, fail = 0, 0
    for record in records:
        event_time_feature = {
                'FeatureName': 'event_time',
                'ValueAsString': str(int(round(time.time())))
            }
        record.append(event_time_feature)
        response = feature_store_client.put_record(FeatureGroupName=FEATURE_GROUP, Record=record)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            success += 1
        else:
            fail += 1
        logger.info('[Loggs]Success = {}, Failed = {}'.format(success, fail))
    logger.info('[Loggs]Success = {}'.format(success))
    logger.info('[Loggs]Fail = {}'.format(fail))
    # You can adjust these assertions based on your expected unique user count
    assert success <= TOTAL_UNIQUE_USERS
    assert fail == 0

def run_spark_job():
    spark = SparkSession.builder.appName('UserBehaviorBatchAggregationJob5').getOrCreate()
    args = parse_args()
    schema = define_schema()
    aggregated_features = aggregate_features(args, schema, spark)
    write_to_s3(args, aggregated_features)
    grouped_features = group_by_user(aggregated_features)
    records = transform_row(grouped_features)
    write_to_feature_store(records)
    
if __name__ == '__main__':
    run_spark_job()


Writing batch_aggregation_user_behavior.py


In [12]:
# # Read in the file
# with open('batch_aggregation_temp.py', 'r') as file :
#   filedata = file.read()

# # Replace the target string
# filedata = filedata.replace('|agg_batch_fg_name|', aggregate_batch_feature_group_name)

# # Write the file out again
# with open('batch_aggregation.py', 'w') as file:
#   file.write(filedata)

### Run SageMaker Processing Job

In [13]:
spark_processor = PySparkProcessor(base_job_name='sagemaker-processing', 
                                   framework_version='2.4', # spark version
                                   role=sagemaker_role, 
                                   # instance_count=1, 
                                   # instance_type='ml.m5.4xlarge', 
                                   instance_count=2, 
                                   instance_type='ml.t3.xlarge', 
                                   env={'AWS_DEFAULT_REGION': boto3.Session().region_name},
                                   max_runtime_in_seconds=1200)

In [14]:
%%time

spark_processor.run(submit_app='batch_aggregation_user_behavior.py', 
                    arguments=['--s3_input_bucket', BUCKET, 
                               '--s3_input_key_prefix', INPUT_KEY_PREFIX, 
                               '--s3_output_bucket', BUCKET, 
                               '--s3_output_key_prefix', OUTPUT_KEY_PREFIX],
                    spark_event_logs_s3_uri='s3://{}/logs'.format(BUCKET),
                    logs=True)

Creating processing-job with name sagemaker-processing-2025-03-17-09-30-46-359


...................[35m03-17 09:33 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '--local-spark-event-logs-dir', '/opt/ml/processing/spark-events/', '/opt/ml/processing/input/code/batch_aggregation_user_behavior.py', '--s3_input_bucket', 'sagemaker-ap-southeast-1-850995562355', '--s3_input_key_prefix', 'raw_clicks', '--s3_output_bucket', 'sagemaker-ap-southeast-1-850995562355', '--s3_output_key_prefix', 'aggregated_clicks'][0m
[35m03-17 09:33 smspark.cli  INFO     Raw spark options before processing: {'class_': None, 'jars': None, 'py_files': None, 'files': None, 'verbose': False}[0m
[35m03-17 09:33 smspark.cli  INFO     App and app arguments: ['/opt/ml/processing/input/code/batch_aggregation_user_behavior.py', '--s3_input_bucket', 'sagemaker-ap-southeast-1-850995562355', '--s3_input_key_prefix', 'raw_clicks', '--s3_output_bucket', 'sagemaker-ap-southeast-1-850995562355', '--s3_output_key_prefix', 'aggregated_clicks'][0m
[35m03-17 09:33 smspark

### Explore Aggregated Features 
<p> The SageMaker Processing Job above creates the aggregated features alongside the raw features and writes it to S3. 
Let us verify this output using the code below and prep it to be used in the next step for model training.</p>


Copy results csv from S3 to local directory. Below step may fail if you are running the notebook for the first time. Hence any file missing error could be safely ignored.

In [12]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv

rm: cannot remove '/home/sagemaker-user/cautious-parakeet/notebooks/data/aggregated_clicks/part*.csv': No such file or directory


In [15]:
!aws s3 cp s3://{BUCKET}/{OUTPUT_KEY_PREFIX}/ {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/ --recursive --exclude '_SUCCESS'

download: s3://sagemaker-ap-southeast-1-850995562355/aggregated_clicks/part-00000-ae1a25ad-40fc-4868-8adf-c0f3fcd28d17-c000.csv to data/aggregated_clicks/part-00000-ae1a25ad-40fc-4868-8adf-c0f3fcd28d17-c000.csv


In [16]:
# !aws s3 cp s3://sagemaker-ap-southeast-1-850995562355/aggregated/ {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/cc/ --recursive --exclude '_SUCCESS'
# !mv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/cc/part*.csv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/partcc.csv 

In [17]:
!mv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv 

In [18]:
agg_features = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv')
# agg_features = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/partcc.csv')
agg_features.dropna(inplace=True)
agg_features = agg_features.drop(columns = ["clicks_last_5m"])
agg_features['total_orders_last_1w'] = agg_features['total_orders_last_1w'].astype(np.int64)
agg_features['avg_order_value_last_1w'] = agg_features['avg_order_value_last_1w'].astype(np.float32)
# agg_features['event_time'] = agg_features['event_time'].astype(np.datetime64)
agg_features['event_time'] = agg_features['event_time'].values.astype('datetime64[ns]')
agg_features.head()

  agg_features['event_time'] = agg_features['event_time'].values.astype('datetime64[ns]')


Unnamed: 0,user_id,total_orders_last_1w,avg_order_value_last_1w,event_time
0,25,1,446.350006,2025-03-01 02:03:40
1,25,1,446.350006,2025-03-01 02:03:40
2,25,1,446.350006,2025-03-01 02:03:40
3,25,1,446.350006,2025-03-01 02:03:40
4,176,1,493.290009,2025-03-01 02:04:59


In [19]:
agg_features.to_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', index=False)

Remove the intermediate `part.csv` file

In [20]:
!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv

### Validate Feature Group for Records
Let's randomly pick N credit card numbers from the `processing_output.csv` and verify if records exist in the feature group `<aggregate_batch_feature_group_name>` for these card numbers.

In [21]:
N = 3 # number of random records to validate
FEATURE_GROUP = aggregate_batch_feature_group_name

In [22]:
processing_out_df = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', nrows=10000)
processing_out_df.head()

Unnamed: 0,user_id,total_orders_last_1w,avg_order_value_last_1w,event_time
0,25,1,446.35,2025-03-01 02:03:40
1,25,1,446.35,2025-03-01 02:03:40
2,25,1,446.35,2025-03-01 02:03:40
3,25,1,446.35,2025-03-01 02:03:40
4,176,1,493.29,2025-03-01 02:04:59


In [23]:
cc_nums = random.sample(processing_out_df['user_id'].tolist(), N)
cc_nums

[660, 448, 139]

Using SageMaker Feature Store run-time client, we can verify if records exist in the feature group for the picked `cc_nums` 

In [24]:
feature_store_client = boto3.Session().client(service_name='sagemaker-featurestore-runtime')

In [25]:
success, fail = 0, 0
for cc_num in cc_nums:
    response = feature_store_client.get_record(FeatureGroupName=FEATURE_GROUP, 
                                               RecordIdentifierValueAsString=str(cc_num))
    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Record' in response.keys():
        success += 1
        print(response['Record'])
    else:
        print(response)
        fail += 1
assert success == N

[{'FeatureName': 'user_id', 'ValueAsString': '660'}, {'FeatureName': 'total_orders_last_1w', 'ValueAsString': '1'}, {'FeatureName': 'avg_order_value_last_1w', 'ValueAsString': '384.13'}, {'FeatureName': 'event_time', 'ValueAsString': '1742204138'}]
[{'FeatureName': 'user_id', 'ValueAsString': '448'}, {'FeatureName': 'total_orders_last_1w', 'ValueAsString': '1'}, {'FeatureName': 'avg_order_value_last_1w', 'ValueAsString': '376.75'}, {'FeatureName': 'event_time', 'ValueAsString': '1742204138'}]
[{'FeatureName': 'user_id', 'ValueAsString': '139'}, {'FeatureName': 'total_orders_last_1w', 'ValueAsString': '1'}, {'FeatureName': 'avg_order_value_last_1w', 'ValueAsString': '425.38'}, {'FeatureName': 'event_time', 'ValueAsString': '1742204138'}]
