# MLOps Manual to Repeatable Workflow

## Contents

- [Introduction](#Introduction)
- [Training pipeline with SageMaker Pipelines](#Training-pipeline-with-SageMaker-Pipelines)
    - [Pipeline inputs](#Pipeline-inputs)
    - [SageMaker Processing step](#SageMaker-Processing-step)
    - [SageMaker Training step](#SageMaker-Training-step)
    - [Model evaluation step](#Model-evaluation-step)
    - [Register model in Model Registry step](#Register-model-in-Model-Registry-step)
    - [Assemble the training pipeline](#Assemble-the-training-pipeline)
    - [Execute the training pipeline](#Execute-the-training-pipeline)
- [Deployment pipeline with SageMaker Pipelines](#Deployment-pipeline-with-SageMaker-Pipelines)
    - [Assemble the deployment pipeline](#Assemble-the-deployment-pipeline)
    - [Execute the deployment pipeline](#Execute-the-deployment-pipeline)
    - [Test the SageMaker endpoint](#Test-the-SageMaker-endpoint)

## Introduction

This is the fourth notebook in the series. It explores the orchestration stage of the ML workflow.

Here, we put on the hat of a `DevOps/MLOps Engineer` and take on the task of orchestration: building pipeline steps that combine the components from the previous notebooks into a single entity. This pipeline makes each step of the ML workflow repeatable and reliable.

For this task we will use Amazon SageMaker Pipelines. We will create two pipelines: one for model training and one for model deployment.

<div>
<img src="./pipeline_scripts/images/training_and_deployment_pipelines.png" width="450"/>
</div>

Let's get started!

**Imports**

In [None]:
%store -r

In [None]:
# Processing imports
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# SageMaker Pipeline imports
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep
from sagemaker.workflow.model_step import ModelStep

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Other imports
import json
import time
from time import gmtime, strftime
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.tuner import IntegerParameter, HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

# To test the endpoint once it's deployed
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer, CSVDeserializer
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
import boto3
from sagemaker.model_metrics import ModelMetrics, MetricsSource
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup

**Session variables**

In [None]:
def create_lambda_iam_role(role_name):
    iam = boto3.client("iam")
    try:
        response = iam.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description='Role for Lambda to call SageMaker'
        )

        role_arn = response['Role']['Arn']

        response = iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )

        response = iam.attach_role_policy(
            PolicyArn='arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
            RoleName=role_name
        )

        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print(f'Using ARN from existing role: {role_name}')
        response = iam.get_role(RoleName=role_name)
        print("Done")
        return response['Role']['Arn']

In [None]:
# Useful SageMaker variables
session = PipelineSession()
bucket = session.default_bucket()
role_arn= sagemaker.get_execution_role()
region = session.boto_region_name
sagemaker_client = boto3.client('sagemaker')
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')

## Training pipeline with SageMaker Pipelines

An Amazon [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) pipeline is a series of interconnected steps that is defined by a JSON pipeline definition. This pipeline definition encodes a pipeline using a directed acyclic graph (DAG). This DAG gives information on the requirements for and relationships between each step of your pipeline. The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is a pipeline DAG that we'll be creating for our training pipeline:

![](./pipeline_scripts/images/sagemaker-pipelines-dag.png)
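
To make the idea of "structure determined by data dependencies" concrete, here is a minimal sketch that derives an execution order from a dependency map using Python's standard `graphlib` module. The step names mirror the ones used later in this notebook, but the dependency map is written by hand for illustration. SageMaker derives it from the step properties you pass between steps, and this sketch does not reproduce how the service does that.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# This map is hand-written for illustration only.
dependencies = {
    "PreprocessData": set(),
    "TrainModel": {"PreprocessData"},                   # consumes the train/validation outputs
    "EvaluateModel": {"TrainModel", "PreprocessData"},  # consumes the model and the test output
    "CheckEvaluation": {"EvaluateModel"},               # reads the evaluation report
    "RegisterModel": {"CheckEvaluation"},               # runs only if the condition holds
}

# A topological sort yields an order in which every step runs after its inputs exist.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the order is computed from the dependencies, the order in which steps are listed when the pipeline is assembled does not matter.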

You can also add other steps to your pipeline, for example a step that performs Hyperparameter Optimization (HPO) as part of training. [Pipeline Steps Types](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-steps-types) lists all possible pipeline step types that you can use to build your pipeline, and [this workshop](https://aws.amazon.com/getting-started/hands-on/machine-learning-tutorial-mlops-automate-ml-workflows/#) explains how to build a SageMaker pipeline with steps for data bias checks and model explainability.

#### Pipeline inputs

You can give a pipeline inputs to make it reusable (you'll be able to override these inputs upon executing the pipeline later in the notebook).

In [None]:
processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)

processing_instance_type = ParameterString(
    name='ProcessingInstanceType',
    default_value='ml.m5.xlarge'
)
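
As a rough mental model of how these parameters behave at execution time, the sketch below merges default values with overrides supplied when a pipeline is started. The `resolve_parameters` helper is hypothetical and is not part of the SageMaker SDK. Rejecting unknown names is a design choice of this sketch, not a statement about what the service does.

```python
def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one execution (hypothetical helper)."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Fail fast on misspelled parameter names instead of silently ignoring them.
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    # Overrides win; everything else falls back to its default value.
    return {**defaults, **overrides}


defaults = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
}
resolved = resolve_parameters(defaults, {"ProcessingInstanceType": "ml.m5.large"})
print(resolved)
```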

#### SageMaker Processing step

This should look very similar to the SageMaker Processing job you ran in an earlier notebook. The only new line of code is the `ProcessingStep` line at the bottom of the cell below.

In [None]:
preprocess_data_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name='preprocess-data',
    sagemaker_session=session,
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    code='./pipeline_scripts/preprocessing.py',
    processor=preprocess_data_processor,
    inputs=[
        ProcessingInput(
            source=raw_s3,
            destination='/opt/ml/processing/input',
            s3_data_distribution_type='ShardedByS3Key'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='train',
            destination=f'{output_path}/train',
            source='/opt/ml/processing/train'
        ),
        ProcessingOutput(
            output_name='validation',
            destination=f'{output_path}/validation',
            source='/opt/ml/processing/validation'
        ),
        ProcessingOutput(
            output_name='test',
            destination=f'{output_path}/test',
            source='/opt/ml/processing/test'
        )
    ]
)
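
The contents of `preprocessing.py` are not shown in this notebook. As a sketch only, a script that fills the three output directories declared above might split the rows and write one header-less CSV per split, with the label in the first column. The 70/15/15 split, the file names, and the `split_and_write` helper are assumptions made for illustration.

```python
import csv
import random
import tempfile
from pathlib import Path


def split_and_write(rows, base_dir, percentages=(70, 15, 15), seed=0):
    """Shuffle rows and write train/validation/test CSV files under base_dir."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_train = len(shuffled) * percentages[0] // 100
    n_validation = len(shuffled) * percentages[1] // 100
    splits = {
        "train": shuffled[:n_train],
        "validation": shuffled[n_train:n_train + n_validation],
        "test": shuffled[n_train + n_validation:],
    }
    for name, split_rows in splits.items():
        out_dir = Path(base_dir) / name
        out_dir.mkdir(parents=True, exist_ok=True)
        # No header row, label (price) in the first column.
        with open(out_dir / f"{name}.csv", "w", newline="") as handle:
            csv.writer(handle).writerows(split_rows)
    return {name: len(split_rows) for name, split_rows in splits.items()}


# Inside a Processing job base_dir would be /opt/ml/processing;
# a temporary directory keeps this sketch runnable anywhere.
rows = [[price, price // 1000] for price in range(100_000, 100_100)]
with tempfile.TemporaryDirectory() as base_dir:
    counts = split_and_write(rows, base_dir)
    written = sorted(path.name for path in Path(base_dir).iterdir())
print(counts, written)
```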

#### SageMaker Training step

This should look very similar to the SageMaker Training job you did in notebook 2. The only new line of code is the `TrainingStep` line at the bottom of the cell below.

In [None]:
# Tuned hyperparameters
hyperparameters = {
    "max_depth": "7",
    "gamma": "2",
    "alpha": "375",
    "objective": "reg:squarederror",
    "num_round": "50",
    "verbosity": "2",
    "eval_metric": "mse"
}

train_instance_type = 'ml.c5.xlarge'


# Look up the URI of the SageMaker-managed XGBoost container image for this region.
# Change `version` to select a different XGBoost release.
#xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.5-1")
xgboost_container = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
# construct a SageMaker estimator that calls the xgboost-container
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_container, 
    hyperparameters=hyperparameters,
    role=role_arn,
    instance_count=1, 
    instance_type='ml.m5.2xlarge', 
    volume_size=5, # 5 GB 
)

training_step = TrainingStep(
    name='TrainModel',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'train'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                'validation'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
)

#### Model evaluation step

After the training step in our pipeline, we want to evaluate the model's performance. To do that, we create a SageMaker Processing step and pass in a script that performs the model evaluation.

In [None]:
evaluation_processor = ScriptProcessor(
    image_uri=xgboost_container,
    command=["python3"],
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name='evaluation',
    sagemaker_session=session,
)

In [None]:
# Specify where we'll store the model evaluation results so
# that other steps can access those results
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluation.py',
    property_files=[evaluation_report],
)
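
The contents of `evaluation.py` are not shown in this notebook. The sketch below shows one way such a script could produce a report that matches the `regression_metrics.mse.value` path used by the condition step. The helper names and the sample numbers are assumptions, and a real script would first load the model and the test set.

```python
import json
import tempfile
from pathlib import Path


def mean_squared_error(y_true, y_pred):
    """Average of the squared differences between labels and predictions."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def write_evaluation_report(y_true, y_pred, output_dir):
    """Write evaluation.json with the MSE nested under regression_metrics.mse.value."""
    report = {
        "regression_metrics": {
            "mse": {"value": mean_squared_error(y_true, y_pred)},
        }
    }
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    report_path = output_dir / "evaluation.json"
    report_path.write_text(json.dumps(report))
    return report_path


# Inside the Processing job output_dir would be /opt/ml/processing/evaluation;
# a temporary directory keeps this sketch runnable anywhere.
with tempfile.TemporaryDirectory() as tmp:
    path = write_evaluation_report([100.0, 200.0, 300.0], [110.0, 190.0, 330.0], tmp)
    report = json.loads(path.read_text())
print(report)
```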

#### Register model in Model Registry step

Once we've evaluated the model's performance, we'll want to register the model in a Model Registry.

In [None]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri='{}/evaluation.json'.format(
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][
                'S3Uri'
            ]
        ),
        content_type='application/json',
    )
)

model = Model(
    image_uri=estimator.training_image_uri(),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point=estimator.entry_point,
    role=role_arn,
    sagemaker_session=session
)

model_registry_args = model.register(
    content_types=['text/csv'],
    response_types=['application/json'],
    inference_instances=['ml.m5.xlarge'],
    transform_instances=['ml.m5.xlarge'],
    model_package_group_name=model_package_group_name,
    approval_status='PendingManualApproval',
    model_metrics=model_metrics
)

register_step = ModelStep(
    name='RegisterModel',
    step_args=model_registry_args
)

However, we only want to register the model if its performance meets a predefined threshold. So let's create a Condition Step that registers the model only if its MSE is less than or equal to 80000000.0.

In [None]:
# Condition step for evaluating model quality and branching execution

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.mse.value',
    ),
    right=80000000.0,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[],
)
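
The check performed by this condition can be pictured with a few lines of plain Python. The sketch below walks a dotted path through a report and compares the result to the threshold. The `json_get` and `should_register` helpers and the sample reports are hypothetical. They only imitate the behavior for simple dotted paths and are not the SDK's `JsonGet`.

```python
def json_get(document, json_path):
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def should_register(report, threshold=80000000.0):
    """Mirror the condition: register only if MSE <= threshold."""
    return json_get(report, "regression_metrics.mse.value") <= threshold


# Sample reports with made-up MSE values, one on each side of the threshold.
good_report = {"regression_metrics": {"mse": {"value": 52000000.0}}}
bad_report = {"regression_metrics": {"mse": {"value": 91000000.0}}}

print(should_register(good_report), should_register(bad_report))
```

With an empty `else_steps` list, a model that fails the check is simply not registered and the pipeline execution ends there.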

#### Assemble the training pipeline

Listing the parameters and steps in order makes the pipeline easier to reason about, but it isn't required: the execution order is worked out from the pipeline DAG.

In [None]:
# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-training-pipeline'
step_list = [preprocess_dataset_step,
             training_step,
             evaluation_step,
             condition_step]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.
#json.loads(training_pipeline.definition())

#### Execute the training pipeline

In [None]:
execution = training_pipeline.start(
    parameters = {
        'ProcessingInstanceType': 'ml.m5.large'
    }
)

Check the status of the pipeline execution:

In [None]:
execution.describe()

In [None]:
execution.list_steps()

## Deployment pipeline with SageMaker Pipelines

Now let's create a separate pipeline that will take the model that was registered in Model Registry and deploy it as a SageMaker hosted endpoint.

First we'll specify the input parameters to our deployment pipeline so that we can reuse it.

In [None]:
model_name = ParameterString(
    name='ModelName',
    default_value='my-awesome-model'
)

Next, we'll create a Lambda function that pulls the specified model (or the latest model) from the Model Registry and deploys it as a SageMaker endpoint.

In [None]:
lambda_name = 'sagemaker-pipelines-deploy-model'

lambda_function = Lambda(
    function_name=lambda_name,
    execution_role_arn=lambda_role,
    script='./pipeline_scripts/lambda_deploy.py',
    handler='lambda_deploy.lambda_handler',
    timeout=600,
    memory_size=3000,
)

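try:
    lambda_function_response = lambda_function.create()
    lambda_function_arn = lambda_function_response['FunctionArn']
    print(f'Lambda function arn: {lambda_function_arn}')
except Exception as error:
    # create() fails if the function already exists, but it can also fail for
    # other reasons (for example missing permissions), so print the underlying error.
    print(f'Lambda function was not created (it may already exist): {error}')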

Now we'll create a Lambda step for our pipeline and associate it with the new Lambda function we just created.

In [None]:
# The dictionary returned by the Lambda function is captured by LambdaOutput objects:
# each key in the dictionary corresponds to one LambdaOutput.

output_param_1 = LambdaOutput(output_name='statusCode', output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name='body', output_type=LambdaOutputTypeEnum.String)

deploy_lambda_step = LambdaStep(
    name='LambdaStepDeploy',
    lambda_func=lambda_function,
    inputs={
        'region': region,
        'aws_account_id': aws_account_id,
        'model_package_group_name': model_package_group_name,
        'model_name': model_name,
        'instance_count': 1,
        'role_arn': role_arn
    },
    outputs=[
        output_param_1, 
        output_param_2
    ],
)
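
The contents of `lambda_deploy.py` are not shown in this notebook. The following is only a sketch of what such a handler might look like, assuming it reads the keys passed in `inputs` above and returns the `statusCode` and `body` keys captured by the two `LambdaOutput` objects. The endpoint config name, the `ml.m5.xlarge` instance type, the filter on approved versions, and the `FakeSageMakerClient` stand-in (used so the sketch runs without AWS access) are all assumptions.

```python
import json


def lambda_handler(event, context, sm_client=None):
    """Deploy the newest approved model package in a group to a real-time endpoint."""
    if sm_client is None:
        import boto3  # imported lazily so the sketch also runs without AWS access
        sm_client = boto3.client("sagemaker", region_name=event["region"])

    # Assumption: only approved versions are deployed, newest first.
    packages = sm_client.list_model_packages(
        ModelPackageGroupName=event["model_package_group_name"],
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )["ModelPackageSummaryList"]
    if not packages:
        return {"statusCode": "404", "body": json.dumps("No approved model package found")}

    model_name = event["model_name"]
    config_name = f"{model_name}-config"      # assumed naming scheme
    endpoint_name = f"{model_name}-endpoint"  # matches the name queried when testing the endpoint
    sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=event["role_arn"],
        Containers=[{"ModelPackageName": packages[0]["ModelPackageArn"]}],
    )
    sm_client.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[{
            "VariantName": "AllTraffic",
            "ModelName": model_name,
            "InitialInstanceCount": event["instance_count"],
            "InstanceType": "ml.m5.xlarge",  # assumed instance type
        }],
    )
    sm_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)
    return {"statusCode": "200", "body": json.dumps(f"Created endpoint {endpoint_name}")}


class FakeSageMakerClient:
    """Offline stand-in that records which client methods were called."""

    def __init__(self):
        self.calls = []

    def list_model_packages(self, **kwargs):
        self.calls.append("list_model_packages")
        arn = "arn:aws:sagemaker:us-east-1:111122223333:model-package/example-group/1"
        return {"ModelPackageSummaryList": [{"ModelPackageArn": arn}]}

    def create_model(self, **kwargs):
        self.calls.append("create_model")

    def create_endpoint_config(self, **kwargs):
        self.calls.append("create_endpoint_config")

    def create_endpoint(self, **kwargs):
        self.calls.append("create_endpoint")


fake_client = FakeSageMakerClient()
event = {
    "region": "us-east-1",
    "aws_account_id": "111122223333",
    "model_package_group_name": "example-group",
    "model_name": "my-xgboost-model",
    "instance_count": 1,
    "role_arn": "arn:aws:iam::111122223333:role/example-role",
}
result = lambda_handler(event, None, sm_client=fake_client)
print(result, fake_client.calls)
```

Under the approval filter assumed here, a version registered as `PendingManualApproval` by the training pipeline would need to be approved before this sketch would deploy it.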

Excellent, now we just need to assemble the pipeline.

#### Assemble the deployment pipeline

In [None]:
# pipeline_name = 'synthetic-housing-deployment-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-deployment-pipeline'
step_list = [deploy_lambda_step]

deployment_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        model_name
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
deployment_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs.
json.loads(deployment_pipeline.definition())

#### Execute the deployment pipeline

In [None]:
deployed_model_name = 'my-xgboost-model'
execution = deployment_pipeline.start(
    parameters = {
        'ModelName': deployed_model_name
    }
)

Check the status of the pipeline execution:

In [None]:
execution.describe()

In [None]:
execution.list_steps()

#### Test the SageMaker endpoint

Let's now send some data to the endpoint to check that it works properly.

For this, we first load our test data from Feature Store:

In [None]:
# Read in test set that was used for batch transform
fs_group = FeatureGroup(name=test_feature_group_name, sagemaker_session=session)  
query = fs_group.athena_query()
table = query.table_name
query_string = f'SELECT {features_to_select} FROM "sagemaker_featurestore"."{table}"  ORDER BY record_id'
query_results= 'sagemaker-featurestore'
output_location = f's3://{bucket}/{query_results}/query_results/'
query.run(query_string=query_string, output_location=output_location)
query.wait()
df = query.as_dataframe()
df.head()

Then we query the endpoint once it is available

In [None]:
response_status = 'None'
while response_status != 'InService':
    if response_status != 'None':
        print(f'Waiting for the endpoint deployment to finish. Current endpoint status: {response_status}')
        time.sleep(120) # wait until endpoint is in service
    response = sagemaker_client.describe_endpoint(
        EndpointName=deployed_model_name+'-endpoint'
    )
    response_status = response['EndpointStatus']
# Attach to the SageMaker endpoint
predictor = Predictor(endpoint_name=deployed_model_name+'-endpoint',
                      sagemaker_session=session,
                      serializer=CSVSerializer(),
                      deserializer=CSVDeserializer())

# Get a real-time prediction
predictor.predict(df.drop(columns=["price"]).to_csv(index=False, header=False))[0]
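
The polling loop above keeps waiting for as long as the endpoint is not `InService`, including when deployment has failed. As a hedged alternative, the sketch below adds a timeout and stops early on terminal failure states. The `wait_for_endpoint` helper, its default intervals, and the `fake_describe` stand-in (used so the sketch runs offline) are assumptions.

```python
import time


def wait_for_endpoint(describe, endpoint_name, poll_seconds=30,
                      timeout_seconds=1800, sleep=time.sleep):
    """Poll `describe` until the endpoint is InService, fails, or times out."""
    waited = 0
    while True:
        status = describe(EndpointName=endpoint_name)["EndpointStatus"]
        if status == "InService":
            return status
        if status in ("Failed", "OutOfService"):
            raise RuntimeError(f"Endpoint {endpoint_name} ended in status {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Endpoint {endpoint_name} still {status} after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in for sagemaker_client.describe_endpoint so the sketch runs offline.
fake_statuses = iter(["Creating", "Creating", "InService"])


def fake_describe(EndpointName):
    return {"EndpointStatus": next(fake_statuses)}


naps = []  # records the requested sleep durations instead of actually sleeping
final_status = wait_for_endpoint(fake_describe, "my-xgboost-model-endpoint", sleep=naps.append)
print(final_status, naps)
```

In the notebook this would be called as `wait_for_endpoint(sagemaker_client.describe_endpoint, deployed_model_name + '-endpoint')`. The boto3 SageMaker client also provides an `endpoint_in_service` waiter that may be a simpler option.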