# Notebook 1: Ingesting Data into the Feature Store

Use the "Python 3" kernel with the "Data Science" image.

<div class="alert alert-info"> 💡 <strong> Quick Start </strong>
This notebook takes about 10 minutes to run, so go ahead and click "Run" in the toolbar, then "Run All Cells". This runs every cell and starts ingesting the data into SageMaker Feature Store that we'll need in the next few notebooks. While the cells are running, look through the code and explanations, as they are the foundation for the other notebooks.
</div>

### Background

Amazon SageMaker Feature Store is a centralized store for features and their associated metadata, so features can be easily discovered and reused. A Feature Group can use an online store, an offline store, or both. The online store is used for low-latency, real-time inference, and the offline store is used for training and batch inference.

The online store is designed to support real-time predictions that need low-millisecond read latency and high-throughput writes, and it retains only the latest feature values for each record. The offline store is an append-only store of historical feature data, which makes it suitable for exploration, model training, and batch predictions. Note that the core of a Feature Group definition, such as its record identifier, event time feature, and existing feature definitions, cannot be changed after the Feature Group is created.
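To make the difference between the two stores concrete, here is a toy, in-memory model in plain Python. It is not the SageMaker implementation, and `ToyFeatureGroup` is a hypothetical name: every write is appended to the "offline" list, while the "online" dict keeps one record per record identifier. It assumes a write with an older event time does not replace the online value and that ties go to the most recent write.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyFeatureGroup:
    """Toy model of online/offline store semantics (not the real service)."""
    record_id_name: str
    event_time_name: str
    offline: list[dict[str, Any]] = field(default_factory=list)
    online: dict[Any, dict[str, Any]] = field(default_factory=dict)

    def put_record(self, record: dict[str, Any]) -> None:
        # Offline store: append-only, so every write is kept as history
        self.offline.append(dict(record))
        # Online store: one record per identifier, replaced unless the
        # incoming event time is older (assumption: ties overwrite)
        key = record[self.record_id_name]
        current = self.online.get(key)
        if current is None or record[self.event_time_name] >= current[self.event_time_name]:
            self.online[key] = dict(record)


fg = ToyFeatureGroup(record_id_name="customer_id", event_time_name="event_time")
fg.put_record({"customer_id": "C1", "rating": 3.0, "event_time": 100.0})
fg.put_record({"customer_id": "C1", "rating": 4.5, "event_time": 200.0})
fg.put_record({"customer_id": "C1", "rating": 1.0, "event_time": 150.0})  # late-arriving, older event

print(len(fg.offline))            # 3: the full history is kept
print(fg.online["C1"]["rating"])  # 4.5: latest value by event time
```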

There are 5 sets of data about our online grocery use case. Each dataset will have its own Feature Group in SageMaker Feature Store, as described in the image below:

![arch](./img/feature-store-ingestion.png)

We'll use most of these Feature Groups to supply the training data for our recommendation engine models, and we'll also use Feature Store to capture click stream data that influences real-time predictions.

The `click stream historical` Feature Group will be used to initially train our ranking model. As we'll see in later notebooks, the `click stream` Feature Group will be populated by data streamed through Amazon Kinesis Data Streams (simulating a user interacting with the e-commerce website), and this data will directly influence our recommendations at inference time.
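As a preview of how streamed click events could reach the `click stream` Feature Group, here is a minimal sketch that converts one event (a plain dict) into the `Record` format of the Feature Store runtime `PutRecord` API and writes it with a boto3 `sagemaker-featurestore-runtime` client. The function names and the sample event are hypothetical; the Kinesis consumer in the later notebooks may be structured differently.

```python
import time
from typing import Any


def to_feature_store_record(event: dict[str, Any]) -> list[dict[str, str]]:
    """Convert a flat dict into PutRecord's Record format; every value is sent as a string."""
    return [{"FeatureName": name, "ValueAsString": str(value)} for name, value in event.items()]


def put_click_event(runtime_client, feature_group_name: str, event: dict[str, Any]) -> None:
    """Write one event to a Feature Group through the Feature Store runtime API."""
    runtime_client.put_record(
        FeatureGroupName=feature_group_name,
        Record=to_feature_store_record(event),
    )


# Hypothetical click stream aggregate matching the click stream schema
event = {
    "customer_id": "C1",
    "sum_activity_weight_last_2m": 8,
    "avg_product_health_index_last_2m": 0.2,
    "event_time": float(int(time.time())),
}
record = to_feature_store_record(event)
print(record[0])  # {'FeatureName': 'customer_id', 'ValueAsString': 'C1'}

# With AWS credentials configured, you could then call:
# import boto3
# runtime = boto3.client("sagemaker-featurestore-runtime", region_name=region)
# put_click_event(runtime, click_stream_feature_group_name, event)
```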

### Imports

In [24]:
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
import boto3
import json
import pandas as pd
from time import gmtime, strftime
import time  # the module itself: used below as time.time() and time.sleep()
from parameter_store import ParameterStore
from utils import *
from IPython.display import HTML, display

### Session variables

In [25]:
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
s3_client = boto3.client('s3', region_name=region)
print(region)
# ParameterStore is a custom utility to save local variable values
# for use across all notebooks
ps = ParameterStore(verbose=False)
ps.set_namespace('feature-store-workshop')

ap-southeast-2


In [26]:
prefix = 'recsys-feature-store'

# Feature Store variables
fs_prefix = 'recsys-'
current_timestamp = strftime('%m-%d-%H-%M', gmtime())
customers_feature_group_name = f'{fs_prefix}customers-fg-{current_timestamp}'
products_feature_group_name = f'{fs_prefix}products-fg-{current_timestamp}'
orders_feature_group_name = f'{fs_prefix}orders-fg-{current_timestamp}'
click_stream_historical_feature_group_name = f'{fs_prefix}click-stream-historical-fg-{current_timestamp}'
click_stream_feature_group_name = f'{fs_prefix}click-stream-fg-{current_timestamp}'

ps.create({'customers_feature_group_name': customers_feature_group_name,
           'products_feature_group_name': products_feature_group_name,
           'orders_feature_group_name': orders_feature_group_name,
           'click_stream_historical_feature_group_name': click_stream_historical_feature_group_name,
           'click_stream_feature_group_name': click_stream_feature_group_name})

In [None]:
print(f'Using SageMaker version: {sagemaker.__version__}')
print(f'Using SageMaker Role: {role}')
print(f'Using S3 Bucket: {default_bucket}')

In [28]:
print('Feature groups names:\n')
print(customers_feature_group_name)
print(products_feature_group_name)
print(orders_feature_group_name)
print(click_stream_historical_feature_group_name)
print(click_stream_feature_group_name)

Feature groups names:

recsys-customers-fg-05-29-00-10
recsys-products-fg-05-29-00-10
recsys-orders-fg-05-29-00-10
recsys-click-stream-historical-fg-05-29-00-10
recsys-click-stream-fg-05-29-00-10


### Load and explore datasets

#### Customers dataset

In [29]:
df_customers = pd.read_csv('data/customers.csv')
df_customers.head()

|   | customer_id | name | state | age | is_married | customer_health_index |
|---|---|---|---|---|---|---|
| 0 | C1 | justin gutierrez | alaska | 52 | 1 | 0.590238 |
| 1 | C2 | karen cross | idaho | 29 | 1 | 0.622201 |
| 2 | C3 | amy king | oklahoma | 70 | 1 | 0.225476 |
| 3 | C4 | nicole hartman | missouri | 52 | 1 | 0.975817 |
| 4 | C5 | jessica powers | minnesota | 31 | 1 | 0.886133 |


#### Products dataset

In [30]:
df_products = pd.read_csv('data/products.csv')
df_products.head()

|   | product_name | product_category | product_id | product_health_index |
|---|---|---|---|---|
| 0 | chocolate sandwich cookies | cookies_cakes | P1 | 0.1 |
| 1 | nutter butter cookie bites go-pak | cookies_cakes | P25 | 0.1 |
| 2 | danish butter cookies | cookies_cakes | P34 | 0.1 |
| 3 | gluten free all natural chocolate chip cookies | cookies_cakes | P55 | 0.1 |
| 4 | mini nilla wafers munch pack | cookies_cakes | P99 | 0.1 |


#### Orders dataset

In [31]:
df_orders = pd.read_csv('data/orders.csv')
df_orders.head()

|   | customer_id | product_id | purchase_amount |
|---|---|---|---|
| 0 | C1 | P10852 | 87.71 |
| 1 | C1 | P10940 | 101.71 |
| 2 | C1 | P13818 | 42.11 |
| 3 | C1 | P2310 | 55.37 |
| 4 | C1 | P393 | 55.16 |


#### Click stream historical dataset

In [32]:
df_click_stream_historical = pd.read_csv('data/click_stream_historical.csv')
df_click_stream_historical.head()

|   | customer_id | product_id | bought | healthy_activity_last_2m | rating |
|---|---|---|---|---|---|
| 0 | C1 | P10852 | 1 | 1 | 3.048429 |
| 1 | C3806 | P10852 | 1 | 1 | 1.674935 |
| 2 | C5257 | P10852 | 1 | 0 | 2.691236 |
| 3 | C8220 | P10852 | 1 | 1 | 1.773447 |
| 4 | C1 | P10852 | 0 | 9 | 3.048429 |


#### Click stream

In [33]:
# Read a sample in order to have a schema for Feature Group creation
df_click_stream = pd.read_csv('data/click_stream.csv')
df_click_stream.head()

|   | customer_id | sum_activity_weight_last_2m | avg_product_health_index_last_2m |
|---|---|---|---|
| 0 | C09234 | 8 | 0.2 |
| 1 | D19283 | 3 | 0.1 |
| 2 | C1234 | 9 | 0.8 |


### Create feature definitions and feature groups

To create a Feature Group, you first need its Feature Definitions: a schema that describes the name and data type of each column in your data. We'll infer this information from the pandas DataFrames themselves.
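The inference step works roughly like the simplified sketch below: each pandas dtype is mapped to one of Feature Store's feature types (`Integral`, `Fractional`, `String`). This illustrates the idea and is not the SageMaker SDK's actual `load_feature_definitions` code, which handles more cases; `infer_feature_types` is a hypothetical name. It also shows why the notebook stores `event_time` as `float64`: a numeric Unix timestamp maps to `Fractional`, one of the types Feature Store accepts for an event time feature.

```python
import pandas as pd
from pandas.api import types as ptypes


def infer_feature_types(df: pd.DataFrame) -> dict[str, str]:
    """Map each column's pandas dtype to a Feature Store feature type (simplified)."""
    feature_types = {}
    for column, dtype in df.dtypes.items():
        if ptypes.is_integer_dtype(dtype):
            feature_types[column] = "Integral"
        elif ptypes.is_float_dtype(dtype):
            feature_types[column] = "Fractional"
        elif ptypes.is_object_dtype(dtype) or ptypes.is_string_dtype(dtype):
            feature_types[column] = "String"
        else:
            # e.g. datetime or bool columns: convert them before defining features
            raise ValueError(f"Unsupported dtype {dtype} for column {column!r}")
    return feature_types


# A small frame shaped like the click stream sample
sample = pd.DataFrame({
    "customer_id": ["C09234", "D19283"],
    "sum_activity_weight_last_2m": [8, 3],
    "avg_product_health_index_last_2m": [0.2, 0.1],
    "event_time": [1716941000.0, 1716941000.0],
})
print(infer_feature_types(sample))
```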

A Feature Group is the main Feature Store resource: a logical grouping of features that describes records, together with the metadata for the data stored in it. A Feature Group's definition is composed of a list of Feature Definitions, a record identifier name, an event time feature name, and configurations for its online and offline stores. For our purposes, we'll enable both the online and offline stores for every Feature Group.
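For reference, the pieces listed above map onto the low-level `CreateFeatureGroup` request roughly as follows. In this notebook the SageMaker SDK's `FeatureGroup.create` builds this request for us; the sketch only assembles the keyword arguments for boto3's `sagemaker` client `create_feature_group` call, with basic checks that the record identifier and event time are defined features and that the event time is `Fractional` or `String`. The helper name and all placeholder values (bucket, role ARN, group name) are hypothetical.

```python
from typing import Any


def build_create_feature_group_request(
    name: str,
    feature_types: dict[str, str],
    record_identifier_name: str,
    event_time_feature_name: str,
    s3_uri: str,
    role_arn: str,
    enable_online_store: bool = True,
) -> dict[str, Any]:
    """Assemble keyword arguments for boto3 sagemaker.create_feature_group."""
    for required in (record_identifier_name, event_time_feature_name):
        if required not in feature_types:
            raise ValueError(f"{required!r} must be one of the feature definitions")
    if feature_types[event_time_feature_name] not in ("Fractional", "String"):
        raise ValueError("The event time feature must be Fractional or String")
    return {
        "FeatureGroupName": name,
        "RecordIdentifierFeatureName": record_identifier_name,
        "EventTimeFeatureName": event_time_feature_name,
        "FeatureDefinitions": [
            {"FeatureName": n, "FeatureType": t} for n, t in feature_types.items()
        ],
        "OnlineStoreConfig": {"EnableOnlineStore": enable_online_store},
        "OfflineStoreConfig": {"S3StorageConfig": {"S3Uri": s3_uri}},
        "RoleArn": role_arn,
    }


request = build_create_feature_group_request(
    name="recsys-products-fg-example",
    feature_types={"product_id": "String", "product_health_index": "Fractional", "event_time": "Fractional"},
    record_identifier_name="product_id",
    event_time_feature_name="event_time",
    s3_uri="s3://example-bucket/recsys-feature-store",
    role_arn="arn:aws:iam::111122223333:role/ExampleRole",
)
print(request["FeatureDefinitions"][0])  # {'FeatureName': 'product_id', 'FeatureType': 'String'}
# With credentials configured: boto3.client("sagemaker").create_feature_group(**request)
```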

For more information, see [Feature Store Concepts](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-getting-started.html#feature-store-concepts) and [these docs](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-feature-group.html).

In [34]:
customers_feature_group = FeatureGroup(
    name=customers_feature_group_name, sagemaker_session=sagemaker_session
)
orders_feature_group = FeatureGroup(
    name=orders_feature_group_name, sagemaker_session=sagemaker_session
)
products_feature_group = FeatureGroup(
    name=products_feature_group_name, sagemaker_session=sagemaker_session
)
click_stream_historical_feature_group = FeatureGroup(
    name=click_stream_historical_feature_group_name, sagemaker_session=sagemaker_session
)
click_stream_feature_group = FeatureGroup(
    name=click_stream_feature_group_name, sagemaker_session=sagemaker_session
)

In [35]:
# Event Time
event_time_feature_name = "event_time"
current_time_sec = int(round(time.time()))

df_customers[event_time_feature_name] = pd.Series([current_time_sec]*len(df_customers), dtype="float64")
df_orders[event_time_feature_name] = pd.Series([current_time_sec]*len(df_orders), dtype="float64")
df_products[event_time_feature_name] = pd.Series([current_time_sec]*len(df_products), dtype="float64")
df_click_stream_historical[event_time_feature_name] = pd.Series([current_time_sec]*len(df_click_stream_historical), dtype="float64")
df_click_stream[event_time_feature_name] = pd.Series([current_time_sec]*len(df_click_stream), dtype="float64")

df_click_stream.head()

|   | customer_id | sum_activity_weight_last_2m | avg_product_health_index_last_2m | event_time |
|---|---|---|---|---|
| 0 | C09234 | 8 | 0.2 | 1716941000.0 |
| 1 | D19283 | 3 | 0.1 | 1716941000.0 |
| 2 | C1234 | 9 | 0.8 | 1716941000.0 |


In [36]:
# Load Feature Definitions
customers_feature_group.load_feature_definitions(data_frame=df_customers)
orders_feature_group.load_feature_definitions(data_frame=df_orders)
products_feature_group.load_feature_definitions(data_frame=df_products)
click_stream_historical_feature_group.load_feature_definitions(data_frame=df_click_stream_historical)
click_stream_feature_group.load_feature_definitions(data_frame=df_click_stream)

[FeatureDefinition(feature_name='customer_id', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None),
 FeatureDefinition(feature_name='sum_activity_weight_last_2m', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
 FeatureDefinition(feature_name='avg_product_health_index_last_2m', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
 FeatureDefinition(feature_name='event_time', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None)]

In [None]:
# Create Feature Groups
record_identifier_feature_name = "customer_id"
customers_feature_group.create(
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

orders_feature_group.create(
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

click_stream_historical_feature_group.create(
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

click_stream_feature_group.create(
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

products_record_identifier_feature_name = "product_id"

products_feature_group.create(
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=products_record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)


In [38]:
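# Check Feature Groups have finished creating
def check_feature_group_status(feature_group):
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    # Any terminal state other than "Created" (e.g. "CreateFailed") is an error
    if status != "Created":
        raise RuntimeError(
            f"FeatureGroup {feature_group.name} ended in status {status}: "
            f"{description.get('FailureReason')}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")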


check_feature_group_status(customers_feature_group)
check_feature_group_status(orders_feature_group)
check_feature_group_status(products_feature_group)
check_feature_group_status(click_stream_historical_feature_group)
check_feature_group_status(click_stream_feature_group)

Waiting for Feature Group to be Created
Waiting for Feature Group to be Created
FeatureGroup recsys-customers-fg-05-29-00-10 successfully created.
Waiting for Feature Group to be Created
FeatureGroup recsys-orders-fg-05-29-00-10 successfully created.
Waiting for Feature Group to be Created
FeatureGroup recsys-products-fg-05-29-00-10 successfully created.
FeatureGroup recsys-click-stream-historical-fg-05-29-00-10 successfully created.
FeatureGroup recsys-click-stream-fg-05-29-00-10 successfully created.


As feature data is ingested and updated, Feature Store keeps the history of every feature in the offline store, which lives in S3. You can query this offline data with Amazon Athena as long as you know the Feature Group's AWS Glue Data Catalog table name, and that table name is stored in each Feature Group's metadata.

Let's go ahead and store our Feature Group table names for later use.

In [39]:
# Store table names 
customers_query = customers_feature_group.athena_query()
customers_table = customers_query.table_name

products_query = products_feature_group.athena_query()
products_table = products_query.table_name

orders_query = orders_feature_group.athena_query()
orders_table = orders_query.table_name

click_stream_historical_query = click_stream_historical_feature_group.athena_query()
click_stream_historical_table = click_stream_historical_query.table_name

click_stream_query = click_stream_feature_group.athena_query()
click_stream_table = click_stream_query.table_name

# Store table names locally to be used in other notebooks
ps.add({
    'customers_table': customers_table,
    'products_table': products_table,
    'orders_table': orders_table,
    'click_stream_historical_table': click_stream_historical_table,
    'click_stream_table': click_stream_table
})
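Once the offline store has data, these table names can be used in SQL. The sketch below builds an Athena query that keeps only the latest non-deleted row per record identifier, relying on the `api_invocation_time` and `is_deleted` columns that Feature Store adds to offline store tables. `latest_records_query` is a hypothetical helper, and the `query_results/` output location in the commented usage is an assumption; the `run`, `wait`, and `as_dataframe` calls are the SageMaker SDK's `AthenaQuery` methods returned by `athena_query()`.

```python
def latest_records_query(table_name: str, record_id: str, event_time: str = "event_time") -> str:
    """Build Athena SQL returning the latest non-deleted row per record identifier."""
    return f"""
SELECT *
FROM (
    SELECT *,
           row_number() OVER (
               PARTITION BY "{record_id}"
               ORDER BY "{event_time}" DESC, api_invocation_time DESC
           ) AS row_num
    FROM "{table_name}"
)
WHERE row_num = 1
  AND NOT is_deleted
"""


query_string = latest_records_query("example_customers_table", "customer_id")
print(query_string)

# In the notebook, once the offline store is populated, something like:
# customers_query.run(query_string=latest_records_query(customers_table, "customer_id"),
#                     output_location=f"s3://{default_bucket}/{prefix}/query_results/")
# customers_query.wait()
# df_latest = customers_query.as_dataframe()
```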

### Ingest data into feature groups

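Now let's ingest the data from our pandas DataFrames into the Feature Groups we created above. We don't ingest anything into the `click stream` Feature Group here, because it will be populated by streamed data in a later notebook.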

In [40]:
# Ingest data into the feature groups
customers_feature_group.ingest(data_frame=df_customers, max_workers=3, wait=False)
customers_count = df_customers.shape[0]

products_feature_group.ingest(data_frame=df_products, max_workers=3, wait=False)
products_count = df_products.shape[0]

orders_feature_group.ingest(data_frame=df_orders, max_workers=3, wait=False)
orders_count = df_orders.shape[0]

click_stream_historical_feature_group.ingest(data_frame=df_click_stream_historical, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='recsys-click-stream-historical-fg-05-29-00-10', feature_definitions={'customer_id': {'FeatureName': 'customer_id', 'FeatureType': 'String'}, 'product_id': {'FeatureName': 'product_id', 'FeatureType': 'String'}, 'bought': {'FeatureName': 'bought', 'FeatureType': 'Integral'}, 'healthy_activity_last_2m': {'FeatureName': 'healthy_activity_last_2m', 'FeatureType': 'Integral'}, 'rating': {'FeatureName': 'rating', 'FeatureType': 'Fractional'}, 'event_time': {'FeatureName': 'event_time', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7ff0c7e70460>, sagemaker_session=<sagemaker.session.Session object at 0x7ff0c7bd18a0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7ff0c983e950>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

In [41]:
click_stream_historical_count = df_click_stream_historical.shape[0]

# Add Feature Group counts for later use
ps.add({'customers_count': customers_count,
        'products_count': products_count,
        'orders_count': orders_count,
        'click_stream_historical_count': click_stream_historical_count,
        'click_stream_count': 0})

Ingestion runs in parallel (`max_workers=3` per Feature Group), but it will still take a few minutes for the data to be ingested into each Feature Group.

<div class="alert alert-info"> 💡 <strong> Feature Store Ingestion </strong>
In the code above, only the last `ingest` call uses `wait=True`, so the cell blocks until the click stream historical data has been written to the online store, while the other three calls keep ingesting in the background. When you ingest data into a Feature Group that has the online store enabled, it is first written to the online store so it can be used in real time, and then automatically synced to the offline store, a process that takes anywhere from 5 to 10 minutes. Give that sync time to finish before starting the next notebook, which reads from the offline store.
</div>
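One way to check whether the offline sync has started producing data is to look for objects under the Feature Group's resolved offline store S3 URI. The sketch below polls S3 with `list_objects_v2` until at least one object appears or a timeout expires. It reads the URI from `describe()["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"]` and expects a boto3 S3 client; `parse_s3_uri` and `wait_for_offline_data` are hypothetical helpers, and finding some objects does not guarantee that every record has synced.

```python
import time
from urllib.parse import urlparse


def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def wait_for_offline_data(feature_group, s3_client, timeout_s=900, poll_s=30) -> bool:
    """Poll until objects exist under the offline store prefix, or until timeout_s elapses."""
    uri = feature_group.describe()["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"]
    bucket, key_prefix = parse_s3_uri(uri)
    deadline = time.monotonic() + timeout_s
    while True:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key_prefix, MaxKeys=1)
        if response.get("KeyCount", 0) > 0:
            print(f"{feature_group.name}: offline data found under {uri}")
            return True
        if time.monotonic() >= deadline:
            print(f"{feature_group.name}: no offline data after {timeout_s}s")
            return False
        time.sleep(poll_s)


print(parse_s3_uri("s3://example-bucket/recsys-feature-store/offline-store/data"))
# ('example-bucket', 'recsys-feature-store/offline-store/data')

# In the notebook (s3_client comes from the session variables cell):
# for fg in [customers_feature_group, products_feature_group,
#            orders_feature_group, click_stream_historical_feature_group]:
#     wait_for_offline_data(fg, s3_client)
```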

If you click the link below, you can see where the offline data for each Feature Group is stored.

In [43]:
account_id = default_bucket.split('-')[-1]
offline_store_url = f'https://s3.console.aws.amazon.com/s3/buckets/{default_bucket}?region={region}&prefix={prefix}/{account_id}/sagemaker/{region}/offline-store/&showversions=false'
display(HTML(f"<a href='{offline_store_url}'>Offline Feature Store S3 Link</a>"))
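The cell above recovers the account ID by splitting the default bucket name, which relies on SageMaker's `sagemaker-{region}-{account_id}` naming for the default bucket and would break with a custom bucket. A sketch of an alternative that asks AWS STS for the account ID via `get_caller_identity`, while keeping the same console URL format as the cell above, follows; both function names are hypothetical.

```python
def offline_store_console_url(bucket: str, prefix: str, account_id: str, region: str) -> str:
    """Build the S3 console URL for the offline store folder (same format as the cell above)."""
    return (
        f"https://s3.console.aws.amazon.com/s3/buckets/{bucket}"
        f"?region={region}&prefix={prefix}/{account_id}/sagemaker/{region}/offline-store/"
        "&showversions=false"
    )


def current_account_id() -> str:
    """Look up the AWS account ID of the current credentials via STS."""
    import boto3  # imported lazily so the URL builder works without boto3 installed

    return boto3.client("sts").get_caller_identity()["Account"]


print(offline_store_console_url("example-bucket", "recsys-feature-store", "111122223333", "ap-southeast-2"))

# In the notebook:
# url = offline_store_console_url(default_bucket, prefix, current_account_id(), region)
# display(HTML(f"<a href='{url}'>Offline Feature Store S3 Link</a>"))
```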

In [44]:
# Save all our local params
ps.store()

date and time:  29/05/2024 00:34:44


Go back to Workshop Studio and click on "Next".