# How to build a CV Training Pipeline using SageMaker Pipelines

1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
3. [Setup](#Setup)
4. [Dataset](#Dataset)
5. [Build SageMaker Pipeline](#Build-SageMaker-Pipeline)
    1. [Bring Your Own Container (BYOC)](#Bring-Your-Own-Container-(BYOC))
    2. [Set Pipeline input parameters](#Set-Pipeline-input-parameters)
    3. [Define Cache Configuration](#Define-Cache-Configuration)
    4. [Preprocess data step](#Preprocess-data-step)
    5. [Training step](#Training-step)
    6. [Model evaluation step](#Model-evaluation-step)
    7. [Register model step](#Register-model-step)
    8. [Accuracy condition step](#Accuracy-condition-step)
    9. [Pipeline Creation](#Pipeline-Creation)
    10. [Submit and trig pipeline](#Submit-and-trig-pipeline)
    11. [Analyzing Results](#Analyzing-Results)
6. [Execute same pipeline in one continuous script](#Execute-same-pipeline-in-one-continuous-script)
7. [Build Custom Project Templates (Optional)](#Build-Custom-Project-Templates-(Optional))
    1. [Setup Service Catalog Portfolio](#Setup-Service-Catalog-Portfolio)
8. [Clean Up](#Clean-up)

# Introduction

This notebook demonstrates how to build a reusable computer vision (CV) pattern using **SageMaker Pipelines**. The pattern runs preprocessing, training, and evaluation steps for two different training jobs: 1) Spot training and 2) On-Demand training. If a model's accuracy meets the required threshold (0.7 in this notebook), the model is registered in the SageMaker Model Registry.

We have also tagged the training workloads with `TrainingType: Spot` or `TrainingType: OnDemand`. If you have permission to access billing for your AWS account, you can see the cost savings from Spot training in a side-by-side comparison. To enable custom cost allocation tags, follow this [AWS documentation](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/activating-tags.html). It takes 12-48 hours for a newly activated tag to show up in Cost Explorer.

![Spot Training](statics/cost-explore.png)
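Cost Explorer aggregates cost by tag, but you can also estimate the savings of a single Spot job from its metadata. The `DescribeTrainingJob` response reports `TrainingTimeInSeconds` and `BillableTimeInSeconds`, and the Managed Spot Training documentation describes the savings as one minus their ratio. The sketch below assumes you already have that response as a dictionary (for example from `boto3.client("sagemaker").describe_training_job(TrainingJobName=...)`); the helper name `spot_savings_percent` is hypothetical and the numbers in the usage line are illustrative, not real results.

```python
def spot_savings_percent(describe_response: dict) -> float:
    """Estimate Managed Spot Training savings from a DescribeTrainingJob response.

    Only meaningful for jobs trained with use_spot_instances=True, where
    BillableTimeInSeconds can be lower than TrainingTimeInSeconds.
    """
    training = describe_response["TrainingTimeInSeconds"]
    billable = describe_response["BillableTimeInSeconds"]
    if training <= 0:
        raise ValueError("TrainingTimeInSeconds must be positive")
    return (1 - billable / training) * 100


# Hand-written response fragment with illustrative values.
example_response = {"TrainingTimeInSeconds": 500, "BillableTimeInSeconds": 100}
print(f"{spot_savings_percent(example_response):.1f}%")  # 80.0%
```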

SageMaker Pipelines is built around steps. The order in which steps execute is inferred from the dependencies between them: if a step depends on the output of a previous step, it isn't executed until that step has completed successfully. This lets SageMaker build a **Directed Acyclic Graph (DAG)** that can be visualized in Amazon SageMaker Studio (see diagram below). The DAG can be used to track pipeline executions, inputs/outputs, and metrics, giving users the full lineage of the model creation.

![Training Pipeline](statics/cv-training-pipeline-zoomed.png)
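SageMaker infers this ordering for you. Purely as a conceptual illustration (not how the service is implemented), the snippet below uses Python's standard-library `graphlib` (Python 3.9+) to group the steps defined later in this notebook into "waves": every step in a wave has all of its dependencies satisfied by earlier waves, so the Spot and On-Demand branches can run in parallel.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# Step names mirror the ones defined later in this notebook.
dependencies = {"BirdClassificationPreProcess": set()}
for t in ["Spot", "OnDemand"]:
    dependencies[f"BirdClassification{t}Train"] = {"BirdClassificationPreProcess"}
    dependencies[f"BirdClassification{t}Eval"] = {
        "BirdClassificationPreProcess",  # test data and manifest
        f"BirdClassification{t}Train",   # model artifacts
    }
    dependencies[f"BirdClassification{t}Condition"] = {f"BirdClassification{t}Eval"}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

for i, wave in enumerate(waves):
    print(i, wave)
```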

**Note: This notebook was tested with the Data Science kernel in SageMaker Studio.**

## Prerequisites

To run this notebook, you can simply execute each cell in order. To understand what's happening, you'll need:

- Access to the SageMaker default S3 bucket
- Access to Elastic Container Registry (ECR)
- For the optional portion of this lab, you will need access to CloudFormation, Service Catalog, and Cost Explorer
- Familiarity with Training on Amazon SageMaker
- Familiarity with Python
- Familiarity with AWS S3
- Basic understanding of CloudFormation and the concept of deploying infrastructure as code
- Basic understanding of tagging and cost governance
- Basic familiarity with the AWS Command Line Interface (CLI); ideally, you should have it set up with credentials to access the AWS account you're running this notebook from.
- SageMaker Studio is preferred for the full UI integration

## Setup

Here we define the SageMaker session, default bucket, job prefix, pipeline name, and model package group name.

In [None]:
!pip install -U sagemaker --quiet # Ensure latest version of SageMaker is installed

In [None]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
account = sagemaker_session.account_id()
role = sagemaker.get_execution_role()

default_bucket = sagemaker_session.default_bucket() # or use your own custom bucket name
base_job_prefix = "cv-sagemaker-immersionday" # or define your own prefix

model_package_group_name = f"{base_job_prefix}-model-group"  # Model name in model registry
pipeline_name = f"{base_job_prefix}-pipeline"  # SageMaker Pipeline name

## Dataset

The dataset we are using is from [Caltech Birds (CUB 200 2011)](http://www.vision.caltech.edu/visipedia/CUB-200-2011.html). 

Here we reuse the following artifact from previous labs:

- S3 path to images

In [None]:
s3_raw_data = f's3://{default_bucket}/{base_job_prefix}/full/data'
print(s3_raw_data)

## Build SageMaker Pipeline

Build the ECR image URI of the custom container from the [model_evaluation lab](../03_model_evaluation/model-evaluation-processing-job.ipynb); the model evaluation step uses it later on.


In [None]:
container_name = "sagemaker-tf-container"
container_version = "2.0"
image_uri = "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account, region, container_name, container_version)

print(f'image_uri: {image_uri}')

### Set Pipeline input parameters
Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* ParameterString - represents a str Python type
* ParameterInteger - represents an int Python type
* ParameterFloat - represents a float Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

![Parameter Input](statics/parameters-input-zoomed.png)
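To make the default-and-override rule concrete, here is a conceptual sketch in plain Python. It is **not** SageMaker SDK code: the `PARAMETER_SPECS` table and `resolve_parameters` helper are hypothetical, and the type checks are done eagerly only to make the "default must be an instance of the parameter's type" rule visible. Names and defaults mirror the parameters in the next cell (`InputDataUrl` is omitted because its default depends on your bucket).

```python
# Conceptual sketch only (hypothetical helper, not the SageMaker SDK).
PARAMETER_SPECS = {
    "ProcessingInstanceCount": (int, 1),
    "AnnotationFileName": (str, "classes.txt"),
    "ClassSelection": (str, "13, 17, 35, 36, 47, 68, 73, 87"),
}


def resolve_parameters(overrides=None):
    """Return the value each parameter takes for one execution."""
    overrides = overrides or {}
    resolved = {}
    for name, (expected_type, default) in PARAMETER_SPECS.items():
        # The declared default must already match the parameter's type.
        if not isinstance(default, expected_type):
            raise TypeError(f"default for {name} must be {expected_type.__name__}")
        # An execution-time value, when given, replaces the default.
        value = overrides.get(name, default)
        if not isinstance(value, expected_type):
            raise TypeError(f"{name} expects {expected_type.__name__}, got {type(value).__name__}")
        resolved[name] = value
    return resolved


defaults_only = resolve_parameters()
overridden = resolve_parameters({"ClassSelection": "13, 17, 35, 36"})
print(overridden["ClassSelection"], overridden["ProcessingInstanceCount"])
```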

The pipeline that we create follows a typical machine learning application pattern of preprocessing, training, evaluation, and model registration, as depicted in the picture below.

![Pipeline](statics/pipeline.png)

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Parameters for pipeline execution
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1
)

input_data = ParameterString(
    name="InputDataUrl",
    default_value=s3_raw_data
)

input_annotation = ParameterString(
    name="AnnotationFileName",
    default_value="classes.txt"
)

# This is a large dataset, so we only train on a subset of the classes
class_selection = ParameterString(
    name="ClassSelection",
    default_value="13, 17, 35, 36, 47, 68, 73, 87" # If you use the mini dataset, make sure these class indices exist in it
)

processing_instance_type = "ml.m5.xlarge"
training_instance_count = 1
training_instance_type = "ml.c5.4xlarge"

### Define Cache Configuration
When step caching is enabled, before SageMaker Pipelines executes a step, it attempts to find a previous execution of the step that was called with the same arguments.

Pipelines doesn't check whether the actual data or code that the arguments point to has changed. If a previous execution is found, Pipelines propagates the values from the cache hit during execution rather than recomputing the step.

Step caching is available for the following step types:

* Training
* Tuning
* Processing
* Transform
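SageMaker doesn't document its exact cache-key algorithm, so the snippet below is a hypothetical model of the behavior described above rather than the real implementation. It keys the cache on a hash of the step type and its serialized arguments, which shows why overwriting the objects under the same S3 prefix still produces a cache hit, while changing an argument such as `ClassSelection` does not. The bucket name `my-bucket` is a placeholder.

```python
import hashlib
import json


def cache_key(step_type: str, arguments: dict) -> str:
    """Hypothetical cache key: a hash of the step type and its serialized arguments."""
    payload = json.dumps({"type": step_type, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


args = {
    "InputDataUrl": "s3://my-bucket/cv-sagemaker-immersionday/full/data",  # placeholder bucket
    "ClassSelection": "13, 17, 35, 36",
}
first = cache_key("Processing", args)
# New images uploaded under the same prefix leave the arguments unchanged,
# so the key (and therefore the cache hit) is unchanged too.
second = cache_key("Processing", dict(args))
# Changing an argument produces a different key, so the step would rerun.
third = cache_key("Processing", {**args, "ClassSelection": "13, 17"})
print(first == second, first == third)  # True False
```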

In [None]:
from sagemaker.workflow.steps import CacheConfig

## With caching enabled, rerunning this pipeline without changing the input
## parameters skips the cached steps (including training) and reuses their previous outputs
cache_config = CacheConfig(enable_caching=True, expire_after="30d")

### Preprocess data step
We take the original preprocessing code from the Jupyter notebook and package it as a script that runs in a SageMaker Processing job.

The [preprocess.py](./preprocess.py) script takes in the raw image files and splits them into training, validation, and test sets by class. It merges the class annotation files so that each data set has its own manifest file. It also exposes two parameters: `--classes` (filters the classes you want to train the model on; the default is all classes) and `--input-data` (the annotation file containing the human-readable class names).

We use **SKLearnProcessor** to process the data. For more detail on the different types of processing jobs, refer to the Amazon SageMaker documentation [here](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html).

![Pipeline](statics/pipeline-1.png)
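The real logic lives in `preprocess.py`. As a hypothetical, simplified sketch of the per-class split it performs, the code below filters samples by a `ClassSelection`-style string and shuffles each class independently before cutting it into train, validation, and test sets. The helper names, the 10%/10% split fractions, and the file names are illustrative assumptions and may differ from the actual script.

```python
import random
from collections import defaultdict


def parse_class_selection(selection: str) -> set:
    """Turn a ClassSelection string such as '13, 17, 35' into a set of ints."""
    return {int(part) for part in selection.split(",") if part.strip()}


def split_by_class(samples, classes=None, val_frac=0.1, test_frac=0.1, seed=0):
    """Split (image_path, class_id) pairs into train/validation/test sets per class.

    classes: optional set of class ids to keep (None keeps every class).
    """
    by_class = defaultdict(list)
    for path, class_id in samples:
        if classes is None or class_id in classes:
            by_class[class_id].append(path)

    rng = random.Random(seed)  # fixed seed so the split is reproducible
    splits = {"train": [], "validation": [], "test": []}
    for class_id in sorted(by_class):
        paths = sorted(by_class[class_id])
        rng.shuffle(paths)
        n_test = int(len(paths) * test_frac)
        n_val = int(len(paths) * val_frac)
        splits["test"] += [(p, class_id) for p in paths[:n_test]]
        splits["validation"] += [(p, class_id) for p in paths[n_test:n_test + n_val]]
        splits["train"] += [(p, class_id) for p in paths[n_test + n_val:]]
    return splits


# Illustrative input: 10 fake images for each of three classes, keeping only 13 and 17.
samples = [(f"img_{c}_{i}.jpg", c) for c in (13, 17, 99) for i in range(10)]
splits = split_by_class(samples, classes=parse_class_selection("13, 17"))
print({name: len(items) for name, items in splits.items()})
```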

In [None]:
from sagemaker.workflow.steps import ProcessingStep

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
)
import uuid

# SKLearnProcessor for preprocessing

sklearn_processor = SKLearnProcessor(base_job_name = f"{base_job_prefix}-preprocess",  # choose any name
                                    framework_version='0.20.0',
                                    role=role,
                                    instance_type=processing_instance_type,
                                    instance_count=processing_instance_count)

output_s3_uri = f's3://{default_bucket}/{base_job_prefix}/outputs/pipelines/{uuid.uuid4()}'

step_process = ProcessingStep(
    name="BirdClassificationPreProcess",  # choose any name
    processor=sklearn_processor,
    code="preprocess.py",
    job_arguments=["--classes", class_selection,
                "--input-data", input_annotation],
    inputs=[ProcessingInput(source=input_data, 
            destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name='train_data', 
                         source="/opt/ml/processing/output/train", 
                         destination = output_s3_uri +'/train'),
        ProcessingOutput(output_name='val_data',
                         source="/opt/ml/processing/output/validation", 
                         destination = output_s3_uri +'/validation'),
        ProcessingOutput(output_name='test_data',
                         source="/opt/ml/processing/output/test", 
                         destination = output_s3_uri +'/test'),
        ProcessingOutput(output_name='manifest',
                         source="/opt/ml/processing/output/manifest", 
                         destination = output_s3_uri +'/manifest'),
    ],
    cache_config=cache_config
    )

### Training step
We use SageMaker's prebuilt TensorFlow container; the custom TensorFlow training code is provided in a separate Python script that is passed to SageMaker ([train-mobilenet.py](./code/train-mobilenet.py)).
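SageMaker extracts training metrics by applying the regular expressions in `metric_definitions` (set in the next code cell) to the job's log output. The snippet below runs the same patterns locally against a made-up Keras progress line, using `re.search`, which returns the first match. Because `loss: ` is also a substring of `val_loss: `, the unanchored `loss` pattern relies on the training loss being printed before the validation loss, which is the order Keras uses.

```python
import re

# Same patterns as the metric_definitions in the training cell.
metric_definitions = [{'Name': 'loss',      'Regex': 'loss: ([0-9\\.]+)'},
                      {'Name': 'acc',       'Regex': 'accuracy: ([0-9\\.]+)'},
                      {'Name': 'val_loss',  'Regex': 'val_loss: ([0-9\\.]+)'},
                      {'Name': 'val_acc',   'Regex': 'val_accuracy: ([0-9\\.]+)'}]

# Illustrative Keras progress line (the numbers are made up).
log_line = "Epoch 3/5 - 12s - loss: 0.8123 - accuracy: 0.7342 - val_loss: 0.9011 - val_accuracy: 0.7010"

parsed = {}
for m in metric_definitions:
    match = re.search(m["Regex"], log_line)  # first match in the line, or None
    parsed[m["Name"]] = float(match.group(1)) if match else None

print(parsed)
```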

Our pipeline runs two training jobs side by side: Spot and On-Demand. Each workload is tagged with `TrainingType`. If you have permission, you can activate this user-defined tag in Cost Explorer and compare the cost difference between Spot and On-Demand training. [Here](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/activating-tags.html) is how to activate user-defined tags.

![Pipeline](statics/pipeline-2.png)
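Spot capacity can be reclaimed mid-job, which is why the Spot estimator below sets `checkpoint_s3_uri` and a `max_wait` that is at least as long as `max_run`. SageMaker syncs the local checkpoint directory (by default `/opt/ml/checkpoints`) with that S3 location and restores it when a job restarts, but the training script itself must look for checkpoints and resume. We don't know whether `train-mobilenet.py` does this; the sketch below shows one hypothetical way to find the newest checkpoint, assuming files are named `ckpt-<epoch>.h5` after each completed epoch.

```python
import os
import re
import tempfile

CHECKPOINT_DIR = "/opt/ml/checkpoints"  # default local path synced with checkpoint_s3_uri


def latest_checkpoint(checkpoint_dir: str):
    """Return (path, epoch) of the newest 'ckpt-<epoch>.h5' file, or (None, 0) if none exist."""
    if not os.path.isdir(checkpoint_dir):
        return None, 0
    best_path, best_epoch = None, 0
    for name in os.listdir(checkpoint_dir):
        match = re.fullmatch(r"ckpt-(\d+)\.h5", name)  # hypothetical naming scheme
        if match and int(match.group(1)) > best_epoch:
            best_path, best_epoch = os.path.join(checkpoint_dir, name), int(match.group(1))
    return best_path, best_epoch


# In a Keras training script you might then do (not executed here):
#   path, start_epoch = latest_checkpoint(CHECKPOINT_DIR)
#   if path:
#       model.load_weights(path)
#   model.fit(..., initial_epoch=start_epoch)

# Local demonstration with empty placeholder files.
with tempfile.TemporaryDirectory() as demo_dir:
    for epoch in (1, 2, 3):
        open(os.path.join(demo_dir, f"ckpt-{epoch}.h5"), "w").close()
    resumed_path, resumed_epoch = latest_checkpoint(demo_dir)
    print(os.path.basename(resumed_path), resumed_epoch)  # ckpt-3.h5 3
```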

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.tensorflow import TensorFlow

TF_FRAMEWORK_VERSION = '2.4.1'

hyperparameters = {'initial_epochs':     5,
                   'batch_size':         8,
                   'fine_tuning_epochs': 20, 
                   'dropout':            0.4,
                   'data_dir':           '/opt/ml/input/data'}

metric_definitions = [{'Name': 'loss',      'Regex': 'loss: ([0-9\\.]+)'},
                      {'Name': 'acc',       'Regex': 'accuracy: ([0-9\\.]+)'},
                      {'Name': 'val_loss',  'Regex': 'val_loss: ([0-9\\.]+)'},
                      {'Name': 'val_acc',   'Regex': 'val_accuracy: ([0-9\\.]+)'}]

if training_instance_count > 1:
    distribution = {'parameter_server': {'enabled': True}}
    DISTRIBUTION_MODE = 'ShardedByS3Key'
else:
    distribution = {'parameter_server': {'enabled': False}}
    DISTRIBUTION_MODE = 'FullyReplicated'
    
train_in = TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
                         distribution=DISTRIBUTION_MODE)
test_in  = TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
                         distribution=DISTRIBUTION_MODE)
val_in   = TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs["val_data"].S3Output.S3Uri,
                         distribution=DISTRIBUTION_MODE)

inputs = {'train':train_in, 'test': test_in, 'validation': val_in}

In [None]:
training_steps = dict()

training_estimators = dict()
# get out model artifacts location
models = dict()

training_options = ['Spot', 'OnDemand']

for t in training_options:
    tags = dict()
    tags['Key'] = 'TrainingType'
    tags['Value'] = t
    # Output locations for the model artifacts and (for Spot) checkpoints
    model_path = f"{output_s3_uri}/models"
    checkpoint_s3_uri = f"{output_s3_uri}/outputcheckpoints"
    
    if t.lower() == 'spot':
        estimator = TensorFlow(entry_point='train-mobilenet.py',
                               source_dir='code',
                               output_path=model_path,
                               instance_type=training_instance_type,
                               instance_count=training_instance_count,
                               distribution=distribution,
                               hyperparameters=hyperparameters,
                               metric_definitions=metric_definitions,
                               role=role,
                               use_spot_instances=True,
                               max_run=60*60*10,
                               max_wait=60*60*12, # Seconds to wait for spot instances to become available
                               checkpoint_s3_uri=checkpoint_s3_uri,
                               framework_version=TF_FRAMEWORK_VERSION, 
                               py_version='py37',
                               base_job_name=base_job_prefix,
                               script_mode=True,
                               tags=[tags])
    else:
        estimator = TensorFlow(entry_point='train-mobilenet.py',
                               source_dir='code',
                               output_path=model_path,
                               instance_type=training_instance_type,
                               instance_count=training_instance_count,
                               distribution=distribution,
                               hyperparameters=hyperparameters,
                               metric_definitions=metric_definitions,
                               role=role,
                               framework_version=TF_FRAMEWORK_VERSION,
                               py_version='py37',
                               base_job_name=base_job_prefix,
                               script_mode=True,
                               tags=[tags])
        
    step_train = TrainingStep(
        name=f"BirdClassification{t}Train",
        estimator=estimator,
        inputs=inputs,
        cache_config=cache_config
    )
    
    training_steps[t] = step_train
    training_estimators[t] = estimator
    models[t] = step_train.properties.ModelArtifacts.S3ModelArtifacts

### Model evaluation step
We use a ProcessingStep for model evaluation, running our own container (the ECR image URI captured earlier).

The [evaluation.py](./evaluation.py) script does the following:
1. Loads the TensorFlow model
2. Runs predictions on the test set
3. Compares predictions with actuals and generates the confusion matrix
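As a hypothetical sketch of step 3 (the actual `evaluation.py` may differ), the code below builds a confusion matrix and accuracy from lists of actual and predicted class ids and nests them under `multiclass_classification_metrics`, modeled on the multiclass metrics layout in the AWS documentation linked below. The important detail is that `accuracy.value` sits at the path the accuracy condition step reads later. The sample labels are made up.

```python
import json
from collections import Counter


def build_evaluation_report(y_true, y_pred):
    """Return a dict with a confusion matrix and accuracy for multiclass predictions."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    labels = sorted(set(y_true) | set(y_pred))
    counts = Counter(zip(y_true, y_pred))
    # confusion_matrix[actual][predicted] = number of samples
    confusion = {str(a): {str(p): counts[(a, p)] for p in labels} for a in labels}
    accuracy = sum(counts[(label, label)] for label in labels) / len(y_true)
    return {
        "multiclass_classification_metrics": {
            "confusion_matrix": confusion,
            "accuracy": {"value": accuracy},
        }
    }


report = build_evaluation_report([0, 0, 1, 1, 2], [0, 1, 1, 1, 2])
print(json.dumps(report["multiclass_classification_metrics"]["accuracy"]))  # {"value": 0.8}
# Inside the processing job, the report would be written to the "evaluation" output, e.g.:
# with open("/opt/ml/processing/evaluation/evaluation.json", "w") as f:
#     json.dump(report, f)
```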

![Pipeline](statics/pipeline-3.png)


When you register this model in the model registry, the metrics generated by this step are attached to the model version and can be visualized in SageMaker Studio like this:

![Confusion Matrix](statics/confussion_matrix.png)

Here are more details on the metrics available for each type of ML problem: [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)

In [None]:
# from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)

eval_steps = dict()
eval_reports = dict()

pipeline_session = PipelineSession()

for t in training_options:
    
    script_eval = ScriptProcessor(
        base_job_name = f"{base_job_prefix}-evaluation",
        command=['python3'],
        image_uri=image_uri,
        role=role,
        instance_count=processing_instance_count,
        instance_type=processing_instance_type,
        sagemaker_session = pipeline_session)
        
    step_args = script_eval.run(
        code='evaluation.py',
        arguments=["--model-file", "model.tar.gz"],
        inputs=[ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
                                destination="/opt/ml/processing/input/test"),
                ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["manifest"].S3Output.S3Uri, 
                                destination="/opt/ml/processing/input/manifest"),
                ProcessingInput(source=models[t], 
                                destination="/opt/ml/processing/model"),
               ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ]
    )
    
    evaluation_report = PropertyFile(
        name=f"Evaluation{t}Report",
        output_name="evaluation",
        path="evaluation.json",
    )
    
    step_eval = ProcessingStep(
        name=f"BirdClassification{t}Eval",
        step_args = step_args,
        property_files=[evaluation_report],
        cache_config=cache_config
    )

    eval_steps[t] = step_eval
    eval_reports[t] = evaluation_report

### Register model step
In this step, the resulting model artifacts are registered as a model package in a model package group.

A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. It also captures the metrics from the evaluation step for future comparison.

A model package group is a collection of model packages, usually different versions of the same model. It also lets you compare metrics across model versions.

Specifically, we pass in `S3ModelArtifacts` from each training step's `step_train.properties`. The `properties` attribute of a TrainingStep matches the object model of the DescribeTrainingJob response.

![Pipeline](statics/pipeline-5.png)
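Once several versions exist in the group, a common follow-up is to find the newest approved one. The sketch below assumes you pass it the `ModelPackageSummaryList` returned by `boto3.client("sagemaker").list_model_packages(ModelPackageGroupName=model_package_group_name)` (remember to follow `NextToken` for large groups). The helper name `latest_approved_package` is hypothetical, and the sample entries and ARNs are illustrative placeholders.

```python
def latest_approved_package(summaries):
    """Pick the highest-versioned Approved entry from ModelPackageSummaryList, or None."""
    approved = [s for s in summaries if s.get("ModelApprovalStatus") == "Approved"]
    return max(approved, key=lambda s: s["ModelPackageVersion"], default=None)


# Illustrative entries shaped like ModelPackageSummaryList items (placeholder ARNs).
summaries = [
    {"ModelPackageVersion": 1, "ModelApprovalStatus": "Approved", "ModelPackageArn": "arn:example:model-package/1"},
    {"ModelPackageVersion": 2, "ModelApprovalStatus": "Rejected", "ModelPackageArn": "arn:example:model-package/2"},
    {"ModelPackageVersion": 3, "ModelApprovalStatus": "Approved", "ModelPackageArn": "arn:example:model-package/3"},
    {"ModelPackageVersion": 4, "ModelApprovalStatus": "PendingManualApproval", "ModelPackageArn": "arn:example:model-package/4"},
]
best = latest_approved_package(summaries)
print(best["ModelPackageVersion"], best["ModelPackageArn"])
```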

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

model_register_steps = dict()

for t in training_options:
    # Create ModelMetrics object using the evaluation report from the evaluation step
    # A ModelMetrics object contains metrics captured from a model.
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                eval_steps[t].arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                    "S3Uri"
                ]
            ),
            content_type="application/json",
        )
    )
    
    # Create a RegisterModel step, which registers the model with the SageMaker Model Registry.
    step_register = RegisterModel(
        name=f"Register{t}Model",
        estimator=training_estimators[t],
        model_data=models[t],
        content_types=["application/x-image"],
        response_types=["application/json"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
    )
    
    model_register_steps[t] = step_register

### Accuracy condition step
This condition step only allows a model to be registered if its accuracy, as determined by the evaluation step `step_eval`, meets or exceeds a specified threshold (0.7 here). A ConditionStep enables conditional execution in the pipeline DAG based on the values of step properties.

![Pipeline](statics/pipeline-4.png)
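Conceptually, the condition below reads one value out of the evaluation report and compares it with the threshold. The plain-Python sketch that follows mimics that lookup with a hypothetical `json_get` helper; it only handles dotted key paths, whereas the SDK's `JsonGet` accepts a JSONPath expression and is resolved by the service at run time. The accuracy value is illustrative.

```python
from functools import reduce


def json_get(document: dict, json_path: str):
    """Resolve a dotted path such as 'a.b.c' against nested dicts (hypothetical helper)."""
    return reduce(lambda node, key: node[key], json_path.split("."), document)


# Illustrative evaluation.json content.
evaluation = {"multiclass_classification_metrics": {"accuracy": {"value": 0.74}}}

accuracy = json_get(evaluation, "multiclass_classification_metrics.accuracy.value")
register = accuracy >= 0.7  # mirrors ConditionGreaterThanOrEqualTo(right=0.7)
print(accuracy, register)  # 0.74 True
```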

In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

condition_steps = dict()

for t in training_options:
    
    # Create accuracy condition to ensure the model meets performance requirements.
    # Models with a test accuracy lower than the condition will not be registered with the model registry.
    cond_gte = ConditionGreaterThanOrEqualTo(
        left=JsonGet(
            step_name=eval_steps[t].name,
            property_file=eval_reports[t],
            json_path="multiclass_classification_metrics.accuracy.value",
        ),
        right=0.7,
    )

    # Create a Sagemaker Pipelines ConditionStep, using the condition above.
    # Enter the steps to perform if the condition returns True / False.
    step_cond = ConditionStep(
        name=f"BirdClassification{t}Condition",
        conditions=[cond_gte],
        if_steps=[model_register_steps[t]],
        else_steps=[],
    )
    
    condition_steps[t] = step_cond

### Pipeline Creation

Last step is to combine all the previous steps into a Pipeline so it can be executed.

A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


# Create a Sagemaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.

# Build the step list. The register steps are nested inside the condition steps, so they are not added here.
steps = [step_process]
for t in training_steps:
    steps.append(training_steps[t])
    
for e in eval_steps:
    steps.append(eval_steps[e])
    
for c in condition_steps:
    steps.append(condition_steps[c])

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        input_data,
        input_annotation,
        class_selection
    ],
    steps=steps,
    sagemaker_session=sagemaker_session,
)

### Submit and trig pipeline
Submit the pipeline definition to the SageMaker Pipelines service. The role passed in is used by the service to create all the jobs defined in the steps.

Once a pipeline has been submitted (`pipeline.upsert()`), users can trigger it using the API (`pipeline.start()`) or through the SageMaker Studio UI:

![Pipeline UI Trigger](statics/studio-ui-pipeline.png)

![Pipeline Code Trigger](statics/execute-pipeline.png)
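After `pipeline.start()` returns an execution handle, you can block until it finishes with `execution.wait()` (which may raise if the execution fails) and inspect each step with `execution.list_steps()`, which returns dictionaries that include `StepName` and `StepStatus`. The hypothetical `summarize_steps` helper below condenses that list; the sample entries are hand-written so the sketch runs without AWS access.

```python
from collections import Counter


def summarize_steps(steps):
    """Count step statuses and list failed step names from execution.list_steps() output."""
    status_counts = Counter(s["StepStatus"] for s in steps)
    failed = sorted(s["StepName"] for s in steps if s["StepStatus"] == "Failed")
    return dict(status_counts), failed


# With a real execution:
#   execution.wait()
#   counts, failed = summarize_steps(execution.list_steps())
sample_steps = [  # illustrative, hand-written entries
    {"StepName": "BirdClassificationPreProcess", "StepStatus": "Succeeded"},
    {"StepName": "BirdClassificationSpotTrain", "StepStatus": "Executing"},
    {"StepName": "BirdClassificationOnDemandTrain", "StepStatus": "Failed"},
]
counts, failed = summarize_steps(sample_steps)
print(counts, failed)
```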

In [None]:
# Submit (create or update) the pipeline
pipeline.upsert(role_arn=role)

# Execute pipeline using the default parameters.
execution = pipeline.start()

### Analyzing Results
You can compare different versions of a model by selecting multiple versions, right-clicking, and choosing **Compare model versions**. If you have visualizations, your graphs may overlap depending on your use case.

![Model Comparison](statics/compare-model.png)

## Execute same pipeline in one continuous script
To operationalize this pipeline, we also provide the same code as a single continuous script. Please review the [pipeline.py](./pipeline.py) file. The following cells use it to build and run a pipeline.

In [None]:
import boto3
import sagemaker
import time
import json

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()

default_bucket = sagemaker_session.default_bucket() # or use your own custom bucket name
base_job_prefix = "cv-sagemaker-immersionday" # or define your own prefix

model_package_group_name2 = f"{base_job_prefix}-model-group2"  # Model name in model registry
pipeline_name2 = f"{base_job_prefix}-pipeline2"  # SageMaker Pipeline name

This is how to load the pipeline definition.

In [None]:
from pipeline import get_pipeline

pipeline2 = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name2,
    pipeline_name=pipeline_name2,
    base_job_prefix=base_job_prefix,
    container_name=container_name,
    container_version=container_version
)

This is how to submit/update the pipeline.

In [None]:
pipeline2.upsert(role_arn=role)

This is how to run the pipeline and overwrite the parameters.

In [None]:
execution = pipeline2.start(
    parameters=dict(
        InputDataUrl=s3_raw_data, # location of the raw data
        ProcessingInstanceCount=1,
#         ProcessingInstanceType="ml.m5.xlarge",
#         TrainingInstanceCount=1,
#         TrainingInstanceType="ml.c5.4xlarge",#"ml.p3.2xlarge",#
        AnnotationFileName="classes.txt",
        ClassSelection="13, 17, 35, 36"
    )
)

## Manual approval for deployment

---
After you create a model version, you typically want to evaluate its performance before you deploy it. For this reason, the pipeline registers each model version with the approval status `PendingManualApproval` by default. You can change the status to Approved or Rejected manually or through the API. Here is how to update it manually from the SageMaker Studio UI:

![Manual Approval](statics/manual_approval.png)
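To change the status through the API instead, the SageMaker `UpdateModelPackage` operation accepts `ModelPackageArn`, `ModelApprovalStatus`, and an optional `ApprovalDescription`. The sketch below wraps that call in a hypothetical `set_approval_status` helper that takes the client as an argument; a small stand-in client records the call so the example runs without AWS credentials. The ARN is a placeholder.

```python
def set_approval_status(sm_client, model_package_arn: str, approved: bool, description: str = ""):
    """Approve or reject a model package version via UpdateModelPackage."""
    kwargs = {
        "ModelPackageArn": model_package_arn,
        "ModelApprovalStatus": "Approved" if approved else "Rejected",
    }
    if description:
        kwargs["ApprovalDescription"] = description
    return sm_client.update_model_package(**kwargs)


class StandInClient:
    """Records calls instead of contacting AWS, so the sketch runs offline."""

    def __init__(self):
        self.calls = []

    def update_model_package(self, **kwargs):
        self.calls.append(kwargs)
        return {"ModelPackageArn": kwargs["ModelPackageArn"]}


# Placeholder ARN; with AWS access, pass boto3.client("sagemaker") instead of the stand-in.
demo_arn = "arn:aws:sagemaker:us-east-1:111122223333:model-package/cv-sagemaker-immersionday-model-group/1"
client = StandInClient()
set_approval_status(client, demo_arn, approved=True, description="Accuracy reviewed")
print(client.calls[0]["ModelApprovalStatus"])  # Approved
```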

Amazon EventBridge monitors status change events in Amazon SageMaker. EventBridge enables you to automate SageMaker and respond automatically to events such as a training job status change, endpoint status change, or **Model package state change**.

See the [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html#eventbridge-model-package) for the full structure of the Model Package State Change event.

To automate the deployment process, you can use EventBridge to invoke a **deployment Lambda function** that checks the `ModelApprovalStatus` attribute in the event. If the status is **Approved**, the Lambda function continues with the deployment.
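A minimal sketch of such a handler follows, assuming the event's `detail` carries `ModelPackageArn` and `ModelApprovalStatus` as described in the linked documentation. The function only decides whether to deploy; the actual deployment calls (for example creating a model, endpoint configuration, and endpoint) are left as a placeholder, and the sample event is trimmed and illustrative.

```python
def lambda_handler(event, context):
    """Hypothetical deployment Lambda for 'SageMaker Model Package State Change' events."""
    detail = event.get("detail", {})
    status = detail.get("ModelApprovalStatus")
    arn = detail.get("ModelPackageArn")
    if status != "Approved":
        print(f"Skipping {arn}: status is {status}")
        return {"deploy": False, "model_package_arn": arn}
    # Placeholder: create the model, endpoint configuration, and endpoint here.
    print(f"Deploying {arn}")
    return {"deploy": True, "model_package_arn": arn}


# Trimmed, illustrative event (placeholder account ID and ARN).
sample_event = {
    "source": "aws.sagemaker",
    "detail-type": "SageMaker Model Package State Change",
    "detail": {
        "ModelPackageArn": "arn:aws:sagemaker:us-east-1:111122223333:model-package/cv-sagemaker-immersionday-model-group/1",
        "ModelApprovalStatus": "Approved",
    },
}
result = lambda_handler(sample_event, None)
```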