### Example: an inference pipeline that reads features from SageMaker Feature Store (online `GetRecord`) and scores them with an XGBoost model

The XGBoost model is based on the SageMaker example at https://github.com/aws/amazon-sagemaker-examples/tree/master/advanced_functionality/inference_pipeline_sparkml_xgboost_abalone

In [None]:
import sagemaker
from sagemaker import get_execution_role
# Create one SageMaker session and reuse it for every SDK call in this notebook,
# including resolving the IAM execution role (otherwise a second session is built implicitly).
sm_session = sagemaker.Session()
role = get_execution_role(sagemaker_session=sm_session)

Build a one-row sample DataFrame and derive the feature group's feature definitions from it

In [None]:
import pandas as pd
# Nine numeric feature columns, in the order the downstream model consumes them.
feat_cols = [f'f_{i}' for i in range(1, 10)]
# One sample record: record id, ISO-8601 event time, then one value per feature column.
sample_row = [1, '2020-12-21T01:00:00Z', 0.0, 0.0, 0.335, 0.22, 0.07, 0.17, 0.076, 0.0365, 0.05]
df = pd.DataFrame([sample_row], columns=['f_id', 'f_time'] + feat_cols)

def cast_object_to_string(df):
    """Convert every ``object``-dtype column of ``df`` to pandas' ``string`` dtype, in place.

    Each value is first stringified (``astype('str')``) and the column is then
    converted to the ``string`` extension dtype. Columns of any other dtype are
    left untouched. Returns None; the caller's DataFrame is modified.
    """
    current_dtypes = df.dtypes
    object_columns = [name for name in df.columns if current_dtypes[name] == 'object']
    for name in object_columns:
        as_text = df[name].astype('str')
        df[name] = as_text.astype('string')
            
# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(df)

record_identifier_feature_name = "f_id"
event_time_feature_name = "f_time"


from time import gmtime, strftime
import time
# UTC timestamp suffix keeps the feature group (and the later model/endpoint) names unique per run.
timestamp_suffix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

fg_name = 'fg-' + timestamp_suffix
from sagemaker.feature_store.feature_group import FeatureGroup
fg = FeatureGroup(name=fg_name, sagemaker_session=sm_session)
# Derive the feature definitions from the DataFrame's columns and dtypes.
# The trailing ';' suppresses display of the returned definitions list in the cell output.
fg.load_feature_definitions(data_frame=df);

Create the feature group, wait until it is available, ingest the sample record, and read it back from the online store

In [None]:
role = sagemaker.get_execution_role()
# Arguments for FeatureGroup.create:
# - s3_uri=False: no offline store; this example only uses the online store.
# - enable_online_store=True: required for the low-latency get_record lookups below
#   and in the inference script.
kwargs = {
    "s3_uri": False,
    "record_identifier_name": record_identifier_feature_name,
    "event_time_feature_name": event_time_feature_name,
    "role_arn": role,
    "enable_online_store": True,
}
fg.create(**kwargs)

def wait_for_feature_group_creation_complete(feature_group, poll_seconds=5, timeout_seconds=None):
    """Block until ``feature_group`` has finished creating.

    Polls ``feature_group.describe()`` every ``poll_seconds`` while its
    ``FeatureGroupStatus`` is ``"Creating"``.

    Args:
        feature_group: a FeatureGroup (anything exposing ``describe()`` and ``name``).
        poll_seconds: delay between status checks, in seconds (default 5).
        timeout_seconds: optional upper bound on the total wait, in seconds;
            ``None`` (the default) waits indefinitely.

    Raises:
        RuntimeError: if the final status is anything other than ``"Created"``. The
            message includes the status and the ``FailureReason`` from describe().
        TimeoutError: if ``timeout_seconds`` is set and elapses while still creating.
    """
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature group {feature_group.name} still creating after {timeout_seconds}s"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_seconds)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        # Report why creation failed, not just that it failed.
        reason = description.get("FailureReason", "unknown")
        raise RuntimeError(
            f"Failed to create feature group {feature_group.name}: "
            f"status={status}, reason={reason}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(fg)

fg.ingest(data_frame=df, wait=True)

# Verify ingestion by reading the record back from the online store.
# The id is taken from the ingested row rather than hard-coded, so this check
# (and the endpoint request below) stays in sync with the sample data.
feature_record_id = str(df[record_identifier_feature_name].iloc[0])
featurestore_runtime = sm_session.boto_session.client(
    'sagemaker-featurestore-runtime', region_name=sm_session.boto_region_name
)
record = featurestore_runtime.get_record(
    FeatureGroupName=fg_name,
    RecordIdentifierValueAsString=feature_record_id,
    FeatureNames=feat_cols,
)
assert record.get('Record'), f"record {feature_record_id} not found in feature group {fg_name}"
record

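Write the entry-point script for the first container of the inference pipeline. Given a request body of the form `<feature_group_name>,<record_id>`, it fetches that record from the Feature Store online store and returns the feature values as a CSV row for the XGBoost model that follows.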

In [None]:
%%writefile inference_fs.py

import json
import os
import pickle as pkl
import time
import sys
import subprocess
import numpy as np

#from sagemaker_inference import content_types
#from sagemaker_containers.beta.framework import encoders

# NOTE(review): this runs at import time, so the serving process installs the latest
# (unpinned) `sagemaker` package from PyPI every time it loads this script. That needs
# network access from the endpoint and slows start-up. Nothing in this script uses the
# `sagemaker` module imported below. Consider removing both lines, or pinning a version.
# TODO confirm.
subprocess.check_call([sys.executable, "-m", "pip", "install", "sagemaker"])

import boto3
import sagemaker

# Feature Store runtime client, created once at import time and reused by predict_fn
# for every request.
boto_session = boto3.Session()
boto_fs_client = boto_session.client(service_name='sagemaker-featurestore-runtime')
# Features to fetch per request; duplicated from the notebook. Presumably this must match
# the column order the downstream XGBoost model expects. Confirm.
feat_cols = ['f_1','f_2','f_3','f_4','f_5','f_6','f_7','f_8','f_9']

def model_fn(model_dir):
    """Model-loading hook for this pipeline step.

    This step only looks up features, so there is nothing to load: ``model_dir``
    is ignored and ``None`` is returned (predict_fn does not use its ``model``
    argument).
    """
    print('processing - in model_fn')
    loaded_model = None
    return loaded_model


def input_fn(request_body, request_content_type):
    """Deserialize the request body for predict_fn.

    The body is expected to be the text ``"<feature_group_name>,<record_id>"``.
    It is passed through unchanged, except that ``bytes``/``bytearray`` bodies are
    decoded as UTF-8 so predict_fn can always call ``str.split`` on it.
    ``request_content_type`` is only logged.
    """
    print (f'processing - in input_fn with content_type = {request_content_type}')
    if isinstance(request_body, (bytes, bytearray)):
        # Depending on the serving stack, the raw body may arrive undecoded.
        return request_body.decode('utf-8')
    return request_body


def predict_fn(input_data, model):
    """Fetch one record's features from the online store and return them as a CSV row.

    ``input_data`` is the string ``"<feature_group_name>,<record_id>"``; whitespace
    around either field is ignored. ``model`` is unused, because this pipeline step
    only looks up features.

    Returns:
        A comma-separated string with one value per name in ``feat_cols``, in
        ``feat_cols`` order, or ``''`` if the record was not found. A feature that is
        absent from the record becomes an empty field, so the downstream model's
        column positions never shift.

    Raises:
        ValueError: if ``input_data`` lacks either field, or the record id is not an
            integer.
    """
    print ('processing - in predict_fn')

    params = [field.strip() for field in input_data.split(',')]
    if len(params) < 2:
        raise ValueError(
            f"expected '<feature_group_name>,<record_id>', got {input_data!r}"
        )
    fg_name = params[0]
    input_feat_id = int(params[1])

    start = time.perf_counter()  # monotonic, high-resolution timer for the latency log
    rec = boto_fs_client.get_record(
        FeatureGroupName=fg_name,
        RecordIdentifierValueAsString=str(input_feat_id),
        FeatureNames=feat_cols,
    )
    duration = time.perf_counter() - start

    print (f'processing - duration = {duration}')

    feats = rec.get('Record')
    if not feats:
        return ''

    # get_record returns a list of {'FeatureName', 'ValueAsString'} entries. Index by
    # name instead of trusting list order, so the CSV columns always follow feat_cols.
    values_by_name = {feat['FeatureName']: feat['ValueAsString'] for feat in feats}
    return ','.join(values_by_name.get(name, '') for name in feat_cols)

#ref - https://github.com/aws/sagemaker-xgboost-container/blob/master/src/sagemaker_xgboost_container/handler_service.py
def output_fn(prediction, content_type):
    """Serialize predict_fn's result for the response.

    predict_fn already produces the CSV text, so it is returned unchanged whatever
    ``content_type`` is requested; ``content_type`` is only logged.
    """
    log_line = f'processing - output_fn with values = {prediction}, for output content_type = {content_type}'
    print(log_line)
    return prediction

Create the models and deploy them as an inference pipeline endpoint

In [None]:
# Create Inference Model
# First pipeline container: runs inference_fs.py (written above) in the XGBoost framework
# container. There is no model artifact (model_data=None), because the script only
# looks up features.
from sagemaker.xgboost.model import XGBoostModel

fs_inference_model = XGBoostModel(
    entry_point="inference_fs.py",
    framework_version="1.2-2",
    model_data=None,
    role=role,
    sagemaker_session=sm_session,
)


In [None]:
# Create xgboost model
# Second pipeline container: the XGBoost image scores the CSV row produced by inference_fs.py.
from sagemaker.model import Model
# NOTE(review): sample artifact in a specific bucket. Replace it with your own model.tar.gz,
# stored in the same region as the endpoint.
model_tar = 's3://scratch-fs/xgboost_model_sample/model.tar.gz'
# Resolve the image for the session's region instead of hard-coding us-west-2, so the
# container image matches the region the endpoint is deployed to.
image_uri = sagemaker.image_uris.retrieve("xgboost", sm_session.boto_region_name, "latest")
xgb_model = Model(
    model_data=model_tar,
    image_uri=image_uri,
    sagemaker_session=sm_session,
)

In [None]:
from sagemaker.pipeline import PipelineModel
# Chain the containers: the feature-lookup step's output is the XGBoost model's input.
# To debug the lookup step alone, pass models=[fs_inference_model] instead.
model_name = f"inference-pipeline-{timestamp_suffix}"
sm_model = PipelineModel(
    models=[fs_inference_model, xgb_model],
    name=model_name,
    role=role,
)

In [None]:
# Host both pipeline containers behind one real-time endpoint.
endpoint_name = f"inference-pipeline-ep-{timestamp_suffix}"
sm_model.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1,
    instance_type="ml.c4.xlarge",
)

In [None]:
# Send "<feature_group_name>,<record_id>" through the pipeline: the first container turns it
# into a feature row and the second scores it. Reuse the notebook's session rather than
# constructing a new one, and send the CSV without stray whitespace.
response = sm_session.sagemaker_runtime_client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=f'{fg_name},{feature_record_id}',
    ContentType="text/csv",
    Accept="text/csv",
)

In [None]:
# The response body is a stream and reading it consumes it, so keep the decoded text.
# Re-running this cell without re-invoking the endpoint yields ''.
response_body = response['Body']
prediction = response_body.read().decode('utf-8')
prediction

Clean up: delete the feature group and every hosting resource created above

In [None]:
# delete fg
fg.delete()

# Delete the endpoint together with its endpoint config and the pipeline model, so
# deploy() leaves nothing behind. Look up the config name before the endpoint is gone.
endpoint_config_name = sm_session.sagemaker_client.describe_endpoint(
    EndpointName=endpoint_name
)['EndpointConfigName']
sm_session.delete_endpoint(endpoint_name=endpoint_name)
sm_session.delete_endpoint_config(endpoint_config_name=endpoint_config_name)
sm_session.delete_model(model_name=model_name)