# Orchestrating Model Build and Registration with SageMaker Pipelines

> *This notebook is designed to work with the `Python 3 (Data Science)` kernel on SageMaker Studio.*

Amazon SageMaker gives Machine Learning application developers and Machine Learning operations engineers the ability to orchestrate SageMaker jobs and author reproducible Machine Learning pipelines, deploy custom-built models for low-latency real-time inference or offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface you can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, while adhering to safety and best-practice paradigms for Machine Learning application development.

The [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) service supports a SageMaker Machine Learning Pipeline Domain Specific Language (DSL), which is a declarative JSON specification. This DSL defines a Directed Acyclic Graph (DAG) of pipeline parameters and SageMaker job steps. The [SageMaker Python Software Development Kit (SDK)](https://sagemaker.readthedocs.io/en/stable/) streamlines the generation of the pipeline DSL using constructs that are already familiar to engineers and scientists alike.
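
To make the idea of a declarative JSON definition more concrete, here is a hand-written and heavily abbreviated sketch of what such a document can look like. It is illustrative only: real definitions are generated by the SDK and contain many more fields, and the argument keys `InstanceType` and `TrainData` are simplified placeholders assumed for this example.

```python
import json

# Hand-written, abbreviated illustration of a pipeline definition.
# Real definitions are generated by the SDK and carry many more fields.
definition = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"},
    ],
    "Steps": [
        {
            "Name": "CustomerChurnProcess",
            "Type": "Processing",
            # A parameter is referenced by name rather than by value
            # ("InstanceType" is a simplified, hypothetical argument key):
            "Arguments": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}},
        },
        {
            "Name": "CustomerChurnTrain",
            "Type": "Training",
            # Referencing another step's output makes this step depend on it
            # ("TrainData" is a simplified, hypothetical argument key):
            "Arguments": {
                "TrainData": {
                    "Get": "Steps.CustomerChurnProcess.ProcessingOutputConfig"
                    ".Outputs['train'].S3Output.S3Uri"
                }
            },
        },
    ],
}

definition_json = json.dumps(definition, indent=2)
step_names = [step["Name"] for step in json.loads(definition_json)["Steps"]]
print(step_names)
```

Because parameters and step outputs appear as references rather than concrete values, the same definition can be reused across executions with different inputs.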

The [SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html) is where trained models are stored, versioned, and managed. Data Scientists and Machine Learning Engineers can compare model versions, approve models for deployment, and deploy models from different AWS accounts, all from a single Model Registry. SageMaker helps customers follow MLOps best practices and get started on the right footing: customers can stand up a full end-to-end MLOps system with a single API call.

## SageMaker Pipelines

A SageMaker Pipeline defines a Directed Acyclic Graph (DAG) of steps and conditions to orchestrate SageMaker jobs and resource creation - supporting a [broad range of activities](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html) including:

* **Processing Job steps** - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* **Training Job steps** - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* **Registering Models** - Creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* **Creating Model steps** - Create a model for use in transform steps or later publication as an endpoint.
* **Transform Job steps** - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from your dataset, get inferences from large datasets, and run inference when you don't need a persistent endpoint.
* **Conditional step execution** - Provides conditional execution of branches in a pipeline.
* **Parameterized Pipeline executions** - Allows pipeline executions to vary by supplied parameters.

The pipeline that we'll create in this example follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and conditional model registration and publication, if the quality of the model is sufficient.

![](imgs/sm-pipelines.png "A typical ML Application pipeline")
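
As a rough illustration of what "directed acyclic graph" means for this workflow, the sketch below uses Python's standard `graphlib` module (Python 3.9+) to derive a valid execution order from step dependencies. The step names match the ones defined later in this notebook, but the dependency table is written by hand for the example; in a real pipeline the SDK and service work this out for you.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it needs.
# Hand-written for illustration; not extracted from a real pipeline.
dependencies = {
    "CustomerChurnProcess": set(),
    "CustomerChurnTrain": {"CustomerChurnProcess"},
    "CustomerChurnEval": {"CustomerChurnProcess", "CustomerChurnTrain"},
    "CustomerChurnAccuracyCond": {"CustomerChurnEval"},
    "CustomerChurnRegisterModel": {"CustomerChurnAccuracyCond", "CustomerChurnTrain"},
}

# A step can only run once everything it depends on has finished
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Adding a circular dependency to the table would make `TopologicalSorter` raise a `CycleError`, which is why the graph has to be acyclic.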

## Setup

First, let's import the various Python libraries we'll use in the exercise:

In [None]:
# Python Built-Ins:
import os

# External Dependencies:
import boto3  # The general-purpose AWS SDK for Python
import sagemaker  # High-level Python SDK for Amazon SageMaker

print(f"sagemaker SDK v{sagemaker.__version__}")

Next, we can connect to AWS services and **configure**:

- The [Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) and prefix to use for storing data and artifacts.
- The [IAM role](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) ARN to be used for accessing data and other resources.
- Some additional **names and prefixes** for the pipeline and model package resources the notebook will create.

In [None]:
sm_session = sagemaker.Session()
pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession()
s3 = boto3.resource("s3")  # Amazon S3

bucket = sm_session.default_bucket()
prefix = "sagemaker/DEMO-pipelines-churn"
print(f"Saving S3 data to: s3://{bucket}/{prefix}")

base_job_prefix = "CustomerChurn"
model_package_group_name = "ChurnModelPackageGroup-v1"
pipeline_name = "ChurnPipeline-v1"

role = sagemaker.get_execution_role()
print(f"Using IAM role: {role}")

### 1. Gather the data

This example uses a synthetic customer churn dataset from the public `sagemaker-sample-files` bucket, the same one used in the previous Autopilot notebook. In a real scenario you would access your own data, typically in your own S3 bucket.

In [None]:
# Copy the data from s3 to our local folder
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt ./data/churn.txt

# Copy the file to our own S3 bucket with a csv extension
raw_data_key = f"{prefix}/data/RawData.csv"
s3.Bucket(bucket).Object(raw_data_key).upload_file("./data/churn.txt")
raw_data_s3uri = f"s3://{bucket}/{raw_data_key}"

print(f"\nRaw data loaded to:\n{raw_data_s3uri}")

### 2. Processing step for feature engineering

For our use case we keep the actual pre-processing code in a separate Python file, [preprocess.py](preprocess.py). This script drops irrelevant columns, encodes categorical values, creates a target value, and splits the data into train, validation, and test sets.
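
The script itself is not reproduced in this notebook, so the following is only a minimal sketch of what its final splitting stage could look like. The function name `split_rows`, the 70/15/15 proportions, and the seed are assumptions made for illustration and may differ from what `preprocess.py` actually does.

```python
import random


def split_rows(rows, train_frac=0.7, validation_frac=0.15, seed=42):
    """Shuffle rows reproducibly, then cut them into train/validation/test."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # fixed seed keeps the split repeatable
    n_train = round(len(shuffled) * train_frac)
    n_validation = round(len(shuffled) * validation_frac)
    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_validation]
    test = shuffled[n_train + n_validation:]  # whatever remains
    return train, validation, test


# Stand-in for real records: 100 integer row IDs
train, validation, test = split_rows(range(100))
print(len(train), len(validation), len(test))
```

Inside a processing job, each of the three sets would be written to the matching container folder (for example `/opt/ml/processing/train`) so that SageMaker uploads it as a named output.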

> ⚠️ **Note:** Below we use these *Pipeline parameter objects* in place of actual values - for example:
>
> 1. `processing_instance_type`
> 1. `processing_instance_count`, and
> 1. `input_data`

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

# 1. parameters for the pre-processing stage
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
input_data = ParameterString(name="InputDataUrl", default_value=raw_data_s3uri)

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,  # (Pipeline parameter)
    instance_count=processing_instance_count,  # (Pipeline parameter)
    # base_job_name=f"{base_job_prefix}/sklearn-CustomerChurn-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

step_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            input_name="raw",
            source=input_data,  # (Pipeline parameter)
            destination="/opt/ml/processing/input/raw",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./preprocess.py",
)

step_process = ProcessingStep(
    name="CustomerChurnProcess",
    step_args=step_args,
    cache_config=CacheConfig(enable_caching=True, expire_after="T2H"),
)


### 3. Training step for generating model artifacts

In this step we use [XGBoost](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) as our algorithm of choice. This example uses **both**:

- **Pipeline input parameters** which can be customized at execution time (`training_instance_type`, `training_max_depth`), and
- **Dependencies** from the previous processing step (`step_process.properties.{***}`)

In [None]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator

# 2. parameters for the training stage. No need to import ParameterString since it was imported earlier.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_max_depth = ParameterString(name="TrainingMaxDepth", default_value="5")

model_path = f"s3://{bucket}/{base_job_prefix}/CustomerChurnTrain"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",  # we are using the Sagemaker built in xgboost algorithm
    region=sm_session.boto_session.region_name,  # e.g. 'us-east-1'
    version="1.3-1",
    py_version="py3",
    instance_type=training_instance_type,  # (Pipeline parameter)
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,  # (Pipeline parameter)
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/CustomerChurn-train",
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="binary:logistic",
    num_round=50,
    max_depth=training_max_depth,  # (Pipeline parameter)
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

step_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            # Use an output from one step as an input (dependency) in another:
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            # Use an output from one step as an input (dependency) in another:
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="CustomerChurnTrain",
    step_args=step_args,
)


### 4. Processing step for evaluation

This step uses one parameter which we can customize at execution time (`processing_instance_type`), and again references outputs from other steps.

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.properties import PropertyFile

# processing_instance_type parameter was already defined in stage 2 but it is reused here

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,  # (Pipeline parameter)
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-CustomerChurn-eval",
    sagemaker_session=pipeline_session,
    role=role,
)
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="./evaluate.py",
)

step_eval = ProcessingStep(
    name="CustomerChurnEval",
    step_args=step_args,
    property_files=[evaluation_report],
)
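
The `evaluate.py` script is not shown in this notebook. As a hedged sketch of the kind of report it needs to produce, the example below computes an F1 score in plain Python and arranges it in the nested structure that the `binary_classification_metrics.f1.value` path in the next section expects. The helper `f1_score` and the sample labels are made up for illustration.

```python
import json


def f1_score(y_true, y_pred):
    """Compute F1 for binary labels (1 = churn) without external libraries."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    if tp == 0:
        return 0.0  # avoids division by zero when nothing was predicted correctly
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Made-up labels and thresholded predictions, for illustration only
y_true = [1, 0, 1, 1, 0, 0, 1, 0]
y_pred = [1, 0, 1, 0, 0, 1, 1, 0]

# Nesting assumed from the json_path used by the condition step
report_dict = {
    "binary_classification_metrics": {
        "f1": {"value": f1_score(y_true, y_pred)},
    }
}
report_json = json.dumps(report_dict)
print(report_json)
```

In the processing job, this JSON would be saved as `evaluation.json` under `/opt/ml/processing/evaluation`, matching the `ProcessingOutput` and `PropertyFile` defined above.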


### 5. Register model step that will be conditionally executed

Pipelines allow us to have conditional steps, to register models into our registry and optionally to publish these to staging. A common pattern is to check a quality metric (F1 in our case) and only register the model if it meets a minimum threshold (at least 0.8 in our case).

This step uses one parameter which we can customize at execution time: 
`model_approval_status`

> ▶️ **Question:** *Which other variables from this section could we add as parameters for the pipeline?*

In [None]:
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

# Parameter for the model register stage
# (ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

model = Model(
    image_uri=xgb_train.image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)


# Register model step that will be conditionally executed
register_args = model.register(
    # estimator=xgb_train,
    # model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(
    name="CustomerChurnRegisterModel",
    step_args=register_args,
)

# Condition step for evaluating model quality and branching execution
cond_register = ConditionGreaterThanOrEqualTo(  # You can change the condition here
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        # This should follow the structure of your report_dict defined in the evaluate.py file:
        json_path="binary_classification_metrics.f1.value",
    ),
    right=0.8,  # TODO: add as a pipeline parameter
)    

step_cond = ConditionStep(
    name="CustomerChurnAccuracyCond",
    conditions=[cond_register],
    if_steps=[step_register],
    else_steps=[],
)
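
To build intuition for what the condition above checks at run time, here is a small plain-Python emulation. It is not how the service is implemented: `json_get` is a hypothetical helper that only handles simple dot-separated keys, and the report values are invented.

```python
def json_get(document, json_path):
    """Walk a dot-separated path through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Hypothetical report contents; real values come from the evaluation step
report = {"binary_classification_metrics": {"f1": {"value": 0.83}}}

f1 = json_get(report, "binary_classification_metrics.f1.value")
# Mirrors ConditionGreaterThanOrEqualTo(left=<f1 from report>, right=0.8)
should_register = f1 >= 0.8
print(f1, should_register)
```

If the path does not match the structure of the report (for example a misspelled key), the lookup fails, so it is worth keeping the evaluation script and `json_path` in sync.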

### 6. Construct Pipeline 

With the steps and pipeline-level parameters all defined, we're ready to create the overall [Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#pipeline) and register it with SageMaker.

> ⚠️ **Remember:** if you added any additional parameters, you will need to add them to this pipeline definition as well.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_max_depth,            
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sm_session,
)

The pipeline definition is not actually submitted to the SageMaker service until we `upsert()` it. The role passed in here with the definition will be used by the workflow service to create all the jobs defined in the steps:

In [None]:
pipeline.upsert(role_arn=role)

After `upsert()`ing your pipeline, it should be registered with the SageMaker APIs and ready to use. For example, you should see the pipeline available in the SageMaker Studio UI through the *Resources > Pipelines* sidebar menu:

![](imgs/sm-pipeline-registered.png "Screenshot of SageMaker Studio UI showing registered churn pipeline")

### 7. Run the Pipeline

Before we start experimenting with the execution parameters and UI though, let's see how to start and check the pipeline with default parameters from code:

In [None]:
execution = pipeline.start()

### 8. Pipeline Operations: examining and waiting for pipeline execution

Starting a pipeline creates an **execution** instance, which can be queried to list the steps in the execution and find out more about its progress:

In [None]:
execution.describe()

We can wait for the execution by invoking `wait()` on the execution:

In [None]:
execution.wait()

We can list the execution steps to check out the status and artifacts:

In [None]:
execution.list_steps()
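
The raw step list can be verbose, so a small helper that condenses it is often handy. The sketch below assumes each entry is a dictionary with `StepName` and `StepStatus` keys; the sample data is invented for illustration and omits all the other fields a real response would contain.

```python
from collections import Counter


def summarize_steps(steps):
    """Return a {step name: status} mapping and a count of each status."""
    status_by_step = {step["StepName"]: step["StepStatus"] for step in steps}
    return status_by_step, Counter(status_by_step.values())


# Invented sample, shaped like a step listing with other fields omitted
sample_steps = [
    {"StepName": "CustomerChurnAccuracyCond", "StepStatus": "Succeeded"},
    {"StepName": "CustomerChurnEval", "StepStatus": "Succeeded"},
    {"StepName": "CustomerChurnTrain", "StepStatus": "Succeeded"},
    {"StepName": "CustomerChurnProcess", "StepStatus": "Succeeded"},
]

status_by_step, status_counts = summarize_steps(sample_steps)
failed_steps = [name for name, status in status_by_step.items() if status == "Failed"]
print(status_counts, failed_steps)
```

In the notebook you could pass the result of `execution.list_steps()` to `summarize_steps` instead of the sample list.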

### 9. Parameterized Executions

We can run additional executions of the pipeline specifying different pipeline parameters. The `parameters` argument is a dictionary whose keys are the parameter names, and whose values are the primitive values to use as overrides of the defaults.

Of particular note, based on the performance of the model, we may want to kick off another pipeline execution, but this time on a compute-optimized instance type and with the model approval status automatically set to "Approved". This means that the model package version generated by the model registration step (`CustomerChurnRegisterModel`) will automatically be ready for deployment through CI/CD pipelines, such as with SageMaker Projects.

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
        TrainingMaxDepth="6",
    )
)
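
Conceptually, the overrides supplied at start time are layered on top of the defaults declared with each parameter. The toy function below sketches that behaviour in plain Python. `resolve_parameters` is a hypothetical helper, not an SDK call, and rejecting unknown names is a choice made for this sketch; the `InputDataUrl` parameter is left out because its default depends on your bucket.

```python
def resolve_parameters(defaults, overrides=None):
    """Overlay execution-time overrides on the declared defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Sketch-level safeguard against typos in parameter names
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Defaults copied from the parameter definitions earlier in this notebook
defaults = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
    "TrainingMaxDepth": "5",
    "ModelApprovalStatus": "PendingManualApproval",
}

resolved = resolve_parameters(
    defaults,
    dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
        TrainingMaxDepth="6",
    ),
)
print(resolved)
```

Parameters that are not overridden, such as `TrainingInstanceType` here, keep their default values for that execution.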

In [None]:
execution.wait()

## Exercises: Pipelines

▶️ **Run** the previous execution from the **UI in SageMaker Studio** instead of using a Notebook.

▶️ **Add** the minimum F1 score for model registration as a **parameter** to the pipeline, instead of the hard-coded `0.8` value used above. Check you can update and run your pipeline successfully to override the threshold!

## Exploring and Comparing Experiments in SageMaker Studio

So far we've seen that SageMaker Pipelines are useful for automating and orchestrating multi-step machine learning workflows - but what tools are available for exploring the results of these different experiments, and preparing for production deployment?

Two particularly relevant features here are:

- [SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html) - which supports **approval workflows** and tracks **model versions** with additional metrics and metadata (such as for model quality, bias, and data drift)
- [SageMaker Experiments](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments.html) - through which we can organize, explore details of, and compare, past jobs through the Studio UI.

Like the Pipelines UI highlighted earlier, these tools and others are available through the "SageMaker Resources" tab in SageMaker Studio:

![](imgs/sm-resources-menu.png "SageMaker Studio screenshot showing resources sidebar menu")

### Viewing and Comparing Metrics in Model Registry

Our example pipeline should have created a "Model Package Group" that is already visible in the model registry (per `model_package_group_name` above).

If you've run the pipeline with the original and modified `TrainingMaxDepth` parameters, you should see at least 2 successful executions. With these parameters, both runs should have met the `CustomerChurnAccuracyCond` threshold for being added to the model registry.

▶️ **Shift-click** to select *two or more* Model Versions in the Model Registry screen, and then **right-click** to see the *Compare model versions* option.

![](imgs/sm-registry-compare.png "SM Model registry with multiple model versions selected and right-click menu open")

For models like the ones in our example which are [registered](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModelPackage.html) with [Model Quality Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html), you'll be able to see a comparison of model metrics - and even charts like [confusion matrices](https://en.wikipedia.org/wiki/Confusion_matrix) and [ROC curves](https://en.wikipedia.org/wiki/Receiver_operating_characteristic) if these are present in the model metadata.

▶️ **Which** of your model versions seemed to perform best by the different metrics? Was there much difference?

You can also update model versions' **approval status** through the Model Registry UI, which can be used to trigger deployment of your models to different environments.

### Exploring Trial Components in SageMaker Experiments

In SageMaker Experiments:

- A **`Trial Component`** corresponds to a particular job or stage - for example a previously executed training or pre-processing job.
- A **`Trial`** is a collection of multiple Trial Components in one end-to-end attempt (for example, pre-processing plus model training plus evaluation)
- An **`Experiment`** is a collection of different, comparable, *Trials* which attempted to tackle the same problem.

These map automatically to SageMaker pipeline definitions: A particular pipeline execution automatically creates a **`Trial`**, and a defined pipeline is an overall **`Experiment`**. Similarly, you'll see that SageMaker Autopilot automatically generates Experiments as it runs. You can also manually organize SageMaker jobs you create into your own Trials and Experiments.
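
One way to picture this hierarchy is as nested containers. The dataclasses below are a toy model written for this explanation, not the SageMaker Experiments API, and all the names used in the example instance are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TrialComponent:
    name: str  # a single job or stage, e.g. one training job


@dataclass
class Trial:
    name: str  # one end-to-end attempt, e.g. one pipeline execution
    components: List[TrialComponent] = field(default_factory=list)


@dataclass
class Experiment:
    name: str  # a group of comparable trials, e.g. one pipeline
    trials: List[Trial] = field(default_factory=list)


def make_trial(trial_name):
    """Build a trial with one component per pipeline stage (hypothetical names)."""
    stages = ["preprocess", "train", "evaluate"]
    return Trial(trial_name, [TrialComponent(f"{trial_name}-{s}") for s in stages])


experiment = Experiment("churn-pipeline", [make_trial("execution-1"), make_trial("execution-2")])
component_counts = {trial.name: len(trial.components) for trial in experiment.trials}
print(component_counts)
```

Comparing the `train` components across the two trials is the kind of cross-execution comparison the Experiments UI supports.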

In the SageMaker Experiments UI, double-clicking drills down through this Experiment > Trial > Component hierarchy - and at the top level you'll also see `Unassigned trial components`, where any training jobs (or processing, etc) that haven't yet been tagged to Experiments are available.

![](imgs/sm-execution-trial-components.png "Trial components list for a specific pipeline execution")

> ⚠️ **Different right-click options and double-click behaviours** are available at different points in the hierarchy, and for different job types.
>
> For example:
>
> - Right-clicking on an individual training job will show `Open Debugger for insights` as well as `Open in trial details`.
> - Right-clicking on an Autopilot Experiment will show `Describe AutoML Job` in addition to `Open in trial component list`.

▶️ **Find** a SageMaker Training Job in your existing experiments and **double-click** on it to open the job details. You should see customizable charts as well as metrics and other job metadata.

▶️ **Right-click** your Training Job in Experiments to `Open Debugger for insights`. Can you find performance recommendations for your training job, and detailed resource utilization metrics per-node in the job?

### Comparing Jobs in SageMaker Experiments

Drilling into individual Trial Components is fine for exploring particular training jobs, but can we compare metrics and charts across multiple jobs at once? Yes you can!

▶️ **Right-click** on your pipeline's top-level **Experiment** and select `Open in trial component list` to open the *list of multiple trial components* run by the pipeline.

(You can also do this at Trial level in the hierarchy, but typically you'll want to compare across trials, rather than jobs within one trial.)

![](imgs/sm-pipeline-tcs-list.png "Trial Components list for the demo pipeline, showing multiple training jobs")

▶️ **Shift-click** to select multiple *training job* components, and then click "Add Chart" to build a composite chart including the multiple training jobs.


▶️ **Configure** your chart via the right sidebar menu, to set up:

- `Summary statistics` from each training job (not time series)
- `Scatter plot` chart type
- `max_depth` X axis
- `validation:logloss_last` Y axis
- `trialComponentName` Color

You should be able to set up a comparative scatter plot illustrating the impact of max_depth on the final validation loss (lower = better) in model training, similar to the chart shown below.

![](imgs/sm-tcs-comparison-chart.png "Screenshot of scatter chart comparing multiple training jobs")

## Next Steps

In this section we've just scratched the surface of the tools available in SageMaker Studio to help manage your experiments, model versions, and model building pipelines.

The metrics you report during training jobs and tag against your Model Registry model versions enable these features. You can add additional metrics and hyperparameters during model training, and of course define additional steps and parameters for your model building pipelines.

You can find lots more information about all these topics, and additional MLOps features like [SageMaker Project Templates](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects.html), in the [SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html).