# Inference Pipeline Using the SageMaker Online Feature Store as Inference Input, with Scikit-learn Preprocessing and Linear Learner
This work extends the example notebook at https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/scikit_learn_inference_pipeline/Inference%20Pipeline%20with%20Scikit-learn%20and%20Linear%20Learner.ipynb. At inference time, it extracts data from the SageMaker online feature store, preprocesses the extracted features with a Scikit-learn transformer, and then sends them to the model for inference, all within a single inference pipeline.

In many cases, when a trained model is used to process real-time or batch prediction requests, the model receives data in a format that needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In the following notebook, we demonstrate how to build an ML pipeline using the SageMaker Scikit-learn container and the SageMaker Linear Learner algorithm and, after the model is trained, deploy the pipeline (data preprocessing and Linear Learner) as an inference pipeline behind a single endpoint for real-time inference. We further fetch the records from the online feature store at inference time, so that they go through the preprocessing step followed by inference.

We will demonstrate this using the Abalone dataset to predict the age of an abalone from its physical features. The dataset is available from [UCI Machine Learning](https://archive.ics.uci.edu/ml/datasets/abalone); the aim for this task is to determine the age of an abalone (a kind of shellfish) from its physical measurements. We'll use SageMaker's Scikit-learn container to featurize the dataset so that it can be used for training with Linear Learner.
<!-- 
### Table of contents
* [Preprocessing data and training the model](#training)
 * [Upload the data for training](#upload_data)
 * [Create a Scikit-learn script to train with](#create_sklearn_script)
 * [Create SageMaker Scikit Estimator](#create_sklearn_estimator)
 * [Batch transform our training data](#preprocess_train_data)
 * [Fit a LinearLearner Model with the preprocessed data](#training_model)
* [Inference Pipeline with Scikit preprocessor and Linear Learner](#inference_pipeline)
 * [Set up the inference pipeline](#pipeline_setup)
 * [Make a request to our pipeline endpoint](#pipeline_inference_request)
 * [Delete Endpoint](#delete_endpoint) -->

Let's first create our SageMaker session and role, and define an S3 prefix to use for this notebook example.

In [None]:
import sagemaker
from sagemaker import get_execution_role
import boto3
import os
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import Binarizer, OneHotEncoder, StandardScaler

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

# S3 bucket and prefix for this notebook's artifacts
bucket = sagemaker_session.default_bucket()
prefix = "Scikit-InferencePipeline-Featurestore"

# Preprocessing data and training the model <a class="anchor" id="training"></a>
## Downloading dataset <a class="anchor" id="download_data"></a>
The SageMaker team has downloaded the dataset from UCI and uploaded it to an S3 bucket, from which we download it here.

In [None]:
!wget --directory-prefix=./abalone_data https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

## Upload the data for training <a class="anchor" id="upload_data"></a>

When training large models with huge amounts of data, you'll typically use big data tools such as Amazon Athena, AWS Glue, or Amazon EMR to create your data in S3. For this small dataset, we use the tools provided by the SageMaker Python SDK to upload the data to the session's default bucket.

In [None]:
WORK_DIRECTORY = "abalone_data"

train_input = sagemaker_session.upload_data(
    path="{}/{}".format(WORK_DIRECTORY, "abalone.csv"),
    bucket=bucket,
    key_prefix="{}/{}".format(prefix, "train"),
)


## Prepare the feature group

In [None]:
import pandas as pd

# The CSV file has no header row, so the column names are assigned below
abalone_data = pd.read_csv(os.path.join(WORK_DIRECTORY, "abalone.csv"), header=None)

feature_columns_names = [
    "sex",  # M, F, and I (infant)
    "length",  # Longest shell measurement
    "diameter",  # perpendicular to length
    "height",  # with meat in shell
    "whole_weight",  # whole abalone
    "shucked_weight",  # weight of meat
    "viscera_weight",  # gut weight (after bleeding)
    "shell_weight",  # after being dried
    "age",  # label: number of rings (rings + 1.5 gives the age in years)
]

abalone_data.columns = feature_columns_names
abalone_data

In [None]:
import time
from time import gmtime, strftime

from sagemaker.feature_store.feature_group import FeatureGroup

# Add a record identifier for saving onto the feature store
abalone_data["abalone_id"] = abalone_data.index + 1
abalone_data["abalone_id"] = abalone_data["abalone_id"].astype("str")


def cast_object_to_string(df):
    for col in df.columns:
        if df.dtypes[col] == "object":
            df[col] = df[col].astype("str").astype("string")


# Cast object dtype to string. The SageMaker Feature Store Python SDK will then map the string dtype to the String feature type.
cast_object_to_string(abalone_data)

abalone_feature_group_name = "abalone-fg-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
record_identifier_feature_name = "abalone_id"

# Add an event time (seconds since the epoch) to every record
current_time_sec = int(round(time.time()))
abalone_data["EventTime"] = pd.Series([current_time_sec] * len(abalone_data), dtype="float64")
abalone_data.head()
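The comment above relies on `load_feature_definitions` inferring each feature's type from its pandas dtype. The sketch below illustrates that idea with a hypothetical helper, `feature_types`, and a simplified mapping that covers only the three dtypes this notebook produces (`int64`, `float64` and `string`); the SDK's own mapping table may cover more dtypes and is not reproduced here.

```python
import pandas as pd

# Simplified, assumed mapping for illustration only (not the SDK's table)
DTYPE_TO_FEATURE_TYPE = {
    "int64": "Integral",
    "float64": "Fractional",
    "string": "String",
}


def feature_types(df: pd.DataFrame) -> dict:
    """Hypothetical helper: return {column: feature type}, rejecting unmapped dtypes."""
    result = {}
    for col, dtype in df.dtypes.items():
        key = str(dtype)
        if key not in DTYPE_TO_FEATURE_TYPE:
            raise ValueError(f"Column {col!r} has unsupported dtype {key!r}")
        result[col] = DTYPE_TO_FEATURE_TYPE[key]
    return result


sample = pd.DataFrame(
    {
        "sex": pd.Series(["M"], dtype="string"),
        "length": [0.455],
        "age": [15],
    }
)
print(feature_types(sample))
```

Plain Python strings stored with the generic `object` dtype are rejected by this helper, which is why a cast such as `cast_object_to_string` is applied before the definitions are loaded.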



In [None]:
abalone_feature_group = FeatureGroup(
    name=abalone_feature_group_name, sagemaker_session=sagemaker_session)

abalone_feature_group.load_feature_definitions(data_frame=abalone_data)
print("done loading feature group definition")  # suppresses the output of the previous call


In [None]:
sagemaker_session.boto_session.client(
    "sagemaker", region_name=region).list_feature_groups()  # We use the boto client to list FeatureGroups


In [None]:
# Uncomment to delete the feature group, e.g. before re-creating it with the same name
# abalone_feature_group.delete()

## Create the feature group and ingest the data

In [None]:
abalone_feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True)


def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(abalone_feature_group)
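`wait_for_feature_group_creation_complete` polls for as long as the status stays `Creating`. A generic variant with a timeout could look like the sketch below; `wait_for_status` and its parameters are hypothetical names, not SageMaker SDK functions.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    pending: str,
    target: str,
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
) -> str:
    """Poll get_status() while it returns `pending`; fail on timeout or on any status other than `target`."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {pending!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    if status != target:
        raise RuntimeError(f"Unexpected status {status!r}, expected {target!r}")
    return status
```

In this notebook it could be called as `wait_for_status(lambda: abalone_feature_group.describe().get("FeatureGroupStatus"), "Creating", "Created")`.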


In [None]:
abalone_feature_group.ingest(data_frame=abalone_data, wait=True)


In [None]:
# Verify that a record can be read back from the online store
abalone_id = '10'
sample_record = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).get_record(
    FeatureGroupName=abalone_feature_group_name, RecordIdentifierValueAsString=abalone_id)

sample_record
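`get_record` returns the features as a list of `{"FeatureName": ..., "ValueAsString": ...}` entries under the `Record` key, with every value encoded as a string. A small hypothetical helper, `record_to_dict`, can flatten that list into a dictionary and convert selected features back to numbers:

```python
from typing import Dict, Iterable, Union


def record_to_dict(
    response: dict, numeric_features: Iterable[str] = ()
) -> Dict[str, Union[str, float]]:
    """Flatten a get_record response into {feature name: value}.

    Values arrive as strings; names listed in numeric_features are converted to float.
    A response without a "Record" key yields an empty dict.
    """
    values: Dict[str, Union[str, float]] = {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }
    for name in numeric_features:
        if name in values:
            values[name] = float(values[name])
    return values


example_response = {
    "Record": [
        {"FeatureName": "sex", "ValueAsString": "M"},
        {"FeatureName": "length", "ValueAsString": "0.35"},
    ]
}
print(record_to_dict(example_response, ["length"]))
```

With the notebook's data this could be used as `record_to_dict(sample_record, feature_columns_names[1:])`.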

## Create a Scikit-learn script to train with <a class="anchor" id="create_sklearn_script"></a>
To run Scikit-learn on SageMaker, we use the `SKLearn` Estimator with a script as its entry point. The training script is very similar to one you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:

* SM_MODEL_DIR: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.
* SM_OUTPUT_DIR: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

Supposing two input channels, 'train' and 'test', were used in the call to the `SKLearn` estimator's `fit()` method, the following environment variables will be set, following the format `SM_CHANNEL_[channel_name]`:

* SM_CHANNEL_TRAIN: A string representing the path to the directory containing data in the 'train' channel
* SM_CHANNEL_TEST: Same as above, but for the 'test' channel.

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to `model_dir` so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with `argparse`.
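Inside the training container, these paths arrive as environment variables. The sketch below shows one way an entry point might read them with `argparse`, using the environment variables as defaults; the argument names and the local fallback paths are assumptions, not taken from `sklearn_abalone_featurizer-edit.py`, which is not shown in this notebook.

```python
import argparse
import os


def parse_args(argv=None):
    """Read training paths from the SM_* environment variables, overridable on the command line."""
    parser = argparse.ArgumentParser()
    # SM_MODEL_DIR: where model artifacts must be written for hosting.
    # The "./model" fallback is a hypothetical path for local runs.
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "./model"))
    # SM_CHANNEL_TRAIN: local directory holding the 'train' channel data.
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "./data/train"))
    return parser.parse_args(argv)


# Passing an explicit empty list avoids picking up unrelated arguments (e.g. from a notebook kernel)
args = parse_args([])
print(args.model_dir, args.train)
```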

## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: Role ARN
* __framework_version__: Scikit-learn version you want to use for executing your model training code.
* __instance_type__ *(optional)*: The type of SageMaker instance used for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.

For the source code of the `SKLearn` estimator, see: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn

In [None]:
from sagemaker.sklearn.estimator import SKLearn

#s3_input_train = 's3://{}/{}/train'.format(bucket, prefix)


FRAMEWORK_VERSION = "0.23-1"
script_path = "sklearn_abalone_featurizer-edit.py"

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    framework_version=FRAMEWORK_VERSION,
    instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session,
)

#sklearn_preprocessor.env = {"SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT":"text/csv"}

In [None]:
sklearn_preprocessor.fit({"train": train_input})

## Batch transform our training data <a class="anchor" id="preprocess_train_data"></a>
Now that our preprocessor is fitted, let's preprocess our training data. We use batch transform to preprocess the raw data directly and store the output back in S3.

In [None]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, instance_type="ml.m5.xlarge", assemble_with="Line", accept="text/csv")

In [None]:
# Preprocess training input
transformer.transform(train_input, content_type="text/csv")
print("Waiting for transform job: " + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

## Fit a LinearLearner Model with the preprocessed data <a class="anchor" id="training_model"></a>
Let's take the preprocessed training data and fit a Linear Learner model. SageMaker provides prebuilt algorithm containers that can be used with the Python SDK. The previous Scikit-learn job preprocessed the raw Abalone dataset into labeled, usable data that we can now use to fit a Linear Learner regression model (`predictor_type="regressor"`).

For more on Linear Learner see: https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html
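The featurizer script is not shown here, but the first cell imports `ColumnTransformer`, `make_column_selector`, `SimpleImputer`, `make_pipeline`, `OneHotEncoder` and `StandardScaler`. The sketch below assumes one plausible layout built from those classes: the seven numeric measurements are imputed and scaled, and `sex` is imputed and one-hot encoded into its three categories. Under that assumption each record becomes 7 + 3 = 10 columns, which matches `feature_dim=10` in the Linear Learner hyperparameters below. The toy rows are made up for illustration.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Assumed layout: numeric columns -> median imputation + scaling
numeric = make_pipeline(SimpleImputer(strategy="median"), StandardScaler())
# Assumed layout: "sex" -> constant imputation + one-hot encoding (M, F, I)
categorical = make_pipeline(
    SimpleImputer(strategy="constant", fill_value="missing"),
    OneHotEncoder(handle_unknown="ignore"),
)
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric, make_column_selector(dtype_include="number")),
        ("cat", categorical, ["sex"]),
    ]
)

# Toy rows with the eight input features (no label column)
df = pd.DataFrame(
    {
        "sex": ["M", "F", "I", "M"],
        "length": [0.455, 0.35, 0.53, 0.44],
        "diameter": [0.365, 0.265, 0.42, 0.365],
        "height": [0.095, 0.09, 0.135, 0.125],
        "whole_weight": [0.514, 0.2255, 0.677, 0.516],
        "shucked_weight": [0.2245, 0.0995, 0.2565, 0.2155],
        "viscera_weight": [0.101, 0.0485, 0.1415, 0.114],
        "shell_weight": [0.15, 0.07, 0.21, 0.155],
    }
)
features = preprocessor.fit_transform(df)
print(features.shape)  # (4, 10)
```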

In [None]:
import boto3
from sagemaker.image_uris import retrieve

ll_image = retrieve("linear-learner", boto3.Session().region_name)

In [None]:
s3_ll_output_key_prefix = "ll_training_output"
s3_ll_output_location = "s3://{}/{}/{}/{}".format(bucket, prefix, s3_ll_output_key_prefix, "ll_model")

ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role,
    instance_count=1,
    instance_type="ml.m4.2xlarge",
    volume_size=20,
    max_run=3600,
    input_mode="File",
    output_path=s3_ll_output_location,
    sagemaker_session=sagemaker_session,
)

ll_estimator.set_hyperparameters(feature_dim=10, predictor_type="regressor", mini_batch_size=32)

ll_train_data = sagemaker.inputs.TrainingInput(
    preprocessed_train,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)


data_channels = {"train": ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)

# Serial Inference Pipeline with Scikit preprocessor and Linear Learner <a class="anchor" id="serial_inference"></a>


## Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
Setting up a machine learning pipeline can be done with the `PipelineModel` class. This sets up a list of models behind a single endpoint; in this example, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted Linear Learner model. Deploying the model follows the same ```deploy``` pattern as other models in the SDK.
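In a serial inference pipeline, the response of each container becomes the request of the next one. The toy simulation below shows that chaining in plain Python; `invoke_serial`, `toy_preprocessor` and `toy_regressor` are hypothetical stand-ins for the endpoint and its two containers, not SageMaker APIs, and the "models" do trivial arithmetic.

```python
import json
from typing import Callable, List, Tuple

# A stage maps (request body, content type) to (response body, content type).
Stage = Callable[[str, str], Tuple[str, str]]


def invoke_serial(stages: List[Stage], body: str, content_type: str) -> Tuple[str, str]:
    """Feed each stage's response into the next stage, like a serial pipeline."""
    for stage in stages:
        body, content_type = stage(body, content_type)
    return body, content_type


def toy_preprocessor(body: str, content_type: str) -> Tuple[str, str]:
    """Hypothetical stand-in for the Scikit-learn container: doubles each CSV value."""
    if content_type != "text/csv":
        raise ValueError(f"Unsupported content type: {content_type}")
    values = [float(v) for v in body.split(",")]
    return ",".join(str(v * 2) for v in values), "text/csv"


def toy_regressor(body: str, content_type: str) -> Tuple[str, str]:
    """Hypothetical stand-in for Linear Learner: 'predicts' the sum of the features."""
    score = sum(float(v) for v in body.split(","))
    return json.dumps({"predictions": [{"score": score}]}), "application/json"


print(invoke_serial([toy_preprocessor, toy_regressor], "1,2", "text/csv"))
```

The content type of the original request only has to suit the first stage, and the caller only sees the last stage's response, which is why ```ContentType``` and ```Accept``` are discussed per container in the request section.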

In [None]:
from time import gmtime, strftime

from sagemaker.pipeline import PipelineModel

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = "inference-pipeline-" + timestamp_prefix
endpoint_name = "inference-pipeline-ep-" + timestamp_prefix
sm_model = PipelineModel(
    name=model_name, role=role, models=[scikit_learn_inference_model, linear_learner_model]
)

sm_model.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name)

## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here, instead of raw features, we send the name of the feature group and the identifier of the record to score. The preprocessing container's ```input_fn()``` uses them to fetch the record from the online feature store, so the inference script is very particular about the format of the request data. The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.

We make our request with the payload in ```'text/csv'``` format, since that is what our script currently supports. If other formats need to be supported, they would have to be added to the ```input_fn()``` method in our entry point. Note that we set ```Accept``` to ```application/json```, since Linear Learner does not support ```text/csv``` for ```Accept```. The prediction is the model's estimate of the number of rings for the requested abalone, which you can compare with the ```age``` value stored for the same record (e.g. in ```sample_record``` above).
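Two small pieces of client-side logic are involved in this request: building the `'<feature group name>,<record id>'` payload and reading the score out of Linear Learner's JSON response, which for a regressor has the shape `{"predictions": [{"score": ...}]}`. The helpers below are hypothetical; `parse_scores` accepts raw bytes, a string, or an already-deserialized dictionary (as returned when a `JSONDeserializer` is used).

```python
import json
from typing import List, Union


def build_payload(feature_group_name: str, record_id: str) -> str:
    """Hypothetical helper: '<feature group name>,<record id>' with no padding around the comma."""
    return f"{feature_group_name},{record_id}"


def parse_scores(response: Union[bytes, str, dict]) -> List[float]:
    """Extract the regression scores from a Linear Learner JSON response."""
    if isinstance(response, bytes):
        response = response.decode("utf-8")
    if isinstance(response, str):
        response = json.loads(response)
    return [prediction["score"] for prediction in response["predictions"]]


print(build_payload("abalone-fg-example", "10"))
print(parse_scores(b'{"predictions": [{"score": 9.5}]}'))
```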

In [None]:
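from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

# The payload is "<feature group name>,<record identifier>"; the preprocessing
# container looks the record up in the online feature store.
fg_name = abalone_feature_group_name
feature_record_id = "10"
payload = f"{fg_name},{feature_record_id}"
print(payload)

# CSVSerializer sends Content-Type text/csv; JSONDeserializer requests
# application/json (supported by Linear Learner) and parses the response.
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=CSVSerializer(),
    deserializer=JSONDeserializer(),
)
print(predictor.predict(payload))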


## Delete Endpoint <a class="anchor" id="delete_endpoint"></a>
Once we are finished with the endpoint, we clean up the resources!

In [None]:
sm_client = sagemaker_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)
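`delete_endpoint` removes only the endpoint; the endpoint configuration and the model created by `deploy` remain. The hypothetical helper below looks up the endpoint configuration and its models through `describe_endpoint` and `describe_endpoint_config` before deleting all of them, so it is meant to be run instead of, not after, the `delete_endpoint` cell above.

```python
from typing import List, Tuple


def delete_pipeline_resources(sm_client, endpoint_name: str) -> List[Tuple[str, str]]:
    """Delete an endpoint, its endpoint configuration and its models; return what was deleted."""
    # Look up the dependent resources before the endpoint disappears
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    config = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config["ProductionVariants"]]

    deleted = []
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    deleted.append(("endpoint", endpoint_name))
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    deleted.append(("endpoint_config", config_name))
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
        deleted.append(("model", model_name))
    return deleted
```

Usage: `delete_pipeline_resources(sm_client, endpoint_name)`. The feature group is a separate resource; once it is no longer needed, `abalone_feature_group.delete()` removes it.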

In [None]:
# Notebook-side version of the input_fn logic that lets the preprocessing container accept
# "<feature group name>,<record identifier>" and fetch the features from the online store.
# The deployed entry point (sklearn_abalone_featurizer-edit.py) is expected to contain
# equivalent logic; this cell only exercises it from the notebook.
from io import StringIO

boto_fs_client = sagemaker_session.boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

# The eight physical features, i.e. feature_columns_names without the "age" label
input_feature_names = feature_columns_names[:-1]
label_column = "age"  # assumption: label named as in this notebook


def input_fn(input_data, content_type):
    if content_type != "text/csv":
        raise ValueError(f"Unsupported content type: {content_type}")
    # Read the raw input data as CSV
    df = pd.read_csv(StringIO(input_data), header=None)
    print("Input data: ", df)
    if len(df.columns) == len(input_feature_names) + 1:
        # This is a labelled example that includes the label column
        df.columns = input_feature_names + [label_column]
    elif len(df.columns) == len(input_feature_names):
        # This is an unlabelled example
        df.columns = input_feature_names
    elif len(df.columns) < len(input_feature_names):
        # "<feature group name>,<record identifier>": look the record up in the online store
        fg_name = str(df.iloc[0, 0]).strip()
        input_feat_id = str(df.iloc[0, 1]).strip()
        rec = boto_fs_client.get_record(
            FeatureGroupName=fg_name,
            RecordIdentifierValueAsString=input_feat_id,
            FeatureNames=input_feature_names,
        )
        # Index the returned features by name so the column order is always the same
        values = {f["FeatureName"]: f["ValueAsString"] for f in rec["Record"]}
        df = pd.DataFrame([[values[name] for name in input_feature_names]], columns=input_feature_names)
        # Values come back as strings; restore numeric types for the numeric features
        numeric_cols = input_feature_names[1:]
        df[numeric_cols] = df[numeric_cols].astype("float64")
    return df


In [None]:
input_data = f"{abalone_feature_group_name},10"
input_fn(input_data, "text/csv")