<sup> Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. </sup>
<sup> SPDX-License-Identifier: MIT-0 </sup>

# Robust Time-Series Forecasting with MLOps

The purpose of this notebook is to provide a reusable resource that introduces the Temporal Convolutional Network with Spliced Binned-Pareto distribution and demonstrates SageMaker MLOps features such as SageMaker Pipelines, Model Registry, and Experiments.

**Jupyter Kernel**:
* Please ensure you are using the **Python 3 (PyTorch 1.13 Python 3.9 CPU Optimized)** kernel

**Run All**: 

* If you are in a SageMaker Notebook instance, you can *go to Cell tab -> Run All*
* If you are in SageMaker Studio, you can *go to Run tab -> Run All Cells*

**Overview**

* [Phase I: Model Introduction](#phase_1)
* [Phase II: Training Pipeline](#phase_2)
* [Phase III: Model Performance Analysis](#phase_3)
* [Phase IV: Model Serving](#phase_4)


**Authors:**
* Nick Biso
* Alston Chan
* Maria Masood

In [None]:
!pip install gluonts==0.11.12

In [None]:
from IPython.display import display

import boto3
import sagemaker

import ast
import time
import json
from io import BytesIO

from sagemaker import image_uris
from sagemaker import ModelPackage
from sagemaker.analytics import ExperimentAnalytics
from sagemaker.deserializers import JSONDeserializer
from sagemaker.inputs import TrainingInput
from sagemaker.predictor import Predictor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch import PyTorch, PyTorchModel, PyTorchProcessor
from sagemaker.serializers import JSONSerializer
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.steps import ProcessingStep, CacheConfig, TuningStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from smexperiments.search_expression import Filter, Operator, SearchExpression

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

np.random.seed(0)

from gluonts.nursery.spliced_binned_pareto.training_functions import highlight_min
from processing.generate_data import create_ds_asymmetric
from model.utils import plot_sbp_distribution

pd.set_option('display.max_columns', None)

In [None]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.Session().region_name
sm_client = boto3.client("sagemaker")
s3_client = boto3.resource('s3')

sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

s3_resource = boto3.resource('s3', region_name=region)
s3_bucket = s3_resource.Bucket(bucket_name)

print(f"account_id: {account_id}")
print(f"region: {region}")
print(f"bucket_name: {bucket_name}")
print(f"role: {role}")

<a id='phase_1'></a>
## Phase I: Model Introduction

For this exercise, we utilize a [DistributionalTCN with Spliced Binned-Pareto distribution](https://www.amazon.science/publications/spliced-binned-pareto-distribution-for-robust-modeling-of-heavy-tailed-time-series). This model is specifically designed for time-series forecasting and offers robustness against extreme observations in addition to capturing the main distribution.

The architecture of the model consists of convolutional layers responsible for extracting temporal features. These are followed by fully connected layer(s) and finally a distributional layer which represents the probability distribution of the predictions. We then fit the Spliced Binned-Pareto distribution into the aforementioned distributional layer, which utilizes a flexible binned distribution to model the base and two Generalized Pareto Distributions to capture the tails. Displayed below is a table that illustrates the flexibility of the distribution compared to predecessor methods.

<center>  <img src="images/comparison.png" alt="comparison" width="700"/> </center> 

A version of this solution that takes a multivariate time series concatenated with static features has been applied to model the [NFL Next Gen Stats' new passing metric](https://www.amazon.science/blog/the-science-behind-nfl-next-gen-stats-new-passing-metric). Model performance can be found in the link provided.

<a id='phase_2'></a>
## Phase II: Training Pipeline

In this section, we will guide you through the training pipeline employing SageMaker Pipelines, which is composed of the following steps:
* [Data Generation](#data_generation)
* [Hyperparameter Tuning Step](#hyperparameter_tuning)
* [Model Step](#model_step)
* [Model Evaluation Step](#model_evaluation)
* [Model Registration](#model_registration)
* [Condition Step](#condition)

<center>  <img src="images/pipeline.png" alt="pipeline" width="75%" height="75%"/> </center>  

Define Variables and helper function

In [None]:
project_name = "SBP"
pipeline_name = project_name + "-Pipeline"
experiment_name = pipeline_name + "-Experiment"
model_package_group_name = project_name + "-ModelGroup"

# Return an S3 path based on the id of this pipeline execution, which is a property only
# resolved at runtime but can be accessed at compile time as an execution variable
def dynamic_S3_path(path):
    return Join(
        on="/",
        values=[
            "s3:/",
            bucket_name,
            pipeline_name,
            "executions",
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            path,
        ],
    )
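
The first element of the `Join` above is `"s3:/"` with a single slash because the `on="/"` separator supplies the second one. The plain-Python analogue below is only an illustration of that concatenation: `static_s3_path` is a hypothetical helper, and the bucket name and execution ID are made-up placeholders. In the real pipeline the execution ID is resolved by SageMaker at runtime.

```python
# Plain-Python analogue of the Join above (illustrative only).
# The bucket name and execution ID below are hypothetical placeholders.
def static_s3_path(bucket, pipeline, execution_id, path):
    parts = ["s3:/", bucket, pipeline, "executions", execution_id, path]
    # Joining on "/" turns "s3:/" + bucket into "s3://bucket"
    return "/".join(parts)


example_uri = static_s3_path("my-example-bucket", "SBP-Pipeline", "abc123", "train")
print(example_uri)
```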

### [SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/)

This tool allows you to create ML workflows that integrate with other SageMaker features, using a Python SDK. These workflows can be visualized and managed in SageMaker Studio under `Home → Pipelines` on the left tab.

With SageMaker Pipelines, you can store and reuse workflow steps, which increases efficiency and scalability. Furthermore, built-in templates help you quickly build, experiment with, evaluate, and register models, expediting the implementation of CI/CD in your ML environment.

SageMaker Pipelines creates a Directed Acyclic Graph (DAG) in which each node is a `Step`, available in a variety of [types](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-steps-types), and each edge is a dependency between steps, defined through either a [data dependency](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-properties) or a [custom dependency](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-custom-dependency).

**Define Variables**

Create Pipeline Session

In [None]:
pipeline_session = PipelineSession()

[Cache Steps](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html) \
To optimize cost and time, we utilize step caching in the pipeline. By caching the steps, we avoid re-executing them if their outputs remain the same. This means that if a step has already been successfully executed with the same input data and parameters, the cached result will be used instead of recomputing the step.

In [None]:
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="7d"
)

**Define SageMaker Pipelines parameters** \
The variables below hold the default values used in our pipeline. They can be overridden in [this cell](#overwrite_variables) of the notebook or when starting an execution from the UI, by going to the left pane and clicking: `Home → Pipelines → Pipeline Name → Create Execution`. 

In [None]:
pipeline_parameters = {}

pipeline_parameters['train_data_size'] = ParameterInteger(
    name="TrainDataSize",
    default_value=5_000,
)

pipeline_parameters['val_data_size'] = ParameterInteger(
    name="ValidationDataSize",
    default_value=1_000,
)

pipeline_parameters['test_data_size'] = ParameterInteger(
    name="TestDataSize",
    default_value=1_000,
)

pipeline_parameters['model_approval_status'] = ParameterString(
    name="ModelApprovalStatus", 
    default_value="Approved"
)

pipeline_parameters['context_length'] = ParameterString(
    name="ContextLength", 
    default_value="100"
)

pipeline_parameters['lead_time'] = ParameterString(
    name="LeadTime", 
    default_value="1"
)



We set the default training instance type to `ml.g4dn.xlarge`, which is a GPU instance. The comprehensive list of instance types can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-available-instance-types.html). To save on cost, we [override this default with a CPU instance](#overwrite_variables) when starting the execution. Furthermore, using multiple GPUs requires a [service quota increase request](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html), and approval may take time and further communication depending on the instance type. GPUs are worth considering because they can lower compute time when training neural networks, especially with larger batch sizes. We recommend analyzing compute time and cost whenever you perform rigorous hyperparameter tuning.

### Table of Price Comparison (us-east-1)
Below is the estimated cost per training job of an SBP model. When changing training instance type, ensure that you also change the instance type of the [training image](#image). For our use-case, due to the very small dataset we are using coupled with the simple architecture, we are not noticing any major differences in training time. Pricing is dependent on the [region](https://aws.amazon.com/sagemaker/pricing/).

| Instance Type                     | Price per Hour | Compute Time |     Total Cost |
|:----------------------------------|---------------:|-------------:|---------------:|
| ml.m5.xlarge (Standard Instance)  |  &dollar;0.204 |      ~8 Mins | &dollar; 0.027 |
| ml.m5.2xlarge (Standard Instance) |   &dollar;0.23 |      ~7 Mins | &dollar; 0.027 |
| ml.c5.xlarge (Compute-Optimized)  |  &dollar;0.204 |      ~7 Mins | &dollar; 0.024 |
| ml.c5.2xlarge (Compute-Optimized) |  &dollar;0.408 |      ~5 Mins | &dollar; 0.034 |
| ml.g4dn.xlarge (GPU)              | &dollar;0.7364 |      ~8 Mins | &dollar; 0.098 |
| ml.g4dn.2xlarge (GPU)             |   &dollar;0.94 |      ~7 Mins | &dollar; 0.110 |
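
The Total Cost column is consistent with the price per hour multiplied by the compute time in hours. The quick check below reproduces it from the figures in the table; it treats the approximate minutes as exact, which is an assumption, and `job_cost` is a hypothetical helper.

```python
# (price per hour in USD, approximate compute time in minutes), copied from the table above
instances = {
    "ml.m5.xlarge": (0.204, 8),
    "ml.m5.2xlarge": (0.23, 7),
    "ml.c5.xlarge": (0.204, 7),
    "ml.c5.2xlarge": (0.408, 5),
    "ml.g4dn.xlarge": (0.7364, 8),
    "ml.g4dn.2xlarge": (0.94, 7),
}


def job_cost(price_per_hour, minutes):
    """Cost of one training job: hourly price times duration in hours."""
    return price_per_hour * minutes / 60


costs = {name: round(job_cost(price, minutes), 3) for name, (price, minutes) in instances.items()}
for name, cost in costs.items():
    print(f"{name}: ${cost:.3f}")
```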

In [None]:
pipeline_parameters['training_instance_type'] = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.g4dn.xlarge",
)

All of the below training parameters are within the default ranges of SageMaker [Service Quotas](https://us-west-1.console.aws.amazon.com/servicequotas/home/services/sagemaker/quotas). If needed, feel free to [request an increase](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) for any of the following:

In [None]:
pipeline_parameters['max_jobs'] = ParameterInteger(
    name="MaxJobs", 
    default_value=2
)

pipeline_parameters['max_parallel_jobs'] = ParameterInteger(
    name="MaxParallelJobs", 
    default_value=2
)

In [None]:
pipeline_parameter_list = list(pipeline_parameters.values())

<a id='image'></a>
**Define Images** \
We will be using Deep Learning Containers (DLCs) to run the majority of the pipeline steps. The full list of images can be found [here](https://github.com/aws/deep-learning-containers/blob/master/available_images.md).

In [None]:
train_image_uri = image_uris.retrieve(
    framework='pytorch',
    region=region,
    version='1.13',
    py_version='py39',
    image_scope='training', 
    instance_type='ml.c5.2xlarge'
)

inference_image_uri = image_uris.retrieve(
    framework='pytorch',
    region=region,
    version='1.13',
    py_version='py39',
    image_scope='inference', 
    instance_type="ml.c5.2xlarge"
)

<a id='data_generation'></a>
#### Data Generation Step
We will be leveraging a synthetic time series dataset that exhibits a sinusoidal mean and asymmetric heavy-tailed noise. This data will be generated during the execution of SageMaker Pipelines.

To generate the data, we will utilize the `create_ds_asymmetric` function from the `processing.generate_data` module. Below is a sample of the data generated using this function. You can access the actual data generated by your pipeline in the `plots` directory of the execution folder associated with this notebook. Please ensure that you have run all the necessary steps leading up to this point for the link in the [cell](#s3_path) to work and generate the data.

To perform these calculations we are using a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing).

<center>  <img src="images/train_data.png" alt="train_data"/> </center> 

In [None]:
base_job_name = f"{pipeline_name}/data-generation-step"

script_processor = PyTorchProcessor( 
    command=['python3'],
    role=role,
    instance_count=1,
    instance_type="ml.c5.2xlarge",
    base_job_name=base_job_name,
    sagemaker_session=pipeline_session,
    framework_version='1.13',
    py_version='py39'
)


processor_run_args = script_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=dynamic_S3_path("train")
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=dynamic_S3_path("validation")
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=dynamic_S3_path("test")
        ),
        ProcessingOutput(
            output_name="plots",
            source="/opt/ml/processing/plots",
            destination=dynamic_S3_path("plots")
        ),
    ],
    code="processing/generate_data.py",
)

step_process = ProcessingStep(
    name="GenerateData",
    step_args=processor_run_args,
    job_arguments=[
        "--train_size",
        str(pipeline_parameters['train_data_size'].default_value),
        "--validation_size",
        str(pipeline_parameters['val_data_size'].default_value),
        "--test_size",
        str(pipeline_parameters['test_data_size'].default_value),
    ],
    cache_config=cache_config
)


<a id='hyperparameter_tuning'></a>
#### [Hyperparameter Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning)

SageMaker provides a [hyperparameter tuning](https://aws.amazon.com/sagemaker/automatic-model-tuning/) feature that is also available within SageMaker Pipelines. This step runs multiple training jobs, optionally in parallel, using one of [several search strategies](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html) over predefined [hyperparameter ranges](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-define-ranges.html). During training, it tracks [model metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-define-metrics-variables.html) by parsing **stdout** and **stderr**, and it returns the best model according to those metrics.

To optimize costs, we tune only a small subset of the available hyperparameters, enough to showcase SageMaker's hyperparameter tuning feature. The hyperparameters fall into three groups: 
 * **Time-series related:** because our data is synthetic, tuning these parameters adds little, so we keep them static.
     * `Context Length` - Number of recent historical time steps that will be fed into the model
     * `Lead Time` - Number of time steps to predict, also known as the forecast horizon
 * **Neural network related:** whether each hyperparameter is tuned in this notebook is noted below.
     * `Learning Rate` - Tuned
     * `Epochs` - Tuned
     * `Number of TCN Layers` - Not tuned
 * **Spliced Binned Pareto [specific parameters](model/sbp.py):** kept at the values shown in parentheses.
     * `Number of Bins` (100) - The number of bins used to model the base of the distribution. The authors of the model have tested this parameter across datasets from many different industries, and this value has proven the most effective.
     * `Percentile Tail` (0.05) - The size of the generalized Pareto distributions at the tails. Like the previous parameter, it has been rigorously tested by the authors.
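
To make `Context Length` and `Lead Time` concrete, the sketch below slices a toy series into (context, target) pairs. It assumes the target is the single value `lead_time` steps after the end of each context window; the notebook's training scripts may build their windows differently, and `make_windows` is a hypothetical helper that is not part of this project.

```python
import numpy as np


def make_windows(series, context_length, lead_time):
    """Pair each context window with the value lead_time steps after its end.

    Assumption for illustration: one target value per window.
    """
    contexts, targets = [], []
    last_start = len(series) - context_length - lead_time
    for start in range(last_start + 1):
        end = start + context_length
        contexts.append(series[start:end])
        targets.append(series[end + lead_time - 1])
    return np.array(contexts), np.array(targets)


toy_series = np.arange(10.0)  # 0.0, 1.0, ..., 9.0
X, y = make_windows(toy_series, context_length=4, lead_time=1)
print(X.shape, y.shape)
print(X[0], "->", y[0])
```

With the pipeline defaults (`ContextLength` of 100 and `LeadTime` of 1), each training example under this assumption would hold 100 past values and the next value as its target.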


In [None]:
distributions = ["sbp", "gaussian"] 

def create_model_component(model_name):
    estimator = PyTorch(
        role=role,
        instance_type=pipeline_parameters['training_instance_type'],
        output_path=f"s3://{bucket_name}/{pipeline_name}/models/",
        instance_count=1,
        source_dir='model',
        image_uri=train_image_uri,
        entry_point=model_name + ".py",
        base_job_name = f"{pipeline_name}/training/job",
    )

    hyper_ranges = {
        "learning-rate": ContinuousParameter(1e-5, 1e-4,  scaling_type="Logarithmic"),
        "epochs": IntegerParameter(30, 40),
    }

    objective_name = "logloss"
    metric_definitions = [{"Name": objective_name, "Regex": "Validation Loss: ([0-9\\.]+)"}]

    tuner_log = HyperparameterTuner(
        estimator,
        objective_name,
        hyper_ranges,
        metric_definitions,
        max_jobs=pipeline_parameters['max_jobs'], 
        max_parallel_jobs=pipeline_parameters['max_parallel_jobs'],
        objective_type="Minimize",
        base_tuning_job_name=f"{pipeline_name}/HPTuning/{model_name}",
        random_seed=10
    )

    step_tuning = TuningStep(
        name=f"{model_name}-HpTuning",
        display_name=f"{model_name}-HpTuning",
        tuner=tuner_log,
        inputs={
            'train': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                content_type="text/csv",
            ),
           "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
        job_arguments=[
            "--lead_time",
            str(pipeline_parameters['lead_time'].default_value),
            "--context_length",
            str(pipeline_parameters['context_length'].default_value),
        ],
        cache_config=cache_config
    )
    
    return step_tuning
    
tuning_steps = [create_model_component(distribution) for distribution in distributions]
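
The `metric_definitions` regex above tells the tuning job which number to pull out of the training logs. The sketch below applies the same pattern with Python's `re` module as a stand-in for the service's own matching; the log lines are hypothetical, since the real format depends on the training scripts in `model/`.

```python
import re

# Same pattern as in metric_definitions (the doubled backslash in the
# notebook's ordinary string literal becomes a single one here).
pattern = re.compile(r"Validation Loss: ([0-9\.]+)")

# Hypothetical log line emitted by a training script
match = pattern.search("Epoch 12 | Validation Loss: 1.2345")
value = float(match.group(1))
print(value)

# The character class has no minus sign, so a negative loss is not matched
negative = pattern.search("Epoch 12 | Validation Loss: -0.5")
print(negative)
```

If the validation loss of a model can go below zero, the pattern would need to allow an optional leading `-` for the metric to be picked up.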

<a id='model_step'></a>
#### [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model)

Upon finishing the hyperparameter tuning process, we will create a [SageMaker Model Object](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) from the outputs of the best tuning job. This SageMaker Model object will be used in the [endpoint step](#endpoint_step) to deploy the trained model.

In [None]:
def create_model_step(tuning_step):
    model_name = tuning_step.display_name.split('-')[0]
    best_model = PyTorchModel(
        source_dir='model',
        entry_point=model_name + ".py",
        role=role,
        model_data=tuning_step.get_top_model_s3_uri(
            top_k=0, 
            s3_bucket=bucket_name, 
            prefix=f"{pipeline_name}/models"
        ),
        image_uri=inference_image_uri,
        sagemaker_session=pipeline_session,
    )

    model_step = ModelStep(
        name=f'{model_name}-CreateModel',
        display_name=f'{model_name}-CreateModel',
        step_args=best_model.create(instance_type="ml.c5.2xlarge"),
    )
    return best_model, model_step, model_name

best_models = {}
model_steps = []
for tuning_step in tuning_steps:
    best_model, model_step, model_name = create_model_step(tuning_step)
    best_models[model_name] = best_model
    model_steps.append(model_step)

<a id='model_evaluation'></a>
#### Model Evaluation Step
To perform these calculations we use a [PyTorch Processor](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-pytorch.html), which runs as a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) in SageMaker Pipelines. This step measures the performance of our trained models on the test set, which was not used during the hyperparameter tuning stage. Because the models have never seen this data, it gives a good indication of their accuracy in real-world scenarios.

To evaluate the accuracy of the density estimation of each method, we use Probability-Probability plots (PP-plots). For a given quantile level $q$, we compute $y_q$, the fraction of points that fell below the corresponding quantile $z_{1-q}(t)$ of their predictive distribution, where $z_q(t)$ is the upper-tail quantile defined on the right:
\begin{align}
  y_q = \frac{\sum_{t=2}^{T} \mathbb{I}[ x_t < z_{1-q}(t) ]}{T}, \hspace{40pt} z_q(t) : p\left( x_t > z_q(t) \middle| x_{1:t-1} \right) < q
\end{align}
To obtain a quantitative score, we measure how good the tail estimate is by computing the Root Mean Square Error (RMSE) between $y_q$ and $q$ for all measured quantiles $q$.
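
As a rough illustration of this score (a minimal sketch, not the code in `processing/evaluate.py`), suppose we already have the predictive CDF evaluated at each observation. An observation lies below the level-$q$ quantile of its predictive distribution exactly when that CDF value is below $q$, so $y_q$ reduces to a simple mean. The helper name `pp_rmse` and the toy inputs are assumptions, and the sketch normalizes by the number of scored points.

```python
import numpy as np


def pp_rmse(cdf_at_obs, quantile_levels):
    """Return empirical coverage y_q per level and its RMSE against q.

    cdf_at_obs[t] is assumed to be the predictive CDF evaluated at x_t.
    """
    cdf_at_obs = np.asarray(cdf_at_obs)
    quantile_levels = np.asarray(quantile_levels)
    # y_q: fraction of observations falling below the level-q quantile
    y_q = np.array([(cdf_at_obs < q).mean() for q in quantile_levels])
    rmse = float(np.sqrt(np.mean((y_q - quantile_levels) ** 2)))
    return y_q, rmse


# Perfectly calibrated toy case: CDF values spread evenly over (0, 1)
cdf_values = (np.arange(100) + 0.5) / 100
levels = np.linspace(0.1, 0.9, 9)
coverage, rmse = pp_rmse(cdf_values, levels)
print(coverage)
print(rmse)
```

Plotting `coverage` against `levels` gives the PP-plot; a well-calibrated model stays close to the diagonal and therefore has an RMSE near zero.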



In [None]:
script_eval = PyTorchProcessor(
    command=["python3"],
    instance_type="ml.c5.2xlarge",
    instance_count=1,
    base_job_name = f"{pipeline_name}/evaluation/job",
    sagemaker_session=pipeline_session,
    role=role,
    framework_version='1.13',
    py_version='py39',
)


eval_inputs = [
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        destination="/opt/ml/processing/input/train",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
        destination="/opt/ml/processing/input/validation",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/input/test",
    )
]

for tuning_step in tuning_steps:
    eval_inputs.append(
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, 
                s3_bucket=bucket_name, 
                prefix=f"{pipeline_name}/models"
            ),
            destination=f"/opt/ml/processing/model/{tuning_step.display_name.split('-')[0]}",
        ),
    )

processor_args = script_eval.run(
    code="processing/evaluate.py",
    inputs=eval_inputs,
    outputs=[
        ProcessingOutput(
            output_name="evaluation", 
            source="/opt/ml/processing/evaluation",
            destination=dynamic_S3_path("evaluation")
        ),
        ProcessingOutput(
            output_name="plots",
            source="/opt/ml/processing/plots",
            destination=dynamic_S3_path("plots")
        ),
    ],
)


[Property Files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html)

After performing the evaluation and generating the desired metrics and results in the PyTorch Processor step, we store the outputs in a property file. This property file serves as a record of the evaluation and contains important information such as the model's performance metrics, accuracy measures, and any other relevant evaluation results.

Storing the outputs in a property file allows for easy access and retrieval of the evaluation results. It provides a structured format to organize and store the information generated during the evaluation process. This file can be used for further analysis, reporting, or tracking the performance of different models over time.



In [None]:
evaluation_report = PropertyFile(
    name="BestTuningModelEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EvaluateTopModel",
    step_args=processor_args,
    property_files=[evaluation_report],
    cache_config=cache_config
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        # Join takes the separator as `on` and the parts to concatenate as `values`
        s3_uri=Join(
            on="/",
            values=[
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"],
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

### [Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html)
In this step we prepare the SageMaker Model object for deployment. The `register` method generates a model package that captures essential parameters such as content types and the instance types supported for inference and batch transform. Each registration call is then wrapped in a `ModelStep`, which packages the model with its associated metadata, ready for deployment.

By leveraging SageMaker Model Registry, we can catalog models primed for production and also manage model versions, associate important metadata, and control the approval status of a model. The registry also facilitates model deployment to production and sets the stage for continuous integration and continuous deployment (CI/CD) pipelines, ensuring seamless model deployment.

<a id='model_registration'></a>
#### Model Registration Step 

In [None]:
registration_steps = {}
for model_name in best_models.keys():
    register_args = best_models[model_name].register(
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.c5.2xlarge"],
        transform_instances=["ml.c5.2xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=pipeline_parameters['model_approval_status'],
        domain="MACHINE_LEARNING",
        description="Robust Deep Time-Series Forecasting",
        task="REGRESSION",
        framework="PYTORCH",
        image_uri=inference_image_uri
    )
    registration_steps[model_name] = ModelStep(
        name=model_name, 
        step_args=register_args
    )

<a id='condition'></a>
#### [Condition Step ](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition)

Once we have evaluated the models with their best sets of hyperparameters, we need to choose one model type (either SBP or Gaussian) to register in our model registry. We base this selection on which technique achieved better accuracy over the full distribution.

SageMaker Pipelines has a condition step that enables this by reading both models' results from the evaluation report (`PropertyFile`) and comparing them. For our purposes, we select the model type with the lower RMSE, deem it the "best", and pass it on to the next step.

By leveraging the condition step in SageMaker Pipelines, we automate this decision and move forward with the selected model type in the rest of the pipeline.
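
The decision the condition step makes can be pictured with ordinary Python. In this minimal sketch, only the `sbp.full_distribution` and `gaussian.full_distribution` paths come from the notebook; the RMSE values are made up, and `json_get` is a hypothetical helper that mimics a dotted-path lookup rather than SageMaker's `JsonGet`.

```python
# Hypothetical evaluation report; the RMSE values are invented for illustration.
evaluation = {
    "sbp": {"full_distribution": 0.02},
    "gaussian": {"full_distribution": 0.05},
}


def json_get(report, json_path):
    """Resolve a dotted path such as 'sbp.full_distribution' in a nested dict."""
    node = report
    for key in json_path.split("."):
        node = node[key]
    return node


left = json_get(evaluation, "sbp.full_distribution")
right = json_get(evaluation, "gaussian.full_distribution")

# Mirrors a less-than-or-equal condition: register SBP when its RMSE is not higher
selected = "sbp" if left <= right else "gaussian"
print(selected)
```

Because the comparison is less-than-or-equal, a tie also selects the SBP model.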

In [None]:
condition_steps = []

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="sbp.full_distribution"
    ),
    right=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="gaussian.full_distribution"
    )
)

step_cond = ConditionStep(
    name="SbpLowerRmse",
    conditions=[cond_lte],
    if_steps=[registration_steps['sbp']],
    else_steps=[registration_steps['gaussian']],
)

condition_steps.append(step_cond)

<a id='create_pipeline'></a>
#### Create Pipeline

`PipelineExperimentConfig` links an experiment to a specific Pipeline execution using a unique run group identifier, such as `SbpForecastTrialExperiment-SBP-Pipeline`. This identifier groups together runs that are associated with the same execution, simplifying the tracking and management of experiments.

This feature enables us to track and compare the performance and outcomes of different runs, allowing for effective experimentation and iteration in machine learning workflows.

In [None]:
steps = [step_process] + tuning_steps + [step_eval] + condition_steps 

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameter_list,
    steps=steps,
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name,
        Join(
            on="-", 
            values=[
                "SbpForecastTrialExperiment", 
                pipeline_name
            ]
        ),
    ),
)
pipeline.upsert(role_arn=role)

<a id='overwrite_variables'></a>
<b>Overwrite default variables</b>

In [None]:
execution = pipeline.start(
    parameters=dict(
        TrainingInstanceType="ml.c5.2xlarge"
    )
)

<a id='s3_path'></a>
<b>Execution Details:</b>

In [None]:
execution_id = execution.describe()['PipelineExecutionArn'].split('/')[-1]
print(f"Pipeline Execution ID: {execution_id}")
print(f"Execution Artifacts Link: https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?prefix={pipeline_name}/executions/{execution_id}/&region={region}")

In [None]:
%%time
execution.wait()

In [None]:
execution.list_steps()

In [None]:
execution.describe()

<a id='phase_3'></a>
## Phase III: Model Performance Analysis

### Evaluation Results

Below is the table used by the [condition step](#condition) to select the better model between the TCN with a Gaussian distribution and the TCN with an SBP distribution. In our runs, the best SBP model produced a lower RMSE than the best Gaussian model on all segments.

In [None]:
content_object = s3_resource.Object(bucket_name, f"{pipeline_name}/executions/{execution_id}/evaluation/evaluation.json")
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
rmse_table = pd.DataFrame(json_content).T

In [None]:
display(
    rmse_table.style.set_caption("Root Mean Square Error (RMSE)")
    .set_table_styles(
        [{"selector": "caption", "props": [("font-size", "16px")]}]
    )
    .apply(highlight_min)
)

### PP-Plots

The Probability-Probability plot assesses the goodness of fit between two distributions (actual vs. predicted). With a perfect fit, all points lie on the diagonal; the further the points stray from it, the less the predicted distribution conforms to the actual one. The plots below compare the SBP and Gaussian predicted distributions against the actual distribution. As we can observe, SBP's predicted distribution is closer to the actual distribution than the Gaussian's.

The below plot was created in the [Model Evaluation step](#model_evaluation).

In [None]:
def plot_from_s3(img_name, bucket=s3_bucket):
    image_object = bucket.Object(f'{pipeline_name}/executions/{execution_id}/plots/{img_name}')
    image = mpimg.imread(BytesIO(image_object.get()['Body'].read()), 'png')

    plt.figure(figsize=(36, 8))
    plt.axis('off')
    plt.imshow(image)


In [None]:
plot_from_s3("evaluation_plot.png")

### [Experiments](https://aws.amazon.com/sagemaker/experiments/)

SageMaker Experiments is a managed service for creating, managing, and analyzing ML experiments at scale. An experiment is a collection of runs, each of which consists of all the inputs, parameters, configurations, and results for one iteration of model training. 

SageMaker Experiments is [integrated with hyperparameter optimization (HPO) jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments-sm-integrations.html), so the [hyperparameter tuning step](#hyperparameter_tuning) automatically creates HPO experiments and runs for each training job completed. 

SageMaker Experiments is also [integrated with Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-experiments.html), so the [pipeline creation step](#create_pipeline) automatically creates Pipeline experiments and runs if they do not already exist. This step sets the experiment name to `SBP-Pipeline-experiment` and defines a run group identifier `SbpForecastTrialExperiment-SBP-Pipeline` to group runs associated with the same pipeline execution. 

You can view and analyze these experiments and runs by using SageMaker Studio's [experiment browser](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments-view-compare.html).

To explore the hyperparameter tuning results further, you can [search for training job runs using the current execution ID](https://github.com/aws-samples/sagemaker-experiments-and-pipelines/blob/main/02-PipelineExperiments.ipynb). The code below collects those runs into a DataFrame for analysis.

In [None]:
# SageMaker Pipelines injects the execution ID into the display name
display_name_filter = Filter(
    name="DisplayName",
    operator=Operator.CONTAINS,
    value=execution_id
)

source_type_filter = Filter(
    name="Source.SourceType",
    operator=Operator.EQUALS,
    value="SageMakerTrainingJob"
)

search_expression = SearchExpression(
    filters=[display_name_filter, source_type_filter]
)

component_analytics = ExperimentAnalytics(
    sagemaker_session=sagemaker_session,
    search_expression=search_expression.to_boto()
)
analytic_table = component_analytics.dataframe()
# Keep only rows with an estimator module; this drops the repack-model training job (model/_repack_model.py)
analytic_table = analytic_table[analytic_table['sagemaker_estimator_module'].notnull()]
analytic_table.head()
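
For readers who want to call the SageMaker Search API directly, the two filters above should correspond roughly to the plain dictionary below. This is a sketch based on the public Search API request shape, not output captured from `to_boto()`. The function name `build_training_job_search_expression` and the example execution ID are hypothetical.

```python
def build_training_job_search_expression(execution_id):
    """Build a Search API expression matching training-job runs of one execution."""
    return {
        # Filters listed together are combined with a logical AND by default.
        "Filters": [
            {
                "Name": "DisplayName",
                "Operator": "Contains",
                "Value": execution_id,
            },
            {
                "Name": "Source.SourceType",
                "Operator": "Equals",
                "Value": "SageMakerTrainingJob",
            },
        ]
    }


# Placeholder execution ID, for illustration only.
expression = build_training_job_search_expression("example-execution-id")
print(expression)
```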

### Load Best Model From Model Registry

In [None]:
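# Ask for newest-first ordering explicitly so that index 0 is the latest approved package
model_package_dict = sm_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    SortOrder="Descending",
    MaxResults=100,
)["ModelPackageSummaryList"][0]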

model_description = sm_client.describe_model_package(
    ModelPackageName=model_package_dict["ModelPackageArn"]
)

model_package_arn = model_description["ModelPackageArn"]
model = ModelPackage(
    role=role, 
    model_package_arn=model_package_arn, 
    sagemaker_session=sagemaker_session
)
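
The lookup above raises an `IndexError` if the group contains no approved package, for example when the pipeline's condition step did not register a model. A more defensive variant might look like the sketch below. It assumes that the most recently created approved package is the one you want. The helper name `latest_approved_package_arn` is hypothetical, and the stub client exists only so that the sketch runs without AWS access.

```python
from types import SimpleNamespace


def latest_approved_package_arn(sm_client, group_name):
    """Return the ARN of the most recently created approved model package."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",  # newest first
        MaxResults=1,
    )
    summaries = response["ModelPackageSummaryList"]
    if not summaries:
        raise LookupError(f"No approved model package found in group {group_name!r}")
    return summaries[0]["ModelPackageArn"]


# Offline stand-in for the boto3 SageMaker client (placeholder ARN, not a real resource).
fake_client = SimpleNamespace(
    list_model_packages=lambda **kwargs: {
        "ModelPackageSummaryList": [{"ModelPackageArn": "arn:placeholder"}]
    }
)
demo_arn = latest_approved_package_arn(fake_client, "example-group")
print(demo_arn)
```

In the notebook you would pass the real client instead, for example `latest_approved_package_arn(sm_client, model_package_group_name)`.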

<a id='phase_4'></a>
## Phase IV: Model Serving

To score the model (apply it to a new dataset), you have two options:
1. Use [Batch Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) as part of the same pipeline, or a separate [Inference Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/inference-pipeline-batch.html). Consider this option if you don't need live predictions and can tolerate some processing delay, since it costs less. It is well suited to forecasting at coarser time steps such as daily, weekly, or monthly intervals.
2. Create a [Real-Time Inference Endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html) or a [Serverless Inference Endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/serverless-endpoints.html). This suits time-sensitive predictions, at a higher cost. Currently, SageMaker Pipelines does not support deploying and scoring an endpoint within a pipeline.

Which option to choose depends on whether your application needs real-time predictions and on how much you are willing to spend.
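
This notebook deploys a real-time endpoint, so the Batch Transform option is not demonstrated. As a rough sketch of what it could look like with the SageMaker Python SDK, the helper below starts a transform job from a model object such as the `ModelPackage` loaded earlier. The function name `run_batch_transform`, the S3 URIs, and the JSON content type are assumptions, and whether the serving script accepts batch input in this form has not been verified here. The stub objects only let the sketch run offline.

```python
from types import SimpleNamespace


def run_batch_transform(model, input_s3_uri, output_s3_uri, instance_type="ml.c5.2xlarge"):
    """Score every object under input_s3_uri with a Batch Transform job."""
    transformer = model.transformer(
        instance_count=1,
        instance_type=instance_type,
        output_path=output_s3_uri,
    )
    # Start the job and block until it finishes; results land under output_s3_uri.
    transformer.transform(
        data=input_s3_uri,
        content_type="application/json",
        wait=True,
    )
    return transformer


# Offline stand-ins that record the calls instead of contacting AWS.
transform_calls = []
fake_transformer = SimpleNamespace(
    transform=lambda **kwargs: transform_calls.append(kwargs)
)
fake_model = SimpleNamespace(transformer=lambda **kwargs: fake_transformer)

run_batch_transform(
    fake_model,
    "s3://example-bucket/input/",   # placeholder URI
    "s3://example-bucket/output/",  # placeholder URI
)
print(transform_calls)
```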


<a id='endpoint_step'></a>
### [Deploy Endpoint](https://aws.amazon.com/sagemaker/deploy)

In this example, we deploy a real-time inference endpoint. You can view this endpoint in SageMaker Studio under `Home -> Deployments -> Endpoints`.

Our PyTorch model [serves requests](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#serve-a-pytorch-model) according to [model/endpoint_serving.py](model/endpoint_serving.py).


In [None]:
endpoint_name = "SBP-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
print(f"EndpointName: {endpoint_name}")
model.deploy(
    initial_instance_count=1, 
    instance_type="ml.c5.2xlarge",
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
    endpoint_name=endpoint_name
)

In [None]:
predictor = Predictor(endpoint_name=endpoint_name)

### Inference

Score the model with completely new data. This mirrors how the model behaves in production, where it generates predictions for data it has not seen before.

In [None]:
np.random.seed(275)
new_data = create_ds_asymmetric(300)
payload = {"inputs": [[new_data.tolist()[0][0][-103:]]]}
jstr = json.dumps(payload)
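
If you build request bodies in more than one place, the payload construction above can be wrapped in a small helper. The sketch below is one way to do that. The name `build_payload` is hypothetical, and the default window of 103 points is copied from the cell above without any claim about why the model expects that length. Passing `allow_nan=False` makes `json.dumps` raise a `ValueError` for `NaN` or infinite values rather than emit tokens that are not valid JSON.

```python
import json


def build_payload(series, context_length=103):
    """Serialize the last `context_length` observations in the endpoint's request shape."""
    window = [float(value) for value in series[-context_length:]]
    # The nesting mirrors the notebook's request: a batch holding one single-channel series.
    return json.dumps({"inputs": [[window]]}, allow_nan=False)


# Synthetic series, for illustration only.
example_series = [float(i) for i in range(300)]
body = build_payload(example_series)
print(body[:60])
```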

In [None]:
p = predictor.predict(
    jstr,
    initial_args={
        "ContentType": 'application/json'
    }
)
prediction = ast.literal_eval(p.decode("utf-8"))
prediction_df = pd.DataFrame(prediction)
historical_list = new_data.tolist()[0][0]

plot_sbp_distribution(
    prediction_df,
    historical_list,
    empty_list=[None]*(len(historical_list)-1),
    last_value=[historical_list[-1]],
    connect_length=5,
    line_width=5
)

display(prediction_df)
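
The cell above parses the response with `ast.literal_eval`, which accepts Python literal syntax but rejects JSON-only tokens such as `null` or `true`. The response format produced by the serving script is not shown in this section, so the sketch below hedges: it tries JSON first and falls back to Python literals. The helper name `parse_prediction` is hypothetical.

```python
import ast
import json


def parse_prediction(raw_bytes):
    """Decode an endpoint response that may be JSON or a Python literal."""
    text = raw_bytes.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to Python literal syntax, e.g. single-quoted keys.
        return ast.literal_eval(text)


# Made-up responses, for illustration only.
from_json = parse_prediction(b'{"mean": [1.0, 2.0], "note": null}')
from_literal = parse_prediction(b"{'mean': [1.0, 2.0]}")
print(from_json, from_literal)
```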

## Cleanup

In [None]:
for d in sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)[
    "ModelPackageSummaryList"
]:
    print(d["ModelPackageArn"])
    sm_client.delete_model_package(ModelPackageName=d["ModelPackageArn"])

sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)
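
`list_model_packages` returns results one page at a time, so a group with many versions may not be emptied by a single call, and deleting the group can then fail. A sketch of collecting every ARN first by following `NextToken` is shown below. The helper name `list_all_model_package_arns` is hypothetical, and the two-page stub client exists only so that the sketch runs offline.

```python
from types import SimpleNamespace


def list_all_model_package_arns(sm_client, group_name):
    """Collect the ARNs of every model package in a group, across all pages."""
    arns = []
    kwargs = {"ModelPackageGroupName": group_name, "MaxResults": 100}
    while True:
        response = sm_client.list_model_packages(**kwargs)
        arns.extend(s["ModelPackageArn"] for s in response["ModelPackageSummaryList"])
        token = response.get("NextToken")
        if not token:
            return arns
        # Request the next page on the following iteration.
        kwargs["NextToken"] = token


# Offline stand-in: two pages of placeholder ARNs keyed by the incoming token.
pages = {
    None: {"ModelPackageSummaryList": [{"ModelPackageArn": "arn:1"}], "NextToken": "page-2"},
    "page-2": {"ModelPackageSummaryList": [{"ModelPackageArn": "arn:2"}]},
}
fake_client = SimpleNamespace(
    list_model_packages=lambda **kwargs: pages[kwargs.get("NextToken")]
)
demo_arns = list_all_model_package_arns(fake_client, "example-group")
print(demo_arns)
```

With the real client, you would loop over the returned ARNs and call `sm_client.delete_model_package(ModelPackageName=arn)` for each one before deleting the group.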

Clean up the Amazon SageMaker Experiments resources with the [SageMaker Python SDK](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments-cleanup.html).

In [None]:
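from sagemaker.experiments.experiment import _Experiment

# _Experiment is a private SDK class (note the leading underscore), so its interface may change between SDK versions
exp = _Experiment.load(experiment_name=experiment_name, sagemaker_session=sagemaker_session)
# Delete the experiment together with everything recorded under it
exp._delete_all(action="--force")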

In [None]:
predictor.delete_endpoint()

In [None]:
pipeline.delete()