# Time Series Forecasting with AutoMLV2

This notebook uses Amazon SageMaker Autopilot to train a time series model and produce predictions from the trained model. At a high level, you provide a set of tabular historical data on Amazon S3 and make an API call to train a model. Once the model has been trained, you can produce predictions in batch or through a real-time or asynchronous endpoint. As part of the training process, SageMaker Autopilot trains and evaluates multiple time series models concurrently, then combines them into a single ensemble model that blends the candidate models in the proportions that minimize forecast error. You receive metadata and model artifacts for the ensemble and for every underlying candidate model. SageMaker Autopilot orchestrates this entire process and provides several artifacts as a result.

These artifacts include:

- backtest (holdout) forecasts per base model over multiple time windows,
- accuracy metrics per base model,
- backtest results and accuracy metrics for the ensemble model,
- a scaled explainability report showing the importance of each covariate and static metadata feature, and
- all model artifacts on S3, which can be registered or used for batch or real-time inference.
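To make the idea of blending candidate models concrete, here is a minimal sketch. It is not Autopilot's actual ensembling algorithm, which runs inside the service; it assumes two hypothetical candidate models whose backtest forecasts are already available and grid-searches a blend weight for the lowest mean absolute error (MAE).

```python
import numpy as np


def best_blend_weight(actuals, forecast_a, forecast_b, steps=11):
    """Grid-search w in [0, 1] for the blend w * a + (1 - w) * b with the lowest MAE.

    Illustrative only: Autopilot's real ensembling is managed by the service.
    """
    actuals = np.asarray(actuals, dtype=float)
    forecast_a = np.asarray(forecast_a, dtype=float)
    forecast_b = np.asarray(forecast_b, dtype=float)

    best_w, best_mae = None, float("inf")
    for w in np.linspace(0.0, 1.0, steps):
        blended = w * forecast_a + (1.0 - w) * forecast_b
        mae = float(np.mean(np.abs(actuals - blended)))
        if mae < best_mae:
            best_w, best_mae = float(w), mae
    return best_w, best_mae


# Hypothetical backtest window: model A under-forecasts, model B over-forecasts
actuals = [10, 20, 30, 40]
model_a = [8, 18, 28, 38]
model_b = [12, 22, 32, 42]
weight, mae = best_blend_weight(actuals, model_a, model_b)
print(f"weight on model A: {weight:.1f}, blended MAE: {mae:.2f}")
```

Because the two hypothetical candidates err in opposite directions, an equal blend cancels their errors, which is the intuition behind weighting candidates rather than picking a single one.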



## Setup

This notebook has been run in Amazon SageMaker Studio. The space runs on an `ml.t3.medium` instance with the `SageMaker Distribution 1.4` image and the `Python 3 (ipykernel)` kernel.

![space-config.png](images/1-space-config.png)

In [None]:
%pip install --upgrade pip boto3 botocore sagemaker --quiet

In [None]:
# SageMaker Python SDK Dependencies
import sagemaker
from sagemaker import get_execution_role, image_uris, Model
from sagemaker.automl.automlv2 import AutoMLV2, AutoMLTimeSeriesForecastingConfig, AutoMLDataChannel
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.transformer import Transformer
from sagemaker.workflow.pipeline_context import PipelineSession

# Local helper functions
from config.filling_config import filling_config
from pipeline.automlv2_inference_pipeline import run_inference_pipeline
from utils.utils import copy_download_training_data, split_train_test

# Other dependencies
import boto3
import datetime
import os
import pandas as pd
from io import StringIO
from time import gmtime, sleep, strftime

## Copy Data Between S3 Buckets 

We provide a sample dataset to accompany this notebook. You can use our synthetic dataset or adapt this notebook to your own data. The cells below copy the sample file to the S3 bucket and prefix defined in the next cell; passing `download=True` to the helper also saves a copy to your local disk.

IMPORTANT: When training a model, your input data can contain a mixture of covariates and static item metadata. Take care to create future-dated rows that extend to the end of your prediction horizon. In the future-dated rows, carry over your static item metadata and expected covariate values, and leave the target value (y) empty. The copy cell below downloads the example synthetic file so you can inspect this layout programmatically or in a text editor.
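The following sketch illustrates that layout by appending future-dated rows to a single series. It assumes the weekly frequency and four-step horizon configured for `AutoMLTimeSeriesForecastingConfig` later in this notebook; `add_future_rows` is a hypothetical helper, and the `promo` covariate and `category` static feature are made-up column names, not necessarily columns in the sample dataset.

```python
import pandas as pd


def add_future_rows(history, horizon, freq, covariate_values):
    """Append `horizon` future-dated rows to a single-series DataFrame.

    Static metadata is copied from the last historical row, expected covariate
    values are supplied by the caller, and the target is left empty (NaN).
    """
    last = history.sort_values("timestamp").iloc[-1]
    # date_range includes the last known timestamp itself, so drop it with [1:]
    future_dates = pd.date_range(start=last["timestamp"], periods=horizon + 1, freq=freq)[1:]
    future = pd.DataFrame({
        "product_code": last["product_code"],
        "location_code": last["location_code"],
        "category": last["category"],      # hypothetical static feature, carried forward
        "timestamp": future_dates,
        "promo": covariate_values,         # hypothetical covariate, known in advance
        "unit_sales": float("nan"),        # target stays empty in future-dated rows
    })
    return pd.concat([history, future], ignore_index=True)


history = pd.DataFrame({
    "product_code": ["P1"] * 3,
    "location_code": ["L1"] * 3,
    "category": ["bakery"] * 3,
    "timestamp": pd.to_datetime(["2023-12-17", "2023-12-24", "2023-12-31"]),
    "promo": [0, 1, 0],
    "unit_sales": [12.0, 18.0, 11.0],
})
extended = add_future_rows(history, horizon=4, freq="W", covariate_values=[0, 0, 1, 0])
print(extended.tail(4))
```

For a full dataset, the same step would be applied to every (`product_code`, `location_code`) series, for example with a `groupby`.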

In [None]:
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()

# Modify the following default_bucket to use a bucket of your choosing
bucket = sagemaker_session.default_bucket()
prefix = 'automlv2-time-series-data'

# Configure SageMaker permissions
try:
    role = get_execution_role()
except ValueError:
    # if you're running the notebook outside SageMaker Studio, replace with Execution Role name
    role = boto3.client("iam").get_role(RoleName="YOUR-SAGEMAKER-EXECUTION-ROLE-NAME")["Role"].get("Arn")

# This is the client we will use to interact with SageMaker Autopilot
sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [None]:
full_data_csv = copy_download_training_data(
    source_bucket="amazon-forecast-samples", 
    source_key="autopilot/synthetic-food-demand.csv",
    destination_bucket=bucket,
    destination_prefix=prefix, 
    destination_suffix="/full-data/synthetic-food-demand.csv", 
    download=True
)                             

In [None]:
df = pd.read_csv(full_data_csv)

df.head()

In [None]:
split_train_test(df)

In [None]:
s3_client = boto3.client('s3')

# Training data for the AutoML job
s3_client.upload_file('data/train.csv', bucket, f"{prefix}/train/train.csv")

# The same history is reused as batch transform input later in this notebook
s3_client.upload_file('data/train.csv', bucket, f"{prefix}/batch_transform/input/batch-food-demand.csv")

# Held-out test split
s3_client.upload_file('data/test.csv', bucket, f"{prefix}/test/test.csv")

In [None]:
timestamp_suffix = strftime("%Y%m%d-%H%M%S", gmtime())
auto_ml_job_name = "ts-" + timestamp_suffix
print("AutoMLJobName: " + auto_ml_job_name)

## Define our AutoML time series config 

In [None]:
time_series_config = AutoMLTimeSeriesForecastingConfig(
    forecast_frequency='W',  # The frequency of predictions in a forecast.
    forecast_horizon=4,  # The number of time-steps that the model predicts.
    forecast_quantiles=['p50','p60','p70','p80','p90'], # The quantiles used to train the model for forecasts at a specified quantile. 
    filling=filling_config,
    item_identifier_attribute_name="product_code",
    target_attribute_name='unit_sales',
    timestamp_attribute_name='timestamp',
    grouping_attribute_names=['location_code']
)

train_uri = 's3://{}/{}/train/'.format(bucket, prefix)

### Configuration Highlights:

`forecast_frequency`
* Description: Specifies the time interval between consecutive observations and forecast time-steps.<br>
* Value 'W': Indicates weekly data, so each forecast time-step is one week. The model will be trained to understand and predict data as a sequence of weekly observations.

`forecast_horizon`
* Description: Defines the number of future time-steps the model should predict.<br>
* Value 4: The model will forecast four time-steps into the future. Given the weekly frequency, this means the model will predict the next four weeks of data from the last known data point.

`forecast_quantiles`
* Description: Specifies the quantiles at which to generate probabilistic forecasts.<br>
* Values ['p50','p60','p70','p80','p90']: These quantiles represent the 50th, 60th, 70th, 80th, and 90th percentiles of the forecast distribution, providing a range of possible outcomes and capturing forecast uncertainty. For instance, the p50 quantile (median) might be used as a central forecast, while p90 provides a higher-end estimate, accounting for potential variability.
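To see why a p90 forecast sits at the higher end, consider the quantile (pinball) loss, the standard objective for quantile forecasts. This sketch uses the textbook definition and is not necessarily the exact loss Autopilot optimizes internally: at quantile level q, each unit of under-forecast costs q and each unit of over-forecast costs (1 - q).

```python
def pinball_loss(actual: float, forecast: float, q: float) -> float:
    """Quantile (pinball) loss of a single forecast at quantile level q in (0, 1)."""
    error = actual - forecast
    # Under-forecast (error > 0) is weighted by q; over-forecast by (1 - q)
    return max(q * error, (q - 1) * error)


actual = 100.0
for q in (0.5, 0.9):
    under = pinball_loss(actual, 90.0, q)   # forecast 10 units too low
    over = pinball_loss(actual, 110.0, q)   # forecast 10 units too high
    print(f"p{round(q * 100)}: under-forecast loss={under:.1f}, over-forecast loss={over:.1f}")
```

At p50 both misses cost the same, while at p90 falling short costs nine times more than overshooting, so minimizing this loss pushes the p90 forecast upward until roughly 90% of actuals fall at or below it.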

`filling`
* Description: Defines how missing data should be handled before training, specifying filling strategies for different scenarios and columns.<br>
* Value filling_config: Imported from the local `config.filling_config` module. This is a dictionary that maps column names to filling strategies, such as filling missing promotional data with zeros or filling specific columns with predefined values. This ensures the model has a complete dataset to learn from, improving its ability to make accurate forecasts.
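The contents of `config/filling_config.py` are not shown here, so the dictionary below is a hypothetical example of the shape `filling` expects: column name, then filling method, then option, with a `<method>_value` entry when the option is `value`. The method and option names reflect the SageMaker `TimeSeriesTransformations` filling documentation as we understand it; please verify them against the current API reference. `promo` is a made-up covariate, and `check_filling` is a simplified local sanity check, not an SDK function (it does not enforce per-column restrictions such as `frontfill` being limited to the target).

```python
# Hypothetical filling configuration: "unit_sales" is this notebook's target,
# "promo" is a made-up covariate column used only for illustration.
filling_example = {
    "unit_sales": {
        "middlefill": "zero",     # gaps inside a series' history become 0
        "backfill": "zero",       # gaps after a series ends, up to the last timestamp, become 0
    },
    "promo": {
        "middlefill": "zero",
        "backfill": "zero",
        "futurefill": "value",    # missing future covariate values...
        "futurefill_value": "0",  # ...are set to this constant (values are strings)
    },
}

FILL_METHODS = {"frontfill", "middlefill", "backfill", "futurefill"}
FILL_OPTIONS = {"none", "zero", "value", "median", "mean", "min", "max"}


def check_filling(config):
    """Return a list of problems found in a filling dict (an empty list means it looks valid)."""
    problems = []
    for column, methods in config.items():
        for key, option in methods.items():
            if key.endswith("_value"):
                continue  # constants are checked together with their method below
            if key not in FILL_METHODS:
                problems.append(f"{column}: unknown method {key!r}")
            elif option not in FILL_OPTIONS:
                problems.append(f"{column}: unknown option {option!r} for {key}")
            elif option == "value" and f"{key}_value" not in methods:
                problems.append(f"{column}: {key} is 'value' but {key}_value is missing")
    return problems


print(check_filling(filling_example))  # → []
```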

`item_identifier_attribute_name`
* Description: Specifies the column that uniquely identifies each time series in the dataset.<br>
* Value "product_code": This setting indicates that each unique product code represents a distinct time series. The model will treat data for each product code as a separate forecasting problem.

`target_attribute_name`
* Description: The name of the column in your dataset that contains the values you want to predict.<br>
* Value 'unit_sales': Designates the unit_sales column as the target variable for forecasts, meaning the model will be trained to predict future sales figures.

`timestamp_attribute_name`
* Description: The name of the column indicating the time point for each observation.<br>
* Value 'timestamp': Specifies that the timestamp column contains the temporal information necessary for modeling the time series.

`grouping_attribute_names`
* Description: A list of column names that, in combination with the item identifier, can be used to create composite keys for forecasting.<br>
* Value ['location_code']: This setting means that forecasts will be generated for each combination of product_code and location_code. It allows the model to account for location-specific trends and patterns in sales data.
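The effect of `grouping_attribute_names` can be checked locally by counting series. This sketch uses a tiny made-up sample, not the notebook's dataset, and counts distinct series with and without the `location_code` grouping.

```python
import pandas as pd

# Tiny hypothetical sample: 2 products across 2 locations over 2 weeks
sample = pd.DataFrame({
    "product_code": ["P1", "P1", "P1", "P1", "P2", "P2"],
    "location_code": ["L1", "L1", "L2", "L2", "L1", "L1"],
    "timestamp": pd.to_datetime(["2024-01-07", "2024-01-14"] * 3),
    "unit_sales": [5, 7, 3, 4, 9, 8],
})

# Item identifier alone: one series per product
n_by_item = sample["product_code"].nunique()
# Item identifier plus grouping attribute: one series per (product, location) pair
n_by_item_and_location = sample.groupby(["product_code", "location_code"]).ngroups

print(n_by_item, n_by_item_and_location)  # → 2 3
```

With the four-week horizon configured above, the three composite series would yield 3 × 4 = 12 forecast time-steps, each with one value per requested quantile.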

## Create an AutoMLV2 Job to train the model

In [None]:
%%time
automl_sm_job = AutoMLV2(
    problem_config=time_series_config,
    role=role,
    sagemaker_session=sagemaker_session,
    base_job_name='time-series-forecasting-job',
    output_path=f's3://{bucket}/{prefix}/output'
)

In [None]:
automl_sm_job.fit(
    inputs=[AutoMLDataChannel(s3_data_type='S3Prefix', s3_uri=train_uri, channel_type='training')],
    wait=True,
    logs=True
)

## Retrieve the best model 

In [None]:
# Get the best candidate(s) from the AutoML job

# Option 1: Run if you've just fit your AutoML job
best_candidate = automl_sm_job.best_candidate()

# Option 2: Run if you know what job name you want to find the best candidate from 
# best_candidate = automl_sm_job.best_candidate(job_name='time-ser-2024-03-11-18-33-48-786')

best_candidate_name = best_candidate['CandidateName']

print('BestCandidateName:', best_candidate_name)

## Deploy the best model to a SageMaker Real-time endpoint

If you want to perform real-time inference, review this section. If you want to perform batch processing, you may skip the real-time inference section and move to [Batch Predictions (Inference)](https://github.com/aws/amazon-sagemaker-examples/blob/7f1133ebc4e1fe28bcedbeca127c3b95be566d9d/autopilot/#batch).

In [None]:
endpoint_name = f"ep-{best_candidate_name}-automl-ts"

automl_sm_model = automl_sm_job.create_model(name=best_candidate_name, candidate=best_candidate)

predictor = automl_sm_model.deploy(initial_instance_count=1, endpoint_name=endpoint_name, instance_type='ml.m5.large')

Next, test the endpoint with a small sample payload:

In [None]:
# A small sample file that corresponds to the sample training dataset and trained model schema
!aws s3 cp s3://amazon-forecast-samples/autopilot/real-time-payload.csv data/real-time-payload.csv

input_file = './data/real-time-payload.csv'
# Read the whole CSV payload as a single string
with open(input_file, 'r') as f:
    inference_data = f.read()

In [None]:
from sagemaker.predictor import Predictor

realtime_predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session
)

# The SDK fills in EndpointName from the predictor and Body from the `data` argument,
# so only the content type needs to be set explicitly
initial_args = {
    "ContentType": "text/csv"
    }

In [None]:
response = realtime_predictor.predict(
    data=inference_data,
    initial_args=initial_args
)

In [None]:
# Decoding the byte data to a string, assuming UTF-8 encoding
decoded_data = response.decode('utf-8')

output_file = 'data/real-time-prediction-output.csv'
# Writing the decoded data to a CSV file
with open(output_file, 'w', newline='') as file:
    file.write(decoded_data)

In [None]:
df = pd.read_csv(StringIO(decoded_data), sep=',')
df.head(10)

## Deploy the best model to a SageMaker Asynchronous Endpoint

Amazon SageMaker Asynchronous Inference is a SageMaker capability that queues incoming requests and processes them asynchronously. It suits requests with large payloads (up to 1 GB), long processing times (up to one hour), and near real-time latency requirements. If you configure autoscaling with a minimum instance count of zero, the endpoint can scale in completely when there are no requests to process, so you pay only while it is processing requests.

In [None]:
# Retrieve the model artifacts
async_image = best_candidate['InferenceContainerDefinitions']['CPU'][0]['Image']
async_model_data = best_candidate['InferenceContainerDefinitions']['CPU'][0]['ModelDataUrl']
async_env = best_candidate['InferenceContainerDefinitions']['CPU'][0]['Environment']

In [None]:
automl_sm_async_model = Model(image_uri=async_image, model_data=async_model_data, env=async_env, role=role)

In [None]:
from sagemaker.async_inference import AsyncInferenceConfig

endpoint_name_async = f"ep-{best_candidate_name}-automl-ts-async"

# Deploy the model to the asynchronous endpoint
async_predictor = automl_sm_async_model.deploy(
    endpoint_name=endpoint_name_async,
    instance_type="ml.m5.xlarge", initial_instance_count=1,
    serializer=sagemaker.serializers.CSVSerializer(), deserializer=sagemaker.deserializers.CSVDeserializer(),
    async_inference_config=AsyncInferenceConfig(output_path=f"s3://{bucket}/{prefix}/async_output/"),
)

In [None]:
input_location = copy_download_training_data(
    source_bucket="amazon-forecast-samples", 
    source_key="autopilot/real-time-payload.csv",
    destination_prefix=prefix,
    destination_bucket=bucket,
    destination_suffix="/realtime-data/real-time-payload.csv", 
    download=False
)

s3_input_location = f"s3://{input_location}"

In [None]:
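from sagemaker.predictor import Predictor
from sagemaker.predictor_async import AsyncPredictor

# Wrap a Predictor bound to the asynchronous endpoint; its default deserializer returns raw bytes
async_realtime_predictor = AsyncPredictor(
    Predictor(endpoint_name=endpoint_name_async, sagemaker_session=sagemaker_session)
)

# EndpointName and InputLocation are filled in from the predictor and `input_path`
async_initial_args = {
    "ContentType": "text/csv",
    "InvocationTimeoutSeconds": 3600
    }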

In [None]:
async_response = async_realtime_predictor.predict(
    input_path=s3_input_location,
    initial_args=async_initial_args
)

In [None]:
# Decoding the byte data to a string, assuming UTF-8 encoding
async_decoded_data = async_response.decode('utf-8')

async_output_file = 'data/async-real-time-prediction-output.csv'
# Writing the decoded data to a CSV file
with open(async_output_file, 'w', newline='') as file:
    file.write(async_decoded_data)

In [None]:
df = pd.read_csv(StringIO(async_decoded_data), sep=',')
df.head(10)

## Cleanup: Delete both endpoints

Once you're done with the endpoints, delete them to stop incurring charges.

In [None]:
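sagemaker_client = boto3.client('sagemaker')

# Delete each endpoint and the endpoint configuration it was created from
for name in [endpoint_name_async, endpoint_name]:
    config_name = sagemaker_client.describe_endpoint(EndpointName=name)["EndpointConfigName"]
    sagemaker_client.delete_endpoint(EndpointName=name)
    sagemaker_client.delete_endpoint_config(EndpointConfigName=config_name)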

## MLOps with Amazon SageMaker Pipelines

In this code block, we are initializing the AutoML pipeline for a time series forecasting job using SageMaker's `AutoMLV2` class. The `PipelineSession` object is used to manage the SageMaker session specifically for pipeline operations. The `AutoMLV2` object is configured with several parameters:

- `problem_config` specifies the configuration for the time series problem, including aspects like forecast horizon, frequency, etc.

- `role` is the AWS Identity and Access Management (IAM) role that SageMaker assumes to perform operations on your behalf.

- `sagemaker_session` is the session object that `AutoMLV2` will use to interact with SageMaker services.

- `base_job_name` provides a base name for the job, helping with organizing and tracking jobs in SageMaker.

- `output_path` defines the S3 path where the output of the AutoML job, including models and artifacts, will be stored.

Creating this object does not start any training; it only prepares the job configuration. In this notebook, `automl_pipeline_job` is used to package the best candidate from the job trained earlier as a model for the inference pipeline.


In [None]:
pipeline_session = PipelineSession()

automl_pipeline_job = AutoMLV2(
    problem_config=time_series_config,
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name='time-series-forecasting-job',
    output_path=f's3://{bucket}/{prefix}/output'
)

This segment of the code is focused on creating a SageMaker model from the best candidate generated by the AutoML job and retrieving insights about the model:

- `automl_pipeline_model` is created using the `create_model` method of the `AutoMLV2` object, which packages the best candidate model for deployment. This model is identified by `best_candidate_name` and originates from the `best_candidate` data structure.

- `model_insights_report` and `model_explainability_report` hold the S3 locations of the model insights and explainability reports, read from `best_candidate['CandidateProperties']['CandidateArtifactLocations']`. The insights report describes the model's performance, and the explainability report shows how much each feature contributes to its predictions.

Passing these report locations along with the model supports governance and interpretability: reviewers can see both how well the selected model performs and what drives its forecasts.
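The nested lookups in the next cell raise `KeyError` if a report was not generated for a candidate. A hypothetical helper like the one below (not part of the SageMaker SDK) returns `None` instead, so you can check before passing the locations on. The dictionary mirrors the nesting used in this notebook; the candidate name and S3 URIs are made up.

```python
from typing import Optional


def get_artifact_location(candidate: dict, artifact: str) -> Optional[str]:
    """Return candidate['CandidateProperties']['CandidateArtifactLocations'][artifact], or None."""
    locations = (
        candidate.get("CandidateProperties", {})
        .get("CandidateArtifactLocations", {})
    )
    return locations.get(artifact)


# Made-up candidate description with the same nesting as `best_candidate`
fake_candidate = {
    "CandidateName": "example-candidate",
    "CandidateProperties": {
        "CandidateArtifactLocations": {
            "Explainability": "s3://example-bucket/explainability/",
            "ModelInsights": "s3://example-bucket/insights/",
        }
    },
}

print(get_artifact_location(fake_candidate, "ModelInsights"))  # → s3://example-bucket/insights/
print(get_artifact_location(fake_candidate, "MissingReport"))  # → None
```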


In [None]:
automl_pipeline_model = automl_pipeline_job.create_model(name=best_candidate_name, candidate=best_candidate)

model_insights_report = best_candidate['CandidateProperties']['CandidateArtifactLocations']['ModelInsights']

model_explainability_report = best_candidate['CandidateProperties']['CandidateArtifactLocations']['Explainability']

Before running the (batch) inference pipeline, print the inputs that will be passed to it:

In [None]:
print(f"Running the inference pipeline with model_insights_report = {model_insights_report}")

print(f"Running the inference pipeline with model_explainability_report = {model_explainability_report}")

print(f"Running the inference pipeline with model_name = {best_candidate_name}")

print(f"Running the inference pipeline with model = {automl_pipeline_model}")

In the final block, an inference pipeline is executed, and its progress is monitored:

- `run_inference_pipeline` function is called with several parameters, including the session object, the AutoML model, model name, explainability report, and model insights report. This function is responsible for orchestrating the inference pipeline execution, which involves deploying the model and running inference tasks.

- `describe()` method is called on the `inference_pipeline_execution` object to get a detailed description of the pipeline execution status.

- `wait(delay=30, max_attempts=180)` is invoked to periodically check the status of the pipeline execution, with a 30-second delay between checks, up to a maximum of 1.5 hours. This ensures that the script waits for the inference to complete or fail before proceeding.

- Finally, `list_steps()` method lists all the steps involved in the pipeline execution, providing visibility into the pipeline's components and their statuses.

Together, these calls start the inference pipeline, block until it finishes or the wait limit is reached, and show the status of each step, so a failed run is easy to trace.
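`wait(delay=30, max_attempts=180)` is a polling loop. The simplified sketch below shows the same idea in plain Python so the timeout arithmetic is explicit (30 s × 180 attempts = 5,400 s = 1.5 hours). `poll_until_done` and the simulated status sequence are hypothetical; the terminal status names are the SageMaker Pipelines execution statuses, and the SDK's actual waiter may behave differently on failure (for example, by raising an error).

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def poll_until_done(get_status: Callable[[], str], delay: float, max_attempts: int) -> str:
    """Call get_status() until it returns a terminal status or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_attempts:
            time.sleep(delay)  # wait before checking again
    raise TimeoutError(f"still running after {max_attempts} attempts ({delay * max_attempts} s)")


# Simulated execution that reports "Executing" twice before succeeding
statuses = iter(["Executing", "Executing", "Succeeded"])
print(poll_until_done(lambda: next(statuses), delay=0, max_attempts=5))  # → Succeeded
print(30 * 180 / 3600)  # → 1.5 (hours)
```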


In [None]:
inference_pipeline_execution = run_inference_pipeline(
    pipeline_session=PipelineSession(),
    automl_model=automl_pipeline_model,
    model_name=best_candidate_name,
    explainability=model_explainability_report,
    model_insights=model_insights_report,
)

inference_pipeline_execution.describe()

inference_pipeline_execution.wait(delay=30, max_attempts=180)  # max. wait: 1.5 hours

inference_pipeline_execution.list_steps()

## Batch Inference with SageMaker Batch Transform

Amazon SageMaker Batch Transform runs predictions on large datasets without a persistent endpoint. It is particularly useful when you need to process a large amount of data at once, for example to generate forecasts on a schedule or in response to specific events. Batch Transform provisions the instances you request for the duration of the job, runs inference on your input data, writes the results to S3, and then releases the instances, so you pay only while the job runs.

In [None]:
# Generate a unique name for the transform job
timestamp_suffix = strftime("%Y%m%d-%H%M%S", gmtime())
transform_job_name = f'{best_candidate_name}-' + timestamp_suffix
print("BatchTransformJob: " + transform_job_name)

batch_s3_uri_input = f's3://{bucket}/{prefix}/batch_transform/input/batch-food-demand.csv'

# Creating a transformer object
transformer = Transformer(
    model_name=best_candidate_name,
    instance_count=1,
    instance_type='ml.m5.12xlarge',
    output_path=f's3://{bucket}/{prefix}/batch_transform/output/',
    sagemaker_session=sagemaker_session,
    max_payload=0,  # in MB; 0 means no limit (for payloads sent with HTTP chunked encoding)
    strategy='SingleRecord',
    assemble_with='Line',
)

# Start the transform job
transformer.transform(
    data=batch_s3_uri_input,
    content_type='text/csv',
    split_type='None',  # send each input file as a single request instead of splitting it into records
    job_name=transform_job_name
)

# Wait for the transform job to finish
transformer.wait()

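# Confirm the final job status and where the predictions were written
describe_response = sagemaker_session.sagemaker_client.describe_transform_job(TransformJobName=transform_job_name)
job_run_status = describe_response["TransformJobStatus"]
print("TransformJobStatus:", job_run_status)
print("S3OutputPath:", describe_response["TransformOutput"]["S3OutputPath"])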

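When the job completes, the prediction files are written to the `S3OutputPath` reported by `describe_transform_job` (available as `describe_response["TransformOutput"]["S3OutputPath"]`). Each output object is named after its input file with an `.out` suffix, which is the key the next cell reads.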
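Rather than hard-coding the output key, a small hypothetical helper can derive the bucket and key from the output path and the input file name, assuming Batch Transform's `<input name>.out` naming convention. The bucket name below is made up.

```python
from urllib.parse import urlparse


def transform_output_location(s3_output_path: str, input_filename: str):
    """Return (bucket, key) of the Batch Transform output object for one input file."""
    parsed = urlparse(s3_output_path)      # s3://bucket/prefix/ -> netloc="bucket", path="/prefix/"
    key_prefix = parsed.path.lstrip("/")
    if key_prefix and not key_prefix.endswith("/"):
        key_prefix += "/"
    return parsed.netloc, f"{key_prefix}{input_filename}.out"


print(transform_output_location(
    "s3://my-bucket/automlv2-time-series-data/batch_transform/output/",
    "batch-food-demand.csv",
))
# → ('my-bucket', 'automlv2-time-series-data/batch_transform/output/batch-food-demand.csv.out')
```

In this notebook it could be called as `transform_output_location(describe_response["TransformOutput"]["S3OutputPath"], "batch-food-demand.csv")`.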

In [None]:
import boto3
import pandas as pd
from io import StringIO

# Initialize an S3 client
s3 = boto3.client('s3')
# Get the prediction file from S3
object_key = '{}/batch_transform/output/batch-food-demand.csv.out'.format(prefix)
obj = s3.get_object(Bucket=bucket, Key=object_key)

# Read the data into a pandas DataFrame
data = obj['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(data))

# Display the top of the DataFrame
df.head()