### Imports

### Setup SageMaker Feature Store

In [1]:
#!pip install s3fs

import boto3
import sagemaker
from sagemaker.session import Session
import pandas as pd


# Region resolved from the default boto3 configuration.
region = boto3.Session().region_name

# One explicit session; every AWS client below is created from it so that they all
# share the same region and credentials.
boto_session = boto3.Session(region_name=region)

s3_client = boto_session.client('s3', region_name=region)

# Account id of the calling identity.
account_id = boto_session.client('sts').get_caller_identity()["Account"]

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# SageMaker SDK session wired to the clients above; passed to FeatureGroup later on.
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

#### Set Up S3 Bucket For The OfflineStore

In [2]:
# Key prefix under which this demo keeps its objects in the bucket.
prefix = 'feature-store-demo'
# Bucket for the OfflineStore: the session's default bucket. Assign a different
# bucket name here if you want to use your own.
default_s3_bucket_name = feature_store_session.default_bucket()

print(default_s3_bucket_name)

sagemaker-eu-west-1-198539629085


#### IAM role setup

In [3]:
from sagemaker import get_execution_role

# The execution role is used by default. Assign another role ARN here if you want
# to use a role of your choosing (see the documentation for how to create one).
role = get_execution_role()
print(role)

arn:aws:iam::198539629085:role/service-role/AmazonSageMaker-ExecutionRole-20220528T185695


#### Dataset read

In [4]:
# Load the full dataset from df.csv.
# To test the notebook on a sample only, pass nrows=1000 to pd.read_csv instead.
df = pd.read_csv("df.csv")
df.head()

Unnamed: 0,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,DEATH_EVENT
0,75.0,0,582,0,20,1,265000.0,1.9,130,1,0,4,1
1,55.0,0,7861,0,38,0,263358.03,1.1,136,1,0,6,1
2,65.0,0,146,0,20,0,162000.0,1.3,129,1,1,7,1
3,50.0,1,111,0,20,0,210000.0,1.9,137,1,0,7,1
4,65.0,1,160,1,20,0,327000.0,2.7,116,0,0,8,1


### Prepare data For Feature Store

#### Create a unique ID for each patient

In [5]:
# Add an id for each patient: the existing row index becomes the `record_id` column.
# Guarded so that re-running this cell does not insert a second `record_id` column.
if 'record_id' not in df.columns:
    df = df.reset_index().rename(columns={'index': 'record_id'})

In [6]:
# Inspect the column dtypes now that record_id has been added.
df.dtypes

record_id                     int64
age                         float64
anaemia                       int64
creatinine_phosphokinase      int64
diabetes                      int64
ejection_fraction             int64
high_blood_pressure           int64
platelets                   float64
serum_creatinine            float64
serum_sodium                  int64
sex                           int64
smoking                       int64
time                          int64
DEATH_EVENT                   int64
dtype: object

In [7]:
# record_id should be treated as a string ID. Cast it to object here so that the
# object --> string conversion applied afterwards picks it up.
df['record_id'] = df['record_id'].astype('object')

def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to the pandas ``string`` dtype.

    The frame is modified in place and nothing is returned. Values go through
    ``str`` first, so non-string objects are stored as their string form.
    """
    for column_name, column_dtype in data_frame.dtypes.items():
        if column_dtype != 'object':
            continue
        as_text = data_frame[column_name].astype("str")
        data_frame[column_name] = as_text.astype("string")

# Cast object-dtype columns of df to string, in place.
# The SageMaker Feature Store Python SDK will then map the string dtype to the String feature type.
cast_object_to_string(df)

#### Create a TimeStamp for each Record

In [8]:
import time

# One timestamp (Unix epoch seconds, rounded) shared by every record.
current_time_sec = int(round(time.time()))
# Append the EventTime feature. The Series is built on df's own index so the values
# line up with the rows whatever the index labels are; a default-indexed Series is
# aligned by label on assignment and leaves NaN where labels do not match.
df['EventTime'] = pd.Series(current_time_sec, index=df.index, dtype="float64")

In [9]:
# Preview the first rows, now including the record_id and EventTime columns.
df.head()

Unnamed: 0,record_id,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,DEATH_EVENT,EventTime
0,0,75.0,0,582,0,20,1,265000.0,1.9,130,1,0,4,1,1653828000.0
1,1,55.0,0,7861,0,38,0,263358.03,1.1,136,1,0,6,1,1653828000.0
2,2,65.0,0,146,0,20,0,162000.0,1.3,129,1,1,7,1,1653828000.0
3,3,50.0,1,111,0,20,0,210000.0,1.9,137,1,0,7,1,1653828000.0
4,4,65.0,1,160,1,20,0,327000.0,2.7,116,0,0,8,1,1653828000.0


#### Check data types for each column

In [10]:
# Confirm the final dtypes: record_id should be string and EventTime float64.
df.dtypes

record_id                    string
age                         float64
anaemia                       int64
creatinine_phosphokinase      int64
diabetes                      int64
ejection_fraction             int64
high_blood_pressure           int64
platelets                   float64
serum_creatinine            float64
serum_sodium                  int64
sex                           int64
smoking                       int64
time                          int64
DEATH_EVENT                   int64
EventTime                   float64
dtype: object

### Create Features

#### Assign a feature group name

In [11]:
from time import gmtime, strftime, sleep

# The suffix is the current UTC time formatted as day-hour-minute-second.
demo_feature_group_name = f"demo-feature-group-{strftime('%d-%H-%M-%S', gmtime())}"
print(demo_feature_group_name)

demo-feature-group-29-12-47-51


#### Instantiate a FeatureGroup object

In [12]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Local FeatureGroup handle bound to the feature store session; the create() call
# on this object happens in a later cell.
demo_feature_group = FeatureGroup(name=demo_feature_group_name, sagemaker_session=feature_store_session)

#### Define Identifier

In [13]:
# Names of the two df columns passed to the feature group on creation:
# the event-time feature and the record identifier.
event_time_feature_name = "EventTime"
record_identifier_feature_name = "record_id"

#### Load feature definitions to the feature group

In [14]:
# Build the feature definitions from df; the returned list of FeatureDefinition
# objects is displayed as this cell's output.
demo_feature_group.load_feature_definitions(data_frame=df)

[FeatureDefinition(feature_name='record_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='age', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='anaemia', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='creatinine_phosphokinase', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='diabetes', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='ejection_fraction', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='high_blood_pressure', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='platelets', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='serum_creatinine', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='serum_sodium', feature_type=<FeatureT

#### Create FeatureGroup

In [15]:
def wait_for_feature_group_creation_complete(feature_group, poll_interval_sec=5, timeout_sec=None):
    """Block until ``feature_group`` leaves the "Creating" state.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict with a
            "FeatureGroupStatus" entry) and a ``name`` attribute.
        poll_interval_sec: Seconds to sleep between status checks.
        timeout_sec: Maximum number of seconds to keep polling; ``None`` (the
            default) polls for as long as the status stays "Creating".

    Raises:
        RuntimeError: If the final status is not "Created", or if ``timeout_sec``
            elapses while the status is still "Creating".
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    print(status)
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError(
                f"Timed out after {timeout_sec}s waiting for feature group {feature_group.name}"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_sec)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        # Include the final status and, when the description carries one, the failure reason.
        reason = description.get("FailureReason")
        detail = f" (status: {status}" + (f", reason: {reason}" if reason else "") + ")"
        raise RuntimeError(f"Failed to create feature group {feature_group.name}{detail}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

# Create the feature group with the online store enabled; the offline store
# is pointed at the demo prefix of the bucket. Then wait for creation to finish.
demo_feature_group.create(
    s3_uri="s3://" + default_s3_bucket_name + "/" + prefix,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)
wait_for_feature_group_creation_complete(feature_group=demo_feature_group)


Creating
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup demo-feature-group-29-12-47-51 successfully created.


### Work with your FeatureGroup

#### Check FeatureGroup Info

In [16]:
# Show the feature group's description (ARN, name, identifier/event-time feature names, feature definitions, ...).
demo_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:198539629085:feature-group/demo-feature-group-29-12-47-51',
 'FeatureGroupName': 'demo-feature-group-29-12-47-51',
 'RecordIdentifierFeatureName': 'record_id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'record_id', 'FeatureType': 'String'},
  {'FeatureName': 'age', 'FeatureType': 'Fractional'},
  {'FeatureName': 'anaemia', 'FeatureType': 'Integral'},
  {'FeatureName': 'creatinine_phosphokinase', 'FeatureType': 'Integral'},
  {'FeatureName': 'diabetes', 'FeatureType': 'Integral'},
  {'FeatureName': 'ejection_fraction', 'FeatureType': 'Integral'},
  {'FeatureName': 'high_blood_pressure', 'FeatureType': 'Integral'},
  {'FeatureName': 'platelets', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_creatinine', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_sodium', 'FeatureType': 'Integral'},
  {'FeatureName': 'sex', 'FeatureType': 'Integral'},
  {'FeatureName': 'smoking', 'FeatureType': 'Integ

In [17]:
# List feature groups through the boto3 SageMaker client; the new group should appear with status 'Created'.
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [{'FeatureGroupName': 'demo-feature-group-29-12-47-51',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:198539629085:feature-group/demo-feature-group-29-12-47-51',
   'CreationTime': datetime.datetime(2022, 5, 29, 12, 49, 24, 897000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'}],
 'ResponseMetadata': {'RequestId': '41c9b8be-7115-4d63-8842-32cd4083df94',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '41c9b8be-7115-4d63-8842-32cd4083df94',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '250',
   'date': 'Sun, 29 May 2022 12:53:08 GMT'},
  'RetryAttempts': 0}}

#### Put Records into the Feature Store

In [18]:
# Show the frame's shape, then ingest it into the feature group
# (max_workers=3, wait=True).
print(df.shape)
demo_feature_group.ingest(data_frame=df, max_workers=3, wait=True)

(299, 15)


IngestionManagerPandas(feature_group_name='demo-feature-group-29-12-47-51', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7fad35b32cd0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7fad35133f50>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

#### Get Records from a Feature Group

##### Note: `featurestore_runtime.get_record` reads from the online store, so it only returns results if the online store was enabled when the feature group was created

In [22]:
# Look up a single record by its record identifier, passed as a string.
record_identifier_value = "201"

featurestore_runtime.get_record(
    FeatureGroupName=demo_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

{'ResponseMetadata': {'RequestId': 'bf19f0db-2f17-47f9-b944-f808cc400015',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bf19f0db-2f17-47f9-b944-f808cc400015',
   'content-type': 'application/json',
   'content-length': '787',
   'date': 'Sun, 29 May 2022 13:01:28 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'record_id', 'ValueAsString': '201'},
  {'FeatureName': 'age', 'ValueAsString': '45.0'},
  {'FeatureName': 'anaemia', 'ValueAsString': '0'},
  {'FeatureName': 'creatinine_phosphokinase', 'ValueAsString': '308'},
  {'FeatureName': 'diabetes', 'ValueAsString': '1'},
  {'FeatureName': 'ejection_fraction', 'ValueAsString': '60'},
  {'FeatureName': 'high_blood_pressure', 'ValueAsString': '1'},
  {'FeatureName': 'platelets', 'ValueAsString': '377000.0'},
  {'FeatureName': 'serum_creatinine', 'ValueAsString': '1.0'},
  {'FeatureName': 'serum_sodium', 'ValueAsString': '136'},
  {'FeatureName': 'sex', 'ValueAsString': '1'},
  {'FeatureName': 'smoking', 'ValueA

### Build a Training Dataset

In [23]:
# Athena query helper for the feature group, and the name of the table it targets.
demo_query = demo_feature_group.athena_query()
demo_table = demo_query.table_name
print(demo_query)
print(demo_table)

AthenaQuery(catalog='AwsDataCatalog', database='sagemaker_featurestore', table_name='demo-feature-group-29-12-47-51-1653828564', sagemaker_session=<sagemaker.session.Session object at 0x7fad36125e50>, _current_query_execution_id=None, _result_bucket=None, _result_file_prefix=None)
demo-feature-group-29-12-47-51-1653828564


In [None]:
# Shape of the ingested frame, for comparison with the shape of the Athena query result.
df.shape

In [24]:
# Athena query over the feature group's table.
query_string = f'SELECT * FROM "{demo_table}" LIMIT 100000'
print(query_string)
# NOTE(review): in the recorded output, write_time trails api_invocation_time by about
# five minutes, so a query run right after ingestion may presumably return fewer rows
# than df has — confirm before relying on the result.
# Run the Athena query; its output is written under the query_results/ prefix.
demo_query.run(
    query_string=query_string,
    output_location=f's3://{default_s3_bucket_name}/{prefix}/query_results/',
)
demo_query.wait()
# Load the query output into a pandas DataFrame.
dataset = demo_query.as_dataframe()
print(dataset.shape)
dataset.head()

SELECT * FROM "demo-feature-group-29-12-47-51-1653828564" LIMIT 100000
(299, 18)


Unnamed: 0,record_id,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,death_event,eventtime,write_time,api_invocation_time,is_deleted
0,100,65.0,1,305,0,25,0,298000.0,1.1,141,1,0,87,0,1653828000.0,2022-05-29 12:58:49.519,2022-05-29 12:53:42.000,False
1,214,65.0,1,135,0,35,1,290000.0,0.8,134,1,0,194,0,1653828000.0,2022-05-29 12:58:49.519,2022-05-29 12:53:42.000,False
2,219,55.0,0,582,1,35,1,371000.0,0.7,140,0,0,197,0,1653828000.0,2022-05-29 12:58:49.519,2022-05-29 12:53:42.000,False
3,238,65.0,1,720,1,40,0,257000.0,1.0,136,0,0,210,0,1653828000.0,2022-05-29 12:58:49.519,2022-05-29 12:53:42.000,False
4,154,65.0,1,335,0,35,1,235000.0,0.8,136,0,0,120,0,1653828000.0,2022-05-29 12:58:49.519,2022-05-29 12:53:43.000,False


#### Prepare dataset for training

In [25]:
# Prepare query results for training: build the S3 URI of the CSV named after
# the query execution id, under the query_results/ prefix.
query_execution = demo_query.get_query_execution()
query_result = f's3://{default_s3_bucket_name}/{prefix}/query_results/{query_execution["QueryExecution"]["QueryExecutionId"]}.csv'
print(query_result)

s3://sagemaker-eu-west-1-198539629085/feature-store-demo/query_results/0402deb9-27d8-4dfd-817c-271cffef09a9.csv


In [26]:
# Keep only the columns used for training, with the target column first.
dataset = dataset[[
    'death_event',
    'age', 'anaemia', 'creatinine_phosphokinase', 'diabetes',
    'ejection_fraction', 'high_blood_pressure', 'platelets',
    'serum_creatinine', 'serum_sodium', 'sex', 'smoking', 'time',
]]
# Write a local CSV without header or index column, then upload it to S3.
dataset.to_csv('dataset.csv', header=False, index=False)
s3_client.upload_file('dataset.csv', default_s3_bucket_name, f'{prefix}/training_input/dataset.csv')
dataset_uri_prefix = f's3://{default_s3_bucket_name}/{prefix}/training_input/'
print(dataset_uri_prefix)

s3://sagemaker-eu-west-1-198539629085/feature-store-demo/training_input/


### Delete feature group

In [27]:
# Clean up: delete the demo feature group through the boto3 SageMaker client.
# NOTE(review): presumably objects already written to S3 are left in place — confirm against the API docs.
sagemaker_client.delete_feature_group(FeatureGroupName = demo_feature_group_name)

{'ResponseMetadata': {'RequestId': '1b68c457-48f6-4199-919d-26d50d747990',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1b68c457-48f6-4199-919d-26d50d747990',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Sun, 29 May 2022 13:06:32 GMT'},
  'RetryAttempts': 0}}

In [28]:
# List feature groups again to confirm that the demo group is gone.
sagemaker_client.list_feature_groups()

{'FeatureGroupSummaries': [],
 'ResponseMetadata': {'RequestId': 'a38c194d-6b1c-41aa-b611-f73c49b9357e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a38c194d-6b1c-41aa-b611-f73c49b9357e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '28',
   'date': 'Sun, 29 May 2022 13:06:37 GMT'},
  'RetryAttempts': 0}}

# Further Reading
* [SageMaker Feature Store Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html)
* [Store, Discover, and Share Machine Learning Features with Amazon SageMaker Feature Store](https://aws.amazon.com/blogs/aws/new-store-discover-and-share-machine-learning-features-with-amazon-sagemaker-feature-store/?sc_icampaign=launch_sagemaker-feature-store_reinvent20&sc_ichannel=ha&sc_icontent=awssm-6216&sc_iplace=ribbon&trk=ha_awssm-6216)  
* [Using streaming ingestion with Amazon SageMaker Feature Store to make ML-backed decisions in near-real time](https://aws.amazon.com/blogs/machine-learning/using-streaming-ingestion-with-amazon-sagemaker-feature-store-to-make-ml-backed-decisions-in-near-real-time/)
* [Fraud Detection using SageMaker Feature Store](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.ipynb)
