## An MLOps Pipeline With Training, Model Registry, and Batch Inference: Harness SageMaker Pipelines With Batch Inference

### Import Libraries

In [None]:
import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd
from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Resolve the session, region, execution role, and default bucket that the
# remaining cells refer to.
sagemaker_session = sagemaker.Session()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
s3_prefix = 'xgboost-example'
print(f"RoleArn: {role}")

# A Pipeline Session makes sure no step runs standalone: the steps defined
# with it are only carried out when the Pipeline itself is executed.
pipeline_session = PipelineSession()

## Download the Dataset (Abalone Dataset)

In [None]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)

- Since we’re working with Batch Inference, we need to ensure that our dataset is in a format that is compliant with the SageMaker XGBoost algorithm. For inference, the SageMaker XGBoost algorithm expects the target column to be removed from the test dataset. Hence, we drop this column, create a test set for our Batch Inference step, and upload it to S3.

In [None]:
import pandas as pd

# Build the batch-inference input from the training CSV by dropping the first
# (target) column.
#
# The same file is uploaded unchanged as the XGBoost training channel, so it is
# assumed to be headerless -- TODO confirm against the dataset. Reading it with
# header=None keeps the first record as data; with pandas' default header
# inference that record would be consumed as column names (and duplicate
# values renamed, e.g. "0" -> "0.1") and then written back in altered form.
test = pd.read_csv('abalone_dataset1_train.csv', header=None)
test = test.iloc[:, 1:]
# header=False keeps the output a plain, headerless CSV of feature rows.
test.to_csv('test.csv', index=False, header=False)

#Create a sagemaker session to be able to upload data to s3
import boto3
import sagemaker
sagemaker_session = sagemaker.Session()

#Uploading test data to S3 bucket
prefix = "xgb-test-batch-abalone"
# upload_data returns the uploaded object's S3 URI, which becomes the default
# for the test-input pipeline parameter.
test_data_path = sagemaker_session.upload_data('test.csv', key_prefix=prefix + '/test')

- Next we parameterize our Pipeline with the input data locations and hardware necessary for the Training and Batch Inference portions of the Pipeline.

In [None]:
# Pipeline Parameters

# Data locations: S3 URIs of the training CSV and of the batch-inference input.
training_input_param = ParameterString(
    name="training_input",
    default_value=training_path,
)
test_data_param = ParameterString(
    name="test_input",
    default_value=test_data_path,
)

# Hardware: instance types for the training job and for the transform job.
training_instance_param = ParameterString(
    name="training_instance",
    default_value="ml.c5.xlarge",
)
batch_transform_param = ParameterString(
    name="batch_inference",
    default_value="ml.m5.xlarge",
)

- Retrieve the AWS provided container for XGBoost that will be utilized for training and inference.

In [None]:
# S3 location where the training job writes its model artifacts.
model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

# image_uris.retrieve is evaluated right here in the notebook, whereas a
# pipeline parameter is only resolved when the pipeline executes. Recent
# versions of the SDK reject pipeline variables in this call, so pass the
# parameter's concrete default value instead of the parameter object.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_param.default_value,
)

# Display the resolved container image URI as the cell output.
image_uri

## Training Step

- Build out the Training portion via SageMaker Pipelines. For training with the XGBoost algorithm we create an object that points towards the hardware we need for our Training Job as well as the necessary hyperparameters to solve a regression problem using XGBoost.

In [None]:
# Estimator describing the training job: container image, hardware, artifact
# output location, and the pipeline session it is bound to.
xgb_train = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type=training_instance_param,
    output_path=model_path,
    sagemaker_session=pipeline_session,
)

# XGBoost hyperparameters for the regression problem.
xgb_hyperparameters = {
    "objective": "reg:linear",
    "num_round": 40,
    "max_depth": 4,
    "eta": 0.1,
    "gamma": 3,
    "min_child_weight": 5,
    "subsample": 0.6,
    "silent": 0,
}
xgb_train.set_hyperparameters(**xgb_hyperparameters)

- Encapsulate this object in a Training Step within SageMaker Pipelines and point towards our parameter with the training data.

In [None]:
# The "train" channel reads CSV data from the location held by the
# training-input pipeline parameter.
train_channel = TrainingInput(
    s3_data=training_input_param,
    content_type="text/csv",
)

# The estimator is bound to the pipeline session, so the value returned by
# fit() is handed to the TrainingStep as its step arguments.
train_args = xgb_train.fit(inputs={"train": train_channel})

training_step = TrainingStep(name="Training", step_args=train_args)

# Create Model & Register Model Steps
  - Create and catalog a SageMaker Model object. First we want to define a SageMaker Model from the Training Job that we defined in the previous step of the Pipeline. To do so we point towards the model artifacts that the Training Step generates and create our Model object using a ModelStep.


In [None]:
# Reference to the artifacts the Training step produces.
trained_artifacts = training_step.properties.ModelArtifacts.S3ModelArtifacts

# SageMaker Model built from the training container image and those artifacts.
model = Model(
    image_uri=image_uri,
    model_data=trained_artifacts,
    role=role,
    sagemaker_session=pipeline_session,
)

# Step to create a SageMaker Model
create_model_args = model.create()
create_model_step = ModelStep(
    name="CreateXGBoostModel",
    step_args=create_model_args,
)

## Model Registry

In [None]:
from sagemaker.workflow.step_collections import RegisterModel

# Register the model in the 'batchgroup' model package group, declaring the
# CSV content/response types and the instance types listed for real-time
# inference and for batch transform.
register_step = RegisterModel(
    name="AbaloneRegisterModel",
    model=model,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name='batchgroup',
)

## Batch Transform Step

  - Unlike SageMaker Real-Time Inference, Batch Inference does not create a REST endpoint. Instead, you define a Transformer object that captures your created Model, the hardware for inference, and the acceptable data formats for the Transform Job.

In [None]:
# Transformer is not among the notebook's earlier imports; import it here so
# this cell does not fail with a NameError.
from sagemaker.transformer import Transformer

# Transformer for the batch job: it targets the model created by the
# CreateXGBoostModel step, runs on the instance type held by the
# batch-inference parameter, and exchanges CSV data.
transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_count=1,
    instance_type=batch_transform_param,
    assemble_with="Line",
    accept="text/csv",
    # Reuse the notebook's shared pipeline session, as the estimator and
    # model do, rather than creating a second PipelineSession.
    sagemaker_session=pipeline_session,
)

In [None]:
# TransformStep is not among the notebook's earlier imports; import it here so
# this cell does not fail with a NameError.
from sagemaker.workflow.steps import TransformStep

# We then wrap this up in a Transform Step and point towards the parameter we
# defined with our test dataset.
transform_args = transformer.transform(
    data=test_data_param,
    content_type="text/csv",
)

transform_step = TransformStep(
    name="AbaloneTransform",
    step_args=transform_args,
)

## Pipeline Execution

In [None]:
# Assemble the pipeline from its steps and the parameters they reference.
pipeline_steps = [training_step, create_model_step, register_step, transform_step]
pipeline_parameters = [
    training_input_param,
    training_instance_param,
    test_data_param,
    batch_transform_param,
]
pipeline = Pipeline(
    name="batch-pipeline-abalone",
    steps=pipeline_steps,
    parameters=pipeline_parameters,
)

# Create or update the pipeline definition, start an execution, and block
# until it finishes; this specific execution should take about five minutes.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()