<sup> Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. </sup>
<sup> SPDX-License-Identifier: MIT-0 </sup>

# Chronos Pipeline - Training

The purpose of this notebook is to demonstrate how to train a Chronos model using Amazon SageMaker.

**Jupyter Kernel**:

- Please ensure you are using the **Python 3 (PyTorch 2.1.0 Python 3.10 CPU Optimized)** kernel

**Run All**:

- If you are in a SageMaker Notebook instance, you can go to *Cell tab -> Run All*
- If you are in SageMaker Studio, you can go to *Run tab -> Run All Cells*

**Overview**:
- [Pipeline Configuration](#pipeline_configuration)
- [Data Generation and Processing](#data_processing)
- [Model Training and Hyperparameter Tuning](#training)
- [Model Registration](#model_registration)
- [Pipeline Execution](#pipeline_execution)

**Authors**:
- Alston Chan
- Maria Masood
- Nick Biso

In [None]:
import boto3
import sagemaker

from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingOutput
from sagemaker.pytorch import PyTorch, PyTorchModel, PyTorchProcessor
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.steps import ProcessingStep, CacheConfig, TuningStep

In [None]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.Session().region_name
sm_client = boto3.client("sagemaker")
s3_client = boto3.client('s3')

sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

s3_resource = boto3.resource('s3', region_name=region)
s3_bucket = s3_resource.Bucket(bucket_name)

print(f"account_id: {account_id}")
print(f"region: {region}")
print(f"bucket_name: {bucket_name}")
print(f"role: {role}")

<a id='pipeline_configuration'></a>
### Pipeline Configuration

Before we dive into creating our SageMaker pipeline for training the Chronos model, we need to set up some key configuration variables. These variables will define the naming conventions for our project components and establish the necessary S3 paths for our pipeline artifacts.

We then define the pipeline parameters. Each parameter has a default value that can be overridden when a pipeline execution is started.

In [None]:
project_name = "chronos"
pipeline_name = project_name + "-Pipeline"
experiment_name = pipeline_name + "-Experiment"
model_package_group_name = project_name + "-ModelGroup"

# Store variable for chronos_pipeline_endpoint_inference.ipynb
%store model_package_group_name

# Return an S3 path based on the id of this pipeline execution, which is a property only
# resolved at runtime but can be accessed at compile time as an execution variable
def dynamic_S3_path(path):
    # Joining "s3:/" and the bucket name with "/" yields the "s3://<bucket>/..." prefix
    return Join(
        on="/",
        values=[
            "s3:/",
            bucket_name,
            pipeline_name,
            "executions",
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            path,
        ],
    )
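At runtime the `Join` above resolves to an ordinary S3 URI. A minimal pure-Python sketch of the resolved value, using a hypothetical helper, bucket name, and execution ID (the real ID is only known once an execution starts):

```python
def resolve_dynamic_s3_path(bucket_name, pipeline_name, execution_id, path):
    """Hypothetical helper: mimic what Join(on="/", values=[...]) evaluates to."""
    # "s3:/" + "/" + bucket produces the "s3://" scheme prefix.
    return "/".join(["s3:/", bucket_name, pipeline_name, "executions", execution_id, path])


# Hypothetical values for illustration only.
print(resolve_dynamic_s3_path("sagemaker-us-east-1-111122223333", "chronos-Pipeline", "abc123xyz", "train"))
```

Because the execution ID is part of the path, every execution writes its splits to its own prefix instead of overwriting a previous run.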

In [None]:
instance_type = "ml.p3.2xlarge"

In [None]:
pipeline_session = PipelineSession()

In [None]:
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P7D"  # ISO 8601 duration: reuse cached results for up to 7 days
)
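With caching enabled, SageMaker Pipelines can reuse the outputs of a previous run of a step whose arguments are unchanged, provided that run is no older than `expire_after` (seven days here). A minimal sketch of the age check only, using a hypothetical helper and assuming the caller already knows a matching earlier run exists; the actual signature matching happens inside SageMaker:

```python
from datetime import datetime, timedelta, timezone


def is_cache_reusable(previous_run_end, now, expire_after=timedelta(days=7)):
    """Hypothetical helper: is a matching earlier run recent enough to reuse?

    Simplified sketch: only the expiry window is modelled, not the argument match.
    """
    return now - previous_run_end <= expire_after


now = datetime(2024, 6, 10, tzinfo=timezone.utc)
print(is_cache_reusable(datetime(2024, 6, 5, tzinfo=timezone.utc), now))   # run is 5 days old
print(is_cache_reusable(datetime(2024, 5, 30, tzinfo=timezone.utc), now))  # run is 11 days old
```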

In [None]:
pipeline_parameters = {}

pipeline_parameters['train_data_size'] = ParameterInteger(
    name="TrainDataSize",
    default_value=100,
)

pipeline_parameters['val_data_size'] = ParameterInteger(
    name="ValidationDataSize",
    default_value=100,
)

pipeline_parameters['test_data_size'] = ParameterInteger(
    name="TestDataSize",
    default_value=100,
)

pipeline_parameters['model_id'] = ParameterString(
    name="ModelId",
    default_value="amazon/chronos-t5-small"
)

pipeline_parameters['context_length'] = ParameterString(
    name="ContextLength", 
    default_value="100"
)

pipeline_parameters['num_samples'] = ParameterString(
    name="NumSamples", 
    default_value="20"
)

In [None]:
pipeline_parameters['training_instance_type'] = ParameterString(
    name="TrainingInstanceType",
    default_value=instance_type,
)

In [None]:
pipeline_parameters['max_jobs'] = ParameterInteger(
    name="MaxJobs", 
    default_value=2
)

pipeline_parameters['max_parallel_jobs'] = ParameterInteger(
    name="MaxParallelJobs", 
    default_value=2
)

In [None]:
pipeline_parameter_list = list(pipeline_parameters.values())
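Each parameter's `default_value` applies unless an execution supplies an override, for example `pipeline.start(parameters={"TrainDataSize": 500})`. A pure-Python sketch of how the effective values are resolved, using a hypothetical helper and plain dicts in place of the SageMaker parameter objects. Note that anything read from `.default_value` while the pipeline is being defined is baked into the definition as a plain string and never sees the override.

```python
def resolve_parameters(defaults, overrides=None):
    """Hypothetical helper: merge execution-time overrides into parameter defaults."""
    overrides = overrides or {}
    return {name: overrides.get(name, value) for name, value in defaults.items()}


# Defaults mirror a subset of the ParameterInteger/ParameterString values above.
defaults = {
    "TrainDataSize": 100,
    "ValidationDataSize": 100,
    "MaxJobs": 2,
    "ModelId": "amazon/chronos-t5-small",
}
print(resolve_parameters(defaults, {"TrainDataSize": 500}))
```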

<a id='data_processing'></a>
### Data Generation and Processing

A crucial step in our Chronos model training pipeline is generating and processing the data. In this section, we set up a processing step that runs `processing/generate_data.py` to generate synthetic data and writes the training, validation, and test splits to S3 under a prefix unique to each pipeline execution.

In [None]:
train_image_uri = image_uris.retrieve(
    framework='pytorch',
    region=region,
    version='2.0',
    py_version='py310',
    image_scope='training', 
    instance_type=instance_type
)

inference_image_uri = image_uris.retrieve(
    framework='pytorch',
    region=region,
    version='2.0',
    py_version='py310',
    image_scope='inference', 
    instance_type=instance_type
)

In [None]:
base_job_name = f"{pipeline_name}/data-generation-step"

script_processor = PyTorchProcessor( 
    command=['python3'],
    role=role,
    instance_count=1,
    instance_type="ml.c5.2xlarge",
    base_job_name=base_job_name,
    sagemaker_session=pipeline_session,
    framework_version='1.13',
    py_version='py39'
)

processor_run_args = script_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=dynamic_S3_path("train")
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=dynamic_S3_path("validation")
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=dynamic_S3_path("test")
        ),
    ],
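    code="processing/generate_data.py",
    # Pass the pipeline parameters themselves (not their default values) so that
    # values supplied at execution time reach the script; job_arguments on a
    # ProcessingStep built from step_args is not used. Container arguments must
    # be strings, hence .to_string().
    arguments=[
        "--train_size",
        pipeline_parameters['train_data_size'].to_string(),
        "--validation_size",
        pipeline_parameters['val_data_size'].to_string(),
        "--test_size",
        pipeline_parameters['test_data_size'].to_string(),
    ],
)

step_process = ProcessingStep(
    name="GenerateData",
    step_args=processor_run_args,
    cache_config=cache_config
)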
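The contents of `processing/generate_data.py` are not shown in this notebook. The following is a hypothetical stand-in, assuming the script accepts the `--train_size`, `--validation_size`, and `--test_size` flags used above and writes one CSV per split under `/opt/ml/processing/<split>`, the local paths that the `ProcessingOutput` entries upload. The sine-plus-noise series, the `--series_length`/`--base_dir` flags, and the one-series-per-row CSV layout are illustrative assumptions, not the authors' data format.

```python
import argparse
import csv
import math
import os
import random


def make_series(length, rng):
    """One synthetic series: a daily-period sine wave with random phase plus noise."""
    phase = rng.uniform(0, 2 * math.pi)
    return [math.sin(2 * math.pi * t / 24 + phase) + rng.gauss(0, 0.1) for t in range(length)]


def write_split(base_dir, split, num_series, length, rng):
    """Write num_series rows (one series per row) to <base_dir>/<split>/<split>.csv."""
    out_dir = os.path.join(base_dir, split)
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{split}.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for _ in range(num_series):
            writer.writerow(f"{v:.5f}" for v in make_series(length, rng))
    return path


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_size", type=int, default=100)
    parser.add_argument("--validation_size", type=int, default=100)
    parser.add_argument("--test_size", type=int, default=100)
    # Hypothetical extras, not passed by the pipeline above.
    parser.add_argument("--series_length", type=int, default=200)
    parser.add_argument("--base_dir", default="/opt/ml/processing")
    args = parser.parse_args(argv)

    rng = random.Random(42)  # fixed seed so reruns produce identical data
    sizes = {"train": args.train_size, "validation": args.validation_size, "test": args.test_size}
    return {split: write_split(args.base_dir, split, n, args.series_length, rng) for split, n in sizes.items()}
```

In a real script, `main()` would be called under `if __name__ == "__main__":`. A fixed seed also plays well with step caching, since identical arguments then imply identical data.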

<a id='training'></a>
### Model Training and Hyperparameter Tuning

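After generating the data, the next step in our pipeline is to train the Chronos model and tune its hyperparameters. In this section, we set up a hyperparameter tuning job that searches the learning rate between 1e-5 and 1e-4, runs up to `MaxJobs` training jobs (at most `MaxParallelJobs` at a time), and keeps the configuration that minimizes the training loss parsed from the job logs.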
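In script mode, SageMaker passes each hyperparameter to the entry point as a command-line argument, so a tuning trial that samples a learning rate runs the script with a flag such as `--learning-rate 3.2e-05`. A minimal sketch of how a training script could read these flags with `argparse`; the real `model/chronostraining.py` is not shown, so the helper name and defaults below are assumptions.

```python
import argparse


def parse_training_args(argv=None):
    """Hypothetical helper: parse the hyperparameters SageMaker passes as CLI flags."""
    parser = argparse.ArgumentParser()
    # Tuned by the HyperparameterTuner; argparse exposes "--learning-rate" as args.learning_rate.
    parser.add_argument("--learning-rate", type=float, default=1e-5)
    # Flag names mirror the arguments used in this notebook; defaults are assumptions.
    parser.add_argument("--model_id", default="amazon/chronos-t5-small")
    parser.add_argument("--context_length", type=int, default=100)
    parser.add_argument("--num_samples", type=int, default=20)
    # parse_known_args ignores extra flags instead of failing on them.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_training_args(["--learning-rate", "3.2e-05", "--context_length", "64"])
print(args.learning_rate, args.context_length, args.model_id)
```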

In [None]:
def create_model_component(model_name):
    estimator = PyTorch(
        role=role,
        instance_type=pipeline_parameters['training_instance_type'],
        output_path=f"s3://{bucket_name}/{pipeline_name}/models/",
        instance_count=1,
        source_dir='model',
        image_uri=train_image_uri,
        entry_point=model_name + ".py",
        base_job_name=f"{pipeline_name}/training/job",
        # TuningStep does not forward job_arguments to its training jobs; static
        # hyperparameters reach the entry point as command-line arguments
        # (e.g. --model_id amazon/chronos-t5-small) and accept pipeline parameters.
        hyperparameters={
            "model_id": pipeline_parameters['model_id'],
            "context_length": pipeline_parameters['context_length'],
            "num_samples": pipeline_parameters['num_samples'],
        },
    )

    hyper_ranges = {
        'learning-rate': ContinuousParameter(1e-5, 1e-4),
    }

    objective_name = "logloss"
    metric_definitions = [{"Name": objective_name, "Regex": "'loss': ([0-9\\.]+),"}]

    tuner_log = HyperparameterTuner(
        estimator,
        objective_name,
        hyper_ranges,
        metric_definitions,
        max_jobs=pipeline_parameters['max_jobs'],
        max_parallel_jobs=pipeline_parameters['max_parallel_jobs'],
        objective_type="Minimize",
        base_tuning_job_name=f"{pipeline_name}/HPTuning/{model_name}",
        random_seed=10
    )

    step_tuning = TuningStep(
        name=f"{model_name}-HpTuning",
        display_name=f"{model_name}-HpTuning",
        tuner=tuner_log,
        inputs={
            'train': TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
        cache_config=cache_config
    )

    return step_tuning
    
model_component_name = "chronostraining"
tuning_step = create_model_component(model_component_name)
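The tuner obtains its objective by applying the metric definition's regex to the training job logs. A quick local check of that pattern against log lines in the dictionary style printed by the Hugging Face `Trainer`; the sample lines are made up and assume this is what the training script logs.

```python
import re

# Same pattern as metric_definitions above (the notebook's "\\." is "\." in the regex).
LOSS_REGEX = r"'loss': ([0-9\.]+),"

# Made-up log lines in Trainer dictionary style.
log_lines = [
    "{'loss': 2.4817, 'grad_norm': 1.93, 'learning_rate': 5e-05, 'epoch': 0.5}",
    "{'eval_loss': 2.1032, 'eval_runtime': 3.1, 'epoch': 1.0}",
    "{'loss': 1.9054, 'grad_norm': 1.41, 'learning_rate': 2.5e-05, 'epoch': 1.0}",
]

losses = [float(m) for line in log_lines for m in re.findall(LOSS_REGEX, line)]
print(losses)
```

The leading quote in the pattern keeps `'eval_loss'` from matching, so only training loss is reported. The character class also cannot match scientific notation, so a loss logged as `1e-05` would be skipped.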

In [None]:
def create_model_step(tuning_step):
    model_name = tuning_step.display_name.split('-')[0]
    best_model = PyTorchModel(
        source_dir='model',
        entry_point=model_name + ".py",
        role=role,
        model_data=tuning_step.get_top_model_s3_uri(
            top_k=0, 
            s3_bucket=bucket_name, 
            prefix=f"{pipeline_name}/models"
        ),
        image_uri=inference_image_uri,
        sagemaker_session=pipeline_session,
    )

    model_step = ModelStep(
        name=f'{model_name}-CreateModel',
        display_name=f'{model_name}-CreateModel',
        step_args=best_model.create(instance_type=instance_type),
    )
    return best_model, model_step, model_name

best_model, model_step, model_name = create_model_step(tuning_step)
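`get_top_model_s3_uri(top_k=0, ...)` selects the best training job of the tuning run (index 0) and points at its `model.tar.gz` under the estimator's `output_path` prefix. A pure-Python sketch of the resulting URI, using a hypothetical helper and made-up trial names and losses, assuming the standard `<prefix>/<training-job-name>/output/model.tar.gz` artifact layout:

```python
def top_model_s3_uri(trials, top_k, s3_bucket, prefix, minimize=True):
    """Hypothetical helper: rank trials by objective and build the k-th best artifact URI."""
    ranked = sorted(trials, key=lambda t: t["objective"], reverse=not minimize)
    job_name = ranked[top_k]["job_name"]
    return "/".join(["s3:/", s3_bucket, prefix, job_name, "output/model.tar.gz"])


# Made-up trials from a tuning run with max_jobs=2.
trials = [
    {"job_name": "chronos-tuning-001-abcd1234", "objective": 2.31},
    {"job_name": "chronos-tuning-002-efgh5678", "objective": 1.87},
]
print(top_model_s3_uri(trials, top_k=0, s3_bucket="my-bucket", prefix="chronos-Pipeline/models"))
```

Because the objective is minimized, index 0 is the trial with the lowest loss.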

<a id='model_registration'></a>
### Model Registration

After training and tuning the Chronos model, the final steps are to register the best model from the tuning job as a new version in the `model_package_group_name` model package group and to assemble all the steps into a single pipeline.

In [None]:

register_args = best_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[instance_type],
    transform_instances=[instance_type],
    model_package_group_name=model_package_group_name,
    domain="MACHINE_LEARNING",
    description="Chronos",
    task="REGRESSION",
    framework="PYTORCH",
    image_uri=inference_image_uri
)
registration_steps = ModelStep(
    name=model_name, 
    step_args=register_args
)
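Each execution that reaches this step adds a new version to the `model_package_group_name` group. A downstream consumer, such as the inference notebook, could look up the newest version with the boto3 SageMaker `list_model_packages` call. A minimal sketch, with a hypothetical helper that takes any client object so it can be exercised with a stub:

```python
def latest_model_package_arn(sm_client, group_name, approval_status=None):
    """Hypothetical helper: ARN of the most recently created package in a group, or None."""
    kwargs = {
        "ModelPackageGroupName": group_name,
        "SortBy": "CreationTime",
        "SortOrder": "Descending",
        "MaxResults": 1,
    }
    if approval_status:
        # e.g. "Approved" to skip versions that have not been reviewed yet
        kwargs["ModelApprovalStatus"] = approval_status
    summaries = sm_client.list_model_packages(**kwargs)["ModelPackageSummaryList"]
    return summaries[0]["ModelPackageArn"] if summaries else None


# Usage with the client created earlier in this notebook:
# latest_model_package_arn(sm_client, model_package_group_name)
```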

In [None]:
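# model_step (created above) is not included, so no standalone SageMaker Model
# is created by the pipeline; add it to this list if you need one.
steps = [step_process, tuning_step, registration_steps]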

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameter_list,
    steps=steps,
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name,
        Join(
            on="-", 
            values=[
                "ChronosForecastTrialExperiment", 
                pipeline_name
            ]
        ),
    ),
)
pipeline.upsert(role_arn=role)
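The order of the `steps` list does not determine execution order. SageMaker Pipelines derives a dependency graph from the property references between steps: the tuning inputs reference `step_process` outputs, and the registered model references the tuning step's best model. A simplified sketch of that ordering with Python's `graphlib`, using illustrative step names (a `ModelStep` may expand into generated sub-steps, such as a repack step, that are omitted here):

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose properties it references (illustrative names).
dependencies = {
    "GenerateData": set(),
    "chronostraining-HpTuning": {"GenerateData"},
    "chronostraining-RegisterModel": {"chronostraining-HpTuning"},
}

# Any listing order of the steps yields the same chain.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```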

<a id='pipeline_execution'></a>
### Pipeline Execution

After creating the pipeline, we start an execution, wait for it to complete, list its steps, and retrieve a detailed description of the execution.

In [None]:
execution = pipeline.start()
execution_id = execution.describe()['PipelineExecutionArn'].split('/')[-1]
print(f"Pipeline Execution ID: {execution_id}")
print(f"Execution Artifacts Link: https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?prefix={pipeline_name}/executions/{execution_id}/&region={region}")

In [None]:
%%time
try:
    # Defaults (delay=30, max_attempts=60) stop waiting after ~30 minutes
    execution.wait(delay=60, max_attempts=120)
except Exception:
    print(execution.list_steps())
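`execution.wait()` polls the execution until it finishes. With its defaults (`delay=30`, `max_attempts=60`) it gives up after about 30 minutes, which a tuning run on GPU instances can exceed. A generic sketch of the same polling idea against `execution.describe()["PipelineExecutionStatus"]`, with the status function and sleep injected so it can be tested without AWS; the helper is hypothetical, not part of the SDK.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_pipeline(get_status, delay=30, max_attempts=60, sleep=time.sleep):
    """Hypothetical helper: poll get_status() until a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay)
    raise TimeoutError(f"Pipeline still running after {max_attempts} checks")


# Usage against the execution started above:
# wait_for_pipeline(lambda: execution.describe()["PipelineExecutionStatus"], delay=60, max_attempts=120)
```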

In [None]:
execution.list_steps()
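`execution.list_steps()` returns one dictionary per step with fields such as `StepName` and `StepStatus`, plus `FailureReason` and `CacheHitResult` where applicable. A small sketch, using a hypothetical helper, that condenses the list into one readable line per step:

```python
def summarize_steps(steps):
    """Hypothetical helper: one readable line per pipeline step."""
    lines = []
    for step in steps:
        line = f"{step['StepName']}: {step['StepStatus']}"
        if "CacheHitResult" in step:
            line += " (reused from cache)"
        if step.get("FailureReason"):
            line += f" [{step['FailureReason']}]"
        lines.append(line)
    return lines


# Usage: print("\n".join(summarize_steps(execution.list_steps())))
```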

In [None]:
execution.describe()