## Multi-Model Endpoints With SageMaker Pipelines

In this example we take a look at how we can train and deploy a Multi-Model Endpoint (MME) utilizing SageMaker Pipelines. We will train two sample XGBoost models on the same dataset, both will have different hyperparameters. For a real-world use-case you can scale this up to your appropriate number of training jobs, please check AWS limits for how many concurrent training jobs you can run via Pipelines at the same time.

At the moment MME is not supported out of the box via SageMaker Pipelines, we will thus utilize a Lambda step to create a MME after extracting the training artifacts from our two jobs.

### Imports

In [None]:
import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd

from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator

region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
s3_prefix = 'xgboost-example'
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
print("RoleArn: {}".format(role))

# Custom Lambda Step
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.pipeline import Pipeline

### Setup

Create Pipeline Session and any necessary Parameters. We retrieve the training dataset and define the training instance type and define these as two parameters for the model.

In [None]:
pipeline_session = PipelineSession()

In [None]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .

In [None]:
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv

In [None]:
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)
training_path

In [None]:
training_input_param = ParameterString(
    name = "training_input",
    default_value=training_path,
)

training_instance_param = ParameterString(
    name = "training_instance",
    default_value = "ml.c5.xlarge")

In [None]:
model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_param,
)

image_uri

### Configure Training

In [None]:
xgb_train_one = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_param,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

xgb_train_one.set_hyperparameters(
    objective="reg:linear",
    num_round=40,
    max_depth=4,
    eta=0.1,
    gamma=3,
    min_child_weight=5,
    subsample=0.6,
    silent=0,
)

In [None]:
xgb_train_two = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_param,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

#adjusting hyperparams
xgb_train_two.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

In [None]:
train_args_one = xgb_train_one.fit(
    inputs={
        "train": TrainingInput(
            s3_data=training_input_param,
            content_type="text/csv",
        )
    }
)

train_args_two = xgb_train_two.fit(
    inputs={
        "train": TrainingInput(
            s3_data=training_input_param,
            content_type="text/csv",
        )
    }
)

In [None]:
step_train_one = TrainingStep(
    name="TrainOne",
    step_args=train_args_one,
)

step_train_two = TrainingStep(
    name = "TrainTwo",
    step_args= train_args_two
)

### Lambda Step

#### IAM Helper

We can grab an existing IAM script from this [example](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/lambda-step/iam_helper.py) that essentially gives our Lambda permissions to work with SageMaker.

In [None]:
%%writefile iam_helper.py

import boto3
import json

iam = boto3.client("iam")


def create_lambda_role(role_name):
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {"Service": "lambda.amazonaws.com"},
                            "Action": "sts:AssumeRole",
                        }
                    ],
                }
            ),
            Description="Role for Lambda to call SageMaker functions",
        )

        role_arn = response["Role"]["Arn"]

        response = iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )

        response = iam.attach_role_policy(
            PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", RoleName=role_name
        )

        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print(f"Using ARN from existing role: {role_name}")
        response = iam.get_role(RoleName=role_name)
        return response["Role"]["Arn"]

In [None]:
from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")

### Lambda Function

Here we configure our Lambda function to deploy the MME from the two models we trained. 

In [None]:
!mkdir code

In [None]:
%%writefile code/lambda_helper.py

"""
The Lambda function extracts the two model.tar.gz from the training jobs and places them in a singular S3 location as 
needed for MME. We then deploy a real-time SageMaker MME utilizing boto3.
"""

import json
import boto3
from time import gmtime, strftime

sm_client = boto3.client("sagemaker")
s3 = boto3.resource('s3')

def extract_bucket_key(model_data):
    """
    Extracts the bucket and key from the model data tarballs that we are passing in
    """
    bucket = model_data.split('/', 3)[2]
    key = model_data.split('/', 3)[-1]
    return [bucket, key]

def create_mme_dir(model_data_dir):
    """
    Takes in a list of lists with the different trained models, 
    creates a central S3 bucket/key location with all model artifacts for MME.
    """
    bucket_name = model_data_dir[0][0]
    for i, model_data in enumerate(model_data_dir):
        copy_source = {
              'Bucket': bucket_name,
              'Key': model_data[1]
            }
        bucket = s3.Bucket(bucket_name)
        destination_key = 'xgboost-mme-pipelines/model-{}.tar.gz'.format(i)
        bucket.copy(copy_source, destination_key)
    mme_s3_path = 's3://{}/xgboost-mme-pipelines/'.format(bucket_name)
    return mme_s3_path
    
def lambda_handler(event, context):
    """ """
    
    model_artifacts_one = event["model_artifacts_one"]
    model_artifacts_two = event["model_artifacts_two"]
    
    #Extract S3 metadata of trained artifacts
    model_data_one = extract_bucket_key(model_artifacts_one)
    model_data_two = extract_bucket_key(model_artifacts_two)
    model_data_dir = [model_data_one, model_data_two]
    
    #Create a central S3 path with all model tarballs
    model_url = create_mme_dir(model_data_dir)
    image_uri = '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'
    
    
    model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    create_model_response = sm_client.create_model(
        ModelName=model_name,
        Containers=[
            {
                "Image": image_uri,
                "Mode": "MultiModel",
                "ModelDataUrl": model_url
            }
        ],
        #to-do parameterize this
        ExecutionRoleArn='arn:aws:iam::474422712127:role/sagemaker-role-BYOC',
    )
    print("Model Arn: " + create_model_response["ModelArn"])
    
    
    #Step 2: EPC Creation
    xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=xgboost_epc_name,
        ProductionVariants=[
            {
                "VariantName": "xgbvariant",
                "ModelName": model_name,
                "InstanceType": "ml.c5.large",
                "InitialInstanceCount": 1
            },
        ],
    )
    print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])
    
    #Step 3: EP Creation
    endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=xgboost_epc_name,
    )
    print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])
    

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "endpoint_name": endpoint_name
    }

In [None]:
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

function_name = "sagemaker-lambda-step-endpoint-deploy-" + current_time

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="code/lambda_helper.py",
    handler="lambda_helper.lambda_handler",
)

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="endpoint_name", output_type=LambdaOutputTypeEnum.String)

step_deploy_lambda = LambdaStep(
    name="LambdaStep",
    lambda_func=func,
    inputs={
        "model_artifacts_one": step_train_one.properties.ModelArtifacts.S3ModelArtifacts,
        "model_artifacts_two": step_train_two.properties.ModelArtifacts.S3ModelArtifacts
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

In [None]:
pipeline = Pipeline(
    name="mme-pipeline",
    steps=[step_train_one, step_train_two, step_deploy_lambda],
    parameters= [training_input_param, training_instance_param]
)

### Pipeline Execution

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.wait()

### Sample Inference

In [None]:
endpoint_name = execution.list_steps()[0]['Metadata']['Lambda']['OutputParameters'][0]['Value']
endpoint_name

In [None]:
import boto3
smr = boto3.client('sagemaker-runtime') #client for inference

In [None]:
resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0', 
                           ContentType='text/csv', TargetModel = 'model-0.tar.gz')

print(resp['Body'].read())