# Module 8: Build a pipeline using Amazon SageMaker Pipelines

In this final notebook, you will build a model build workflow that orchestrates the preprocessing and training steps and registers the serial inference pipeline model in the SageMaker Model Registry. You will use Amazon SageMaker Pipelines for the workflow orchestration and lineage.

Orchestrating and automating the model build workflow is a prerequisite for any ML CI/CD, because CI/CD automation must be able to execute the steps that lead to a model, and those steps vary by use case. The idea is that a typical "build" stage of CI/CD executes a workflow previously defined by a Data Scientist.

Amazon SageMaker Pipelines supports a pipeline Domain Specific Language (DSL), which is a declarative JSON specification. This DSL defines a Directed Acyclic Graph (DAG) of pipeline parameters and steps. The SageMaker Python SDK streamlines the generation of the pipeline DSL using constructs that are already familiar to engineers and scientists alike.

SageMaker Model Registry is where trained models are stored, versioned, and managed. Data Scientists and Machine Learning Engineers can compare model versions, approve models for deployment, and deploy models from different AWS accounts, all from a single Model Registry.

Let's define the variables first.

In [None]:
import sagemaker
import boto3
import time

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
prefix = 'end-to-end-ml'

print(region)
print(role)
print(bucket_name)

In [None]:
%store -r experiment_name

print(experiment_name)

<h2>Define Pipeline</h2>

In this section, you will define a model build workflow for the pre-processing and training operations you executed manually in the previous notebooks. The workflow definition will also include steps to register the model in the SageMaker model registry.

Your objective is to define the pipeline shown below:

![Workflow](./workflow.png)

The pipeline will execute the following steps:
- Run a SageMaker Processing job to execute data preparation and generate a featurizer model
    - Repack the featurizer model to bundle inference scripts
- Run a SageMaker Training job to train the XGBoost model
    - Repack the XGBoost model to bundle inference scripts
- Register a serial inference pipeline of the two models in the SageMaker Model Registry

Note: when custom inference logic is required, the SageMaker Python SDK automatically adds the model repack steps that convert the models to a format suitable for inference. You will not add these steps explicitly.
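
To give an idea of what a repack step produces, the sketch below rebuilds a `model.tar.gz` so that it contains the original model artifacts plus the inference script under a `code/` directory, using only the Python standard library. It is a simplified local illustration that assumes this layout; the SDK's repack step runs as a separate job and may differ in its details. The file names and contents (`model.joblib`, the `inference.py` body) are hypothetical.

```python
import tarfile
import tempfile
from pathlib import Path


def repack_model(model_tar: Path, script: Path, out_tar: Path) -> None:
    """Copy every member of model_tar into out_tar and add script under code/."""
    with tarfile.open(model_tar, "r:gz") as src, tarfile.open(out_tar, "w:gz") as dst:
        for member in src.getmembers():
            # Re-add the original model artifacts unchanged.
            fileobj = src.extractfile(member) if member.isfile() else None
            dst.addfile(member, fileobj)
        # Bundle the inference script next to the artifacts.
        dst.add(script, arcname=f"code/{script.name}")


with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    # Hypothetical artifact produced by a processing or training job.
    artifact = tmp_path / "model.joblib"
    artifact.write_bytes(b"fake-model-bytes")
    model_tar = tmp_path / "model.tar.gz"
    with tarfile.open(model_tar, "w:gz") as tar:
        tar.add(artifact, arcname="model.joblib")

    # Hypothetical custom inference script.
    script = tmp_path / "inference.py"
    script.write_text("def model_fn(model_dir):\n    ...\n")

    repacked = tmp_path / "repacked.tar.gz"
    repack_model(model_tar, script, repacked)
    with tarfile.open(repacked, "r:gz") as tar:
        names = sorted(tar.getnames())

print(names)  # ['code/inference.py', 'model.joblib']
```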

<h3>Pipeline parameters</h3>

You define workflow parameters for the pipeline so that you can vary the values without having to modify the workflow definition.

The supported parameter types include:

* `ParameterString` - representing a `str` Python type
* `ParameterInteger` - representing an `int` Python type
* `ParameterFloat` - representing a `float` Python type
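
Each parameter object ends up in the pipeline definition as an entry with a name, a type, and a default value, and steps refer to it by name rather than by value, so the actual value is only resolved at run time. The sketch below mimics that behavior with a hypothetical `Param` class (it is not part of the SageMaker SDK), assuming the `{"Name", "Type", "DefaultValue"}` entry shape and the `{"Get": "Parameters.<name>"}` reference form used in the pipeline JSON.

```python
from dataclasses import dataclass
from typing import Union

# Maps a Python type to the parameter type name used in the pipeline definition.
TYPE_NAMES = {str: "String", int: "Integer", float: "Float"}


@dataclass
class Param:
    """Hypothetical stand-in for ParameterString/ParameterInteger/ParameterFloat."""
    name: str
    py_type: type
    default_value: Union[str, int, float]

    def __post_init__(self):
        # Reject defaults that do not match the declared type.
        if type(self.default_value) is not self.py_type:
            raise TypeError(f"{self.name}: default must be {self.py_type.__name__}")

    def to_definition(self) -> dict:
        # One entry of the "Parameters" list in the pipeline JSON.
        return {"Name": self.name, "Type": TYPE_NAMES[self.py_type],
                "DefaultValue": self.default_value}

    def reference(self) -> dict:
        # Steps embed this placeholder instead of the concrete value.
        return {"Get": f"Parameters.{self.name}"}


count = Param("processing_instance_count", int, 1)
print(count.to_definition())
print(count.reference())
```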

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# ---------------------
# Processing parameters
# ---------------------

# The path to the raw data.
raw_data_path = 's3://{0}/{1}/data/raw/'.format(bucket_name, prefix)
raw_data_path_param = ParameterString(name="raw_data_path", default_value=raw_data_path)

# The output path to the training data.
train_data_path = 's3://{0}/{1}/data/preprocessed/train/'.format(bucket_name, prefix)
train_data_path_param = ParameterString(name="train_data_path", default_value=train_data_path)

# The output path to the validation data.
val_data_path = 's3://{0}/{1}/data/preprocessed/val/'.format(bucket_name, prefix)
val_data_path_param = ParameterString(name="val_data_path", default_value=val_data_path)

# The output path to the test data.
test_data_path = 's3://{0}/{1}/data/preprocessed/test/'.format(bucket_name, prefix)
test_data_path_param = ParameterString(name="test_data_path", default_value=test_data_path)

# The output path to the featurizer model.
model_path = 's3://{0}/{1}/output/sklearn/model.tar.gz'.format(bucket_name, prefix)
model_path_param = ParameterString(name="model_path", default_value=model_path)

# The instance type for the processing job.
processing_instance_type_param = ParameterString(name="processing_instance_type", default_value='ml.m5.large')

# The instance count for the processing job.
processing_instance_count_param = ParameterInteger(name="processing_instance_count", default_value=1)

# The train/test split ratio parameter.
train_test_split_ratio_param = ParameterString(name="train_test_split_ratio", default_value='0.2')

# -------------------
# Training parameters
# -------------------
        
# XGB hyperparameters.
max_depth_param = ParameterString(name="max_depth", default_value='3')
eta_param = ParameterString(name="eta", default_value='0.1')
gamma_param = ParameterString(name="gamma", default_value='0')
min_child_weight_param = ParameterString(name="min_child_weight", default_value='1')
objective_param = ParameterString(name="objective", default_value='binary:logistic')
num_round_param = ParameterString(name="num_round", default_value='10')
eval_metric_param = ParameterString(name="eval_metric", default_value='auc')

# The instance type for the training job.
training_instance_type_param = ParameterString(name="training_instance_type", default_value='ml.m5.xlarge')

# The instance count for the training job.
training_instance_count_param = ParameterInteger(name="training_instance_count", default_value=1)

# The training output path for the model.
output_path = 's3://{0}/{1}/output/'.format(bucket_name, prefix)
output_path_param = ParameterString(name="output_path", default_value=output_path)

# --------------------------
# Register model parameters
# --------------------------

# The default instance type for deployment.
deploy_instance_type_param = ParameterString(name="deploy_instance_type", default_value='ml.m5.2xlarge')

# The approval status for models added to the registry.
model_approval_status_param = ParameterString(name="model_approval_status", default_value='PendingManualApproval')
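
The XGBoost hyperparameters above are declared as `ParameterString`, which matches how SageMaker handles them: a training job receives hyperparameters as strings, and in script mode the SDK passes them to the entry point as command-line arguments such as `--max_depth 3`. The training script therefore has to cast them back to numbers. A minimal sketch of that parsing with `argparse`, assuming argument names that match the hyperparameter keys; the actual `training.py` may do this differently, and `parse_hyperparameters` is a hypothetical helper.

```python
import argparse


def parse_hyperparameters(argv):
    """Cast string command-line arguments back to typed XGBoost hyperparameters."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.1)
    parser.add_argument("--gamma", type=float, default=0.0)
    parser.add_argument("--min_child_weight", type=float, default=1.0)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--num_round", type=int, default=10)
    parser.add_argument("--eval_metric", type=str, default="auc")
    # parse_known_args ignores arguments not declared here, such as --silent.
    args, _unknown = parser.parse_known_args(argv)
    return args


# Every value arrives as a string, just like the pipeline parameters.
args = parse_hyperparameters(["--max_depth", "5", "--eta", "0.3", "--num_round", "20", "--silent", "0"])
print(args.max_depth, args.eta, args.num_round)
```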


<h3>Processing Step</h3>

Now, you define the processing step that prepares the dataset, as seen in module <a href="../03_feature_engineering/03_feature_engineering.ipynb">03_feature_engineering</a>.

In [None]:
!pygmentize ../03_feature_engineering/source_dir/preprocessor.py

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(role=role,
                                     instance_type=processing_instance_type_param,
                                     instance_count=processing_instance_count_param,
                                     framework_version='0.20.0')

inputs = [ProcessingInput(input_name='raw_data', 
                          source=raw_data_path_param, destination='/opt/ml/processing/input')]

outputs = [ProcessingOutput(output_name='train_data', 
                            source='/opt/ml/processing/train', destination=train_data_path_param),
           ProcessingOutput(output_name='val_data', 
                            source='/opt/ml/processing/val', destination=val_data_path_param),
           ProcessingOutput(output_name='test_data', 
                            source='/opt/ml/processing/test', destination=test_data_path_param),
           ProcessingOutput(output_name='model', 
                            source='/opt/ml/processing/model', destination=model_path_param)]

code_path = '../03_feature_engineering/source_dir/preprocessor.py'

In [None]:
from sagemaker.workflow.steps import ProcessingStep

processing_step = ProcessingStep(
    name='Processing', 
    code=code_path,
    processor=sklearn_processor,
    inputs=inputs,
    outputs=outputs,
    job_arguments=['--train-test-split-ratio', train_test_split_ratio_param]
)

print(processing_step)
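
The `job_arguments` list is passed to `preprocessor.py` as command-line arguments, and because `train_test_split_ratio` is a `ParameterString`, its value arrives as text. A minimal sketch of how a processing script could consume it, assuming an `argparse`-based script and a simple random hold-out split; the real `preprocessor.py` printed above may split the data differently, and `split_rows` is a hypothetical helper.

```python
import argparse
import random


def split_rows(rows, test_ratio, seed=42):
    """Shuffle rows and hold out test_ratio of them as the test split."""
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_ratio))
    return shuffled[n_test:], shuffled[:n_test]


parser = argparse.ArgumentParser()
# The pipeline passes the ParameterString value, so argparse casts it to float.
parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
args = parser.parse_args(["--train-test-split-ratio", "0.2"])

train, test = split_rows(range(100), args.train_test_split_ratio)
print(len(train), len(test))  # 80 20
```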

<h3>Training Step</h3>

Then, we create a training step, using the same estimator definition as seen in module <a href="../04_train_model/04_train_model.ipynb">04_train_model</a>.

In [None]:
!pygmentize ../04_train_model/source_dir/training.py

In [None]:
from sagemaker.xgboost import XGBoost

hyperparameters = {
    "max_depth": max_depth_param,
    "eta": eta_param,
    "gamma": gamma_param,
    "min_child_weight": min_child_weight_param,
    "silent": 0,
    "objective": objective_param,
    "num_round": num_round_param,
    "eval_metric": eval_metric_param
}

entry_point='training.py'
source_dir='../04_train_model/source_dir/'
code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

estimator = XGBoost(
    entry_point=entry_point,
    source_dir=source_dir,
    output_path=output_path_param,
    code_location=code_location,
    hyperparameters=hyperparameters,
    instance_type=training_instance_type_param,
    instance_count=training_instance_count_param,
    framework_version="0.90-2",
    py_version="py3",
    role=role
)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='Training',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'train_data'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'val_data'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )      
    }
)

print(training_step)
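
The `'train'` and `'validation'` keys of `inputs` become input channels. Inside the training container, SageMaker downloads each channel to `/opt/ml/input/data/<channel>` and, in script mode, exposes its location through an environment variable such as `SM_CHANNEL_TRAIN`. A small sketch of resolving those directories, assuming `training.py` follows this common script-mode convention; `channel_dir` and the fake environment are hypothetical.

```python
import os
from pathlib import PurePosixPath


def channel_dir(name, env=None):
    """Return the directory of an input channel inside the training container."""
    env = os.environ if env is None else env
    # Script mode sets SM_CHANNEL_<NAME>; fall back to the default mount point.
    default = f"/opt/ml/input/data/{name}"
    return PurePosixPath(env.get(f"SM_CHANNEL_{name.upper()}", default))


# Fake environment standing in for the one SageMaker sets in the container.
fake_env = {"SM_CHANNEL_TRAIN": "/tmp/custom/train"}
print(channel_dir("train", fake_env))       # /tmp/custom/train
print(channel_dir("validation", fake_env))  # /opt/ml/input/data/validation
```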

<h3>Register Model Step</h3>

The last step to define registers the serial inference pipeline model in the SageMaker Model Registry. We create a PipelineModel from the SKLearn and XGBoost models, as seen in <a href="../05_deploy_model/05_deploy_model.ipynb">05_deploy_model</a>, and then use it in the register model step.

<h4>Featurizer Model</h4>

In [None]:
import time
from sagemaker.sklearn import SKLearnModel

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(name='end-to-end-ml-sm-skl-model-{0}'.format(str(int(time.time()))),
                             model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
                             entry_point='inference.py',
                             source_dir='../05_deploy_model/sklearn_source_dir/',
                             code_location=code_location,
                             role=role,
                             sagemaker_session=sagemaker_session,
                             framework_version='0.20.0',
                             py_version='py3')

<h4>XGBoost Model</h4>

In [None]:
import time
from sagemaker.xgboost import XGBoostModel

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

xgboost_model = XGBoostModel(name='end-to-end-ml-sm-xgb-model-{0}'.format(str(int(time.time()))),
                             model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                             entry_point='inference.py',
                             source_dir='../05_deploy_model/xgboost_source_dir/',
                             code_location=code_location,
                             framework_version='0.90-2',
                             py_version='py3',
                             role=role, 
                             sagemaker_session=sagemaker_session)

<h4>Serial Inference Pipeline Model</h4>

In [None]:
import sagemaker
import time
from sagemaker.pipeline import PipelineModel

pipeline_model_name = 'end-to-end-ml-sm-xgb-skl-pipeline-{0}'.format(str(int(time.time())))

pipeline_model = PipelineModel(
    name=pipeline_model_name, 
    role=role,
    models=[
        sklearn_model, 
        xgboost_model],
    sagemaker_session=sagemaker_session)
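
A `PipelineModel` deploys its models as containers behind a single endpoint, and each request flows through them in order: the response of the SKLearn featurizer becomes the request of the XGBoost model. The plain-Python sketch below imitates that chaining with two hypothetical functions standing in for the containers; the category encoding and the score are placeholders, not the workshop's actual inference logic.

```python
import json

# Hypothetical encoding of the first (categorical) CSV column.
CATEGORY_CODES = {"L": 0.0, "M": 1.0, "H": 2.0}


def featurizer(csv_line):
    """Stand-in for the SKLearn container: CSV text in, numeric features out."""
    category, *numbers = csv_line.split(",")
    return [CATEGORY_CODES[category]] + [float(x) for x in numbers]


def xgboost_stub(features):
    """Stand-in for the XGBoost container: features in, JSON text out."""
    score = min(1.0, sum(features) / 10_000)  # placeholder, not a trained model
    return json.dumps({"score": score})


def serial_inference(payload, containers):
    """Pass the output of each container as the input of the next one."""
    data = payload
    for container in containers:
        data = container(data)
    return data


result = serial_inference("L,298.4,308.2,1582,70.7,216", [featurizer, xgboost_stub])
print(result)
```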

<h4>Register Model Step</h4>

In [None]:
from sagemaker.workflow.step_collections import RegisterModel

model_package_group_name = 'end-to-end-ml-sm-model-package-group'

register_model_step = RegisterModel(
    name='RegisterModel',
    content_types=['text/csv'],
    response_types=['application/json', 'text/csv'],
    inference_instances=[deploy_instance_type_param],
    transform_instances=['ml.c5.4xlarge'],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model=pipeline_model
)

<h3>Pipeline</h3>

After all steps have been defined, we are ready to create our model build workflow (a SageMaker Pipeline).
The pipeline definition takes as input all the parameters we created previously and the list of steps. In this example, the dependencies among the steps are computed automatically from the inputs and outputs of each step, but the service also supports setting them explicitly.
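
The automatic dependency inference can be pictured as follows: a step that reads a property of another step (for example, the training step reading the processing step's `S3Uri` output) must run after it. The sketch below derives an execution order from `{"Get": "Steps.<name>..."}` references in a simplified, hand-written definition using the standard `graphlib` module (Python 3.9+). The argument keys are illustrative rather than the exact JSON the SDK emits, and `Parameters.*` references are ignored because they do not create dependencies.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def find_refs(node):
    """Yield every {"Get": ...} expression nested anywhere in a step's arguments."""
    if isinstance(node, dict):
        if set(node) == {"Get"}:
            yield node["Get"]
        for value in node.values():
            yield from find_refs(value)
    elif isinstance(node, list):
        for item in node:
            yield from find_refs(item)


def execution_order(definition):
    """Derive step dependencies from "Steps.<name>..." references and sort them."""
    graph = {}
    for step in definition["Steps"]:
        upstream = {ref.split(".")[1] for ref in find_refs(step.get("Arguments", {}))
                    if ref.startswith("Steps.")}
        # Explicit dependencies, if any, are merged with the inferred ones.
        graph[step["Name"]] = upstream | set(step.get("DependsOn", []))
    return list(TopologicalSorter(graph).static_order())


# Simplified, hand-written definition; steps are listed out of order on purpose.
sample_definition = {"Steps": [
    {"Name": "RegisterModel",
     "Arguments": {"ModelDataUrl": {"Get": "Steps.Training.ModelArtifacts.S3ModelArtifacts"}}},
    {"Name": "Training",
     "Arguments": {"TrainData": {"Get": "Steps.Processing.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri"},
                   "MaxDepth": {"Get": "Parameters.max_depth"}}},
    {"Name": "Processing",
     "Arguments": {"InstanceType": {"Get": "Parameters.processing_instance_type"}}},
]}
print(execution_order(sample_definition))  # ['Processing', 'Training', 'RegisterModel']
```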

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = 'end-to-end-ml-sagemaker-pipeline'

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        raw_data_path_param,
        train_data_path_param,
        val_data_path_param,
        test_data_path_param,
        model_path_param,
        processing_instance_type_param,
        processing_instance_count_param,
        train_test_split_ratio_param,
        max_depth_param,
        eta_param,
        gamma_param,
        min_child_weight_param,
        objective_param,
        num_round_param,
        eval_metric_param,
        training_instance_type_param,
        training_instance_count_param,
        output_path_param,
        deploy_instance_type_param,
        model_approval_status_param
    ],
    steps=[processing_step, training_step, register_model_step],
    sagemaker_session=sagemaker_session,
)

We can also take a look at the JSON representation of the pipeline as follows:

In [None]:
import json
definition = json.loads(pipeline.definition())
definition

<h2>Insert and Execute the pipeline</h2>

Once the pipeline has been defined, we upsert (create or update) its definition in the service and then start an execution, passing any parameter values we want to override. Parameters that are not set use their default values at run time.
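
Conceptually, each execution combines the values passed to `pipeline.start(parameters=...)` with the defaults declared on the parameter objects. A minimal sketch of that merge with hypothetical names; rejecting unknown parameter names is a choice made here for safety, not a documented behavior of the service.

```python
def resolve_parameters(declared, overrides):
    """Merge start() overrides with declared defaults, rejecting unknown names."""
    unknown = set(overrides) - set(declared)
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    # Overridden names take the new value; all others keep their default.
    return {name: overrides.get(name, default) for name, default in declared.items()}


declared = {"train_test_split_ratio": "0.2", "max_depth": "3", "processing_instance_count": 1}
resolved = resolve_parameters(declared, {"max_depth": "5"})
print(resolved)
```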

In [None]:
response = pipeline.upsert(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

In [None]:
from sagemaker.experiments.run import Run

run_name=f'pipeline-{time.strftime("%H-%M-%S", time.localtime())}'
run_display_name=run_name

with Run(
    experiment_name=experiment_name,
    run_name=run_name,
    run_display_name=run_display_name,
    sagemaker_session=sagemaker_session,
) as run:

    execution = pipeline.start(parameters={
        'train_test_split_ratio': '0.2'
    })
    print(execution.arn)

<h3>Wait for pipeline execution</h3>

In [None]:
%%time
execution.wait()

While waiting for the pipeline execution to complete (it takes about 10 minutes), feel free to use the left side panel in SageMaker Studio to review the pipeline definition and execution status.
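
`execution.wait()` blocks by polling the execution status until it reaches a terminal state. If you prefer to control the polling yourself (for example, in a CI/CD job with its own timeout), a sketch along these lines would work, assuming the status values `Executing`, `Stopping`, `Stopped`, `Failed`, and `Succeeded` reported by `DescribePipelineExecution`. The function name is hypothetical, and a fake status sequence replaces the real API call so the sketch runs offline.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(get_status, delay=30.0, max_attempts=60, sleep=time.sleep):
    """Poll get_status() until a terminal status is returned or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay)
    raise TimeoutError(f"Still running after {max_attempts} attempts")


# In the notebook, get_status could be:
# lambda: boto3.client("sagemaker").describe_pipeline_execution(
#     PipelineExecutionArn=execution.arn)["PipelineExecutionStatus"]
statuses = iter(["Executing", "Executing", "Succeeded"])
final = wait_for_execution(lambda: next(statuses), delay=0, sleep=lambda _: None)
print(final)  # Succeeded
```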

<h2>Approve model in the SageMaker Model Registry</h2>

When the pipeline execution completes, the model is registered in the model registry with a PendingManualApproval status, and we need to approve it before deployment.

First, we get the ARN (Amazon Resource Name) of the versioned model package (that is, the versioned model in the model registry).

In [None]:
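steps = execution.list_steps()

# Select the step that carries RegisterModel metadata; the repack steps added by
# the SDK may also have names starting with 'RegisterModel'.
register_step_info = next(s for s in steps if 'RegisterModel' in s.get('Metadata', {}))

model_package_arn = register_step_info['Metadata']['RegisterModel']['Arn']
print(model_package_arn)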

Let's describe the model package and check the InferenceSpecification property to make sure the serial inference pipeline of models has been set.

In [None]:
sm_client = boto3.client('sagemaker')

response = sm_client.describe_model_package(
    ModelPackageName=model_package_arn)

response
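
To make the check explicit, the helper below lists the containers in the `InferenceSpecification` of a `describe_model_package` response. For the serial inference pipeline you would expect two entries, the featurizer first and the XGBoost model second. The helper name and the trimmed sample response (with placeholder values) are hypothetical; in the notebook you would call it on `response`.

```python
def pipeline_containers(model_package):
    """Return (image, model data URL) pairs from a describe_model_package response."""
    containers = model_package["InferenceSpecification"]["Containers"]
    return [(c["Image"], c.get("ModelDataUrl")) for c in containers]


# Trimmed, hypothetical response with placeholder values.
sample = {"InferenceSpecification": {"Containers": [
    {"Image": "<sklearn-image-uri>", "ModelDataUrl": "s3://<bucket>/sklearn/model.tar.gz"},
    {"Image": "<xgboost-image-uri>", "ModelDataUrl": "s3://<bucket>/xgboost/model.tar.gz"},
]}}
containers = pipeline_containers(sample)
assert len(containers) == 2, "expected the featurizer and the XGBoost container"
print([image for image, _ in containers])
```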

Finally, we can approve the model package.

In [None]:
sm_client.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",
)

<h2>Deploy real-time endpoint from the model package in the registry</h2>

To deploy the model from the model registry, we can use the ModelPackage class of the SDK as follows:

In [None]:
from sagemaker.model import ModelPackage

model_package = ModelPackage(model_package_arn=model_package_arn,
                             role=role)

In [None]:
endpoint_name = 'end-to-end-ml-sm-pipeline-endpoint-{0}'.format(str(int(time.time())))
print(endpoint_name)

model_package.deploy(initial_instance_count=1, 
                     instance_type='ml.m5.2xlarge', 
                     endpoint_name=endpoint_name)

<h3>Execute inference</h3>

Let's execute some inferences to test our real-time endpoint.

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=CSVSerializer(),
    deserializer=JSONDeserializer())

payload = "L,298.4,308.2,1582,70.7,216"
print(predictor.predict(payload))
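
`CSVSerializer` sends the request body as comma-separated text, so a payload like the one above can also be built from a Python list of values. A small sketch using only the standard `csv` module; `to_csv_payload` is a hypothetical helper, and the field order simply follows the sample payload above.

```python
import csv
import io


def to_csv_payload(record):
    """Serialize one record to a single CSV line without the trailing newline."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(record)
    return buffer.getvalue().rstrip("\r\n")


payload = to_csv_payload(["L", 298.4, 308.2, 1582, 70.7, 216])
print(payload)  # L,298.4,308.2,1582,70.7,216
```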

Finally, we can clean up resources.

In [None]:
predictor.delete_endpoint()

## You have completed Module 8 and the workshop!

Well done! 

We hope you enjoyed the journey that started with experimentation in a SageMaker Studio notebook, continued through preprocessing and training jobs and then model deployment and inference, and ended with building a pipeline that runs the preprocessing, training, and model registration steps.

Please feel free to continue exploring the SageMaker Studio environment and reading the notes in any notebooks you skipped during the workshop.

### Clean up

If you are attending an AWS-led event, there is no need to clean up. However, if you have been using your own AWS account, please follow the [clean up steps](../cleanup/README.md) to avoid incurring charges.