# Ingesting data into Amazon SageMaker FeatureStore

This notebook works well with the `Python 3 (Data Science)` kernel.

The following policies need to be attached to the execution role:
- AmazonSageMakerFullAccess
- AmazonS3FullAccess

## Contents
1. [Background](#Background)
1. [Setup SageMaker FeatureStore](#Setup-SageMaker-FeatureStore)
1. [Load data](#Load-data)
1. [Ingest Data into FeatureStore](#Ingest-Data-into-FeatureStore)

## Background

Amazon SageMaker FeatureStore is a SageMaker capability that makes it easy for customers to create and manage curated data for machine learning (ML) development. SageMaker FeatureStore supports data ingestion through a high-throughput (high TPS) API and data consumption through the online and offline stores.

This notebook provides an example of the APIs offered by SageMaker FeatureStore, using a fraud detection use case. It demonstrates how the dataset's tables can be ingested into FeatureStore, queried to create a training dataset, and quickly accessed during inference.


### Terminology

A **FeatureGroup** is the main resource that contains the metadata for all the data stored in SageMaker FeatureStore. A FeatureGroup contains a list of FeatureDefinitions. A **FeatureDefinition** consists of a name and one of the following data types: Integral, String, or Fractional. The FeatureGroup also contains an **OnlineStoreConfig** and an **OfflineStoreConfig** that control where the data is stored. Enabling the online store allows quick access to the latest value for a Record via the GetRecord API. The offline store keeps historical data in your S3 bucket.
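To make the terminology concrete, here is a minimal plain-Python sketch of a FeatureGroup's schema. `FeatureType`, `FeatureDefinitionSketch`, and `validate_feature_group` are hypothetical names, not part of the SageMaker SDK. The checks reflect our understanding that the record identifier and the event time must both be defined features and that the event time feature must be Fractional or String.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class FeatureType(Enum):
    # The three feature types a FeatureDefinition can use
    INTEGRAL = "Integral"
    FRACTIONAL = "Fractional"
    STRING = "String"


@dataclass(frozen=True)
class FeatureDefinitionSketch:
    # Hypothetical stand-in for a FeatureDefinition: a name plus a type
    feature_name: str
    feature_type: FeatureType


def validate_feature_group(
    definitions: List[FeatureDefinitionSketch],
    record_identifier_name: str,
    event_time_feature_name: str,
) -> None:
    """Raise ValueError if the identifier or event time feature is not well defined."""
    by_name = {d.feature_name: d for d in definitions}
    if len(by_name) != len(definitions):
        raise ValueError("feature names must be unique")
    if record_identifier_name not in by_name:
        raise ValueError(f"record identifier {record_identifier_name!r} is not a defined feature")
    if event_time_feature_name not in by_name:
        raise ValueError(f"event time feature {event_time_feature_name!r} is not a defined feature")
    # Assumption: event time must be Fractional (Unix seconds) or String (ISO-8601)
    if by_name[event_time_feature_name].feature_type is FeatureType.INTEGRAL:
        raise ValueError("event time feature must be Fractional or String")


# Example: a tiny schema that passes validation silently
definitions = [
    FeatureDefinitionSketch("record_id", FeatureType.INTEGRAL),
    FeatureDefinitionSketch("amount", FeatureType.FRACTIONAL),
    FeatureDefinitionSketch("event_time", FeatureType.FRACTIONAL),
]
validate_feature_group(definitions, "record_id", "event_time")
```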

Once a FeatureGroup is created, data can be added as Records. A **Record** can be thought of as a row in a table. Each Record has a unique **RecordIdentifier** and an event time, along with values for the other FeatureDefinitions in the FeatureGroup.
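A Record is sent to the PutRecord API as a list of feature name/value pairs, with every value serialized as a string. The sketch below shows that shape for a single row using a hypothetical `row_to_record` helper; skipping missing values is a choice made in this sketch, not documented SDK behavior.

```python
from typing import Any, Dict, List


def row_to_record(row: Dict[str, Any]) -> List[Dict[str, str]]:
    """Convert one table row into the list-of-FeatureValue shape used by PutRecord."""
    record = []
    for name, value in row.items():
        if value is None:
            # Skip missing values instead of sending the string "None"
            continue
        record.append({"FeatureName": name, "ValueAsString": str(value)})
    return record


# A few columns from the first row of the dataset preview
example_row = {"record_id": 1, "amount": 149.62, "class": 0}
print(row_to_record(example_row))
```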

## Setup SageMaker FeatureStore

Let's start by setting up the SageMaker Python SDK and the boto3 clients.

In [13]:
import boto3
import sagemaker
from sagemaker.session import Session


region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

#### S3 Bucket Setup For The OfflineStore

SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an S3 bucket owned by you. To write to your S3 bucket, SageMaker FeatureStore assumes an IAM role that has access to it. The role is also owned by you.
Note that the same bucket can be reused across FeatureGroups. Data in the bucket is partitioned by FeatureGroup.

Set the default S3 bucket name; it is referenced throughout the notebook.

In [14]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-demo'

print(default_s3_bucket_name)

sagemaker-ap-southeast-1-365792799466


In [15]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

arn:aws:iam::365792799466:role/service-role/AmazonSageMaker-ExecutionRole-20191216T120729


## Load data

In [16]:
import io

import pandas as pd

s3_client = boto3.client('s3', region_name=region)

# The CSV is expected to have been uploaded to this key in the default bucket beforehand
fraud_detection_bucket_name = default_s3_bucket_name
data_file_key = 'data/fraud-detection/credit-dataset.csv'

# Download the CSV from S3 and load it into a DataFrame
data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=data_file_key)

transaction_data = pd.read_csv(io.BytesIO(data_object['Body'].read()))

transaction_data.head()


| Unnamed: 0 | time | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 | ... | v23 | v24 | v25 | v26 | v27 | v28 | amount | class | event_time | record_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | 0.363787 | ... | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 | 2021-08-24 03:42:37.420039 | 1 |
| 1 | 0.0 | 1.191857 | 0.266151 | 0.16648 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | -0.255425 | ... | 0.101288 | -0.339846 | 0.16717 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 | 2021-08-24 03:42:37.420039 | 2 |
| 2 | 1.0 | -1.358354 | -1.340163 | 1.773209 | 0.37978 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | -1.514654 | ... | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 | 2021-08-24 03:42:37.420039 | 3 |
| 3 | 1.0 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | -1.387024 | ... | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.5 | 0 | 2021-08-24 03:42:37.420039 | 4 |
| 4 | 2.0 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | 0.817739 | ... | -0.137458 | 0.141267 | -0.20601 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 | 2021-08-24 03:42:37.420039 | 5 |
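The preview shows that the CSV already carries an `event_time` column, stored as a timestamp string with no time zone (for example `2021-08-24 03:42:37.420039`). To our understanding, FeatureStore accepts event times either as Fractional Unix seconds or as ISO-8601 strings ending in `Z`, so this raw format would need converting before it could be used directly; the notebook instead overwrites the column with the current time when preparing the feature definitions. If you would rather keep the original timestamps, here is a minimal sketch assuming they are in UTC. `parse_utc`, `to_unix_seconds`, and `to_iso8601_utc` are hypothetical helpers, not SDK functions.

```python
from datetime import datetime, timezone


def parse_utc(timestamp: str) -> datetime:
    """Parse 'YYYY-MM-DD HH:MM:SS[.ffffff]' and attach UTC (an assumption about the data)."""
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in timestamp else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(timestamp, fmt).replace(tzinfo=timezone.utc)


def to_unix_seconds(timestamp: str) -> float:
    """Fractional event time: seconds since the Unix epoch."""
    return parse_utc(timestamp).timestamp()


def to_iso8601_utc(timestamp: str) -> str:
    """String event time: ISO-8601 with millisecond precision and a 'Z' suffix."""
    parsed = parse_utc(timestamp)
    return parsed.strftime("%Y-%m-%dT%H:%M:%S.") + f"{parsed.microsecond // 1000:03d}Z"


print(to_unix_seconds("2021-08-24 03:42:37.420039"))
print(to_iso8601_utc("2021-08-24 03:42:37.420039"))
```

Applied to the whole column, this could look like `transaction_data["event_time"].map(to_unix_seconds)`, run before the column is overwritten.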


## Ingest Data into FeatureStore

In this step, we create the FeatureGroup representing the transaction table.

In [17]:
from time import gmtime, strftime, sleep

fd_feature_group_name = 'transactionfeaturegroup001'


In [18]:
from sagemaker.feature_store.feature_group import FeatureGroup

fd_feature_group = FeatureGroup(name=fd_feature_group_name, sagemaker_session=feature_store_session)

In [19]:
import time

current_time_sec = int(round(time.time()))

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# drop the unnamed index column left over from the CSV; 'Unnamed: 0' is not a valid feature name
transaction_data = transaction_data.drop(columns=['Unnamed: 0'], errors='ignore')

# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(transaction_data)

# record identifier and event time feature names
record_identifier_feature_name = "record_id"
event_time_feature_name = "event_time"

# set the EventTime feature to the current time in Unix seconds; this overwrites the event_time column read from the CSV
transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype="float64")

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
fd_feature_group.load_feature_definitions(data_frame=transaction_data); # output is suppressed
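`load_feature_definitions` infers each FeatureDefinition from the DataFrame's dtypes. The sketch below approximates that inference in plain Python so you can check a schema before calling the SDK. `infer_feature_definitions` is a hypothetical helper, and its dtype table is a simplification, not the SDK's exact mapping. It takes dtype names as strings, so you could call it with `{name: str(dtype) for name, dtype in transaction_data.dtypes.items()}`.

```python
from typing import Dict, List, Tuple

# Simplified mapping from pandas dtype names to feature types
# (an approximation of the SDK's auto-detection, not its actual table)
_DTYPE_TO_FEATURE_TYPE: Dict[str, str] = {
    **{t: "Integral" for t in ("int8", "int16", "int32", "int64",
                               "uint8", "uint16", "uint32", "uint64")},
    **{t: "Fractional" for t in ("float16", "float32", "float64")},
    "string": "String",
}


def infer_feature_definitions(dtypes: Dict[str, str]) -> List[Tuple[str, str]]:
    """Return (feature_name, feature_type) pairs, failing on unsupported dtypes."""
    definitions = []
    for column, dtype in dtypes.items():
        feature_type = _DTYPE_TO_FEATURE_TYPE.get(dtype)
        if feature_type is None:
            # e.g. 'object' columns need casting first, which is what cast_object_to_string does
            raise ValueError(f"column {column!r} has unsupported dtype {dtype!r}")
        definitions.append((column, feature_type))
    return definitions


print(infer_feature_definitions({"record_id": "int64", "v1": "float64", "event_time": "float64"}))
```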


#### Create the FeatureGroup in SageMaker FeatureStore

In [20]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

# create the FeatureGroup with the online store enabled and the offline store in S3
fd_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)

wait_for_feature_group_creation_complete(feature_group=fd_feature_group)


Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup transactionfeaturegroup001 successfully created.
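`wait_for_feature_group_creation_complete` polls until the status leaves `Creating`, with no upper bound on how long it waits. A minimal sketch of the same loop with a timeout is below. `wait_for_status` is a hypothetical helper that accepts any zero-argument status callable, which keeps it testable without AWS access.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    in_progress: str = "Creating",
    success: str = "Created",
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
) -> str:
    """Poll get_status() until it leaves the in-progress state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status == in_progress:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    # Any final status other than the expected one is treated as a failure
    if status != success:
        raise RuntimeError(f"unexpected final status {status!r}")
    return status
```

With the FeatureGroup from this notebook, a call could look like `wait_for_status(lambda: fd_feature_group.describe().get("FeatureGroupStatus"))`.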


In [21]:
fd_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:ap-southeast-1:365792799466:feature-group/transactionfeaturegroup001',
 'FeatureGroupName': 'transactionfeaturegroup001',
 'RecordIdentifierFeatureName': 'record_id',
 'EventTimeFeatureName': 'event_time',
 'FeatureDefinitions': [{'FeatureName': 'time', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v1', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v2', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v3', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v4', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v5', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v6', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v7', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v8', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v9', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v10', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v11', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v12', 'FeatureType': 'Fractional'},
  {'FeatureName': 'v13', '

#### Put Records into the FeatureGroup

After the FeatureGroup has been created, we can put data into it by using the PutRecord API. This API is built for high TPS and is designed to be called concurrently by different streams. The data from all of these Put requests is buffered and written to S3 in chunks, so files appear in the offline store within a few minutes of ingestion. To speed up ingestion in this example, we call `ingest` with multiple workers (`max_workers=3`) that send records simultaneously. Ingesting the data into the FeatureGroup takes about 5 minutes.

In [22]:
fd_feature_group.ingest(
    data_frame=transaction_data, max_workers=3, wait=True
)

IngestionManagerPandas(feature_group_name='transactionfeaturegroup001', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7f8a2b25bd50>, max_workers=3, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7f8a2aab4750>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])
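`ingest` spreads the DataFrame's rows across `max_workers` workers that call PutRecord. The sketch below illustrates the same idea with a standard-library thread pool. `put_rows_concurrently` is hypothetical, and the `put_record` callable is injected so the logic can be exercised with a stub. The SDK's ingestion manager handles its workers and failures in its own way (note the `_failed_indices` field in the output above), so treat this only as an illustration of the pattern.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Tuple


def put_rows_concurrently(
    rows: List[Dict[str, Any]],
    put_record: Callable[..., Any],
    max_workers: int = 3,
) -> List[int]:
    """Send each row via put_record(Record=...) on a thread pool; return indices of failed rows."""

    def send(indexed_row: Tuple[int, Dict[str, Any]]) -> Optional[int]:
        index, row = indexed_row
        # Same shape as a PutRecord Record: name/value pairs with values as strings
        record = [
            {"FeatureName": name, "ValueAsString": str(value)}
            for name, value in row.items()
            if value is not None
        ]
        try:
            put_record(Record=record)
        except Exception:
            # Remember the failure and keep going instead of aborting the whole batch
            return index
        return None

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outcomes = list(pool.map(send, enumerate(rows)))
    return [index for index in outcomes if index is not None]
```

Against the real service, you might pass `functools.partial(featurestore_runtime.put_record, FeatureGroupName=fd_feature_group_name)` as `put_record` and `transaction_data.to_dict("records")` as `rows`. NaN values would be sent as the string `nan`, so drop or fill them first.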

To confirm that data has been ingested, we can quickly retrieve a record from the online store:

In [23]:
record_identifier_value = str(100)

featurestore_runtime.get_record(FeatureGroupName=fd_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)
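GetRecord returns the Record as a list of `FeatureName`/`ValueAsString` pairs, with every value as a string. A small sketch that flattens the response into a dictionary is below. `record_to_dict` is a hypothetical helper, and treating a missing `Record` key as "no such record" reflects our understanding of the API's behavior rather than a documented guarantee.

```python
from typing import Any, Dict, Optional


def record_to_dict(response: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Flatten a GetRecord response into {feature_name: value_as_string}.

    Returns None when the response has no 'Record' key (assumed to mean no record was found).
    """
    record = response.get("Record")
    if record is None:
        return None
    return {feature["FeatureName"]: feature["ValueAsString"] for feature in record}


# Hand-built response in the GetRecord shape, for illustration only
example_response = {"Record": [{"FeatureName": "record_id", "ValueAsString": "100"}]}
print(record_to_dict(example_response))
```

In the notebook, this could wrap the call above, for example `record_to_dict(featurestore_runtime.get_record(FeatureGroupName=fd_feature_group_name, RecordIdentifierValueAsString="100"))`; values stay strings, so cast them (for example with `float(...)`) as needed.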

In [None]:
# persist the FeatureGroup name so other notebooks can load it with %store -r
%store fd_feature_group_name