# Incorporating RTS Data (Python SDK)

> *This notebook should work well in the `Python 3 (Data Science)` kernel in SageMaker Studio, or `conda_python3` in SageMaker Notebook Instances*

**In this notebook** we'll use the **AWS Python SDK** to:

- Import the prepared *Related Time-Series* data from notebook 3 to our existing Amazon Forecast *Dataset Group*
- Train new predictors and generate forecasts using the additional data
- Explore how the extra information affects our forecast quality

Check out Notebook [4a. Incorporating RTS Data (Console)](4a.%20Incorporating%20RTS%20Data%20(Console).ipynb) for an alternative guide through the same steps using the [Amazon Forecast Console](https://console.aws.amazon.com/forecast/home) instead!

Before starting we'll load the required libraries, restore our saved variables from previous notebooks, and establish a connection to the Forecast service:

In [None]:
%load_ext autoreload
%autoreload 2

# Python Built-Ins:
import json
from pprint import pprint as prettyprint
from types import SimpleNamespace

# External Dependencies:
import boto3
from IPython.display import Markdown
import pandas as pd

# Local Dependencies:
import util

In [None]:
%store -r

In [None]:
session = boto3.Session(region_name=region)

forecast = session.client("forecast")
forecast_query = session.client("forecastquery")

s3 = session.resource("s3")
export_bucket = s3.Bucket(export_bucket_name)

## Defining the RTS Dataset

Since our Dataset Group is already created, the first step in adding Related Time-Series data is to define the structure of the new dataset.

We'll define the **data schema** as below (check this matches the data as shown at the end of the RTS preparation notebook!):

In [None]:
# (Make sure the order of columns matches the data files!)
related_schema = {
    "Attributes": [
        {
            "AttributeName": "timestamp",
            "AttributeType": "timestamp",
        },
        {
            "AttributeName": "temperature",
            "AttributeType": "float",
        },
        {
            "AttributeName": "rain_1h",
            "AttributeType": "float",
        },
        {
            "AttributeName": "snow_1h",
            "AttributeType": "float",
        },
        {
            "AttributeName": "clouds_all",
            "AttributeType": "float",
        },
        {
            "AttributeName": "item_id",
            "AttributeType": "string",
        },
    ],
}
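
Since Forecast maps columns to attributes by position, a quick local check of the column order can save a failed import. The sketch below is only an illustration: the helper name `schema_column_mismatches`, the trimmed-down `example_schema`, and the column lists are hypothetical, and in practice you might pass `list(df.columns)` from your own prepared DataFrame.

```python
def schema_column_mismatches(schema: dict, data_columns: list) -> list:
    """Return (position, expected, actual) tuples wherever schema and data order differ"""
    expected = [attr["AttributeName"] for attr in schema["Attributes"]]
    mismatches = []
    for i in range(max(len(expected), len(data_columns))):
        exp = expected[i] if i < len(expected) else None
        act = data_columns[i] if i < len(data_columns) else None
        if exp != act:
            mismatches.append((i, exp, act))
    return mismatches


# Hypothetical, shortened schema in the same shape as `related_schema`:
example_schema = {
    "Attributes": [
        {"AttributeName": "timestamp", "AttributeType": "timestamp"},
        {"AttributeName": "temperature", "AttributeType": "float"},
        {"AttributeName": "item_id", "AttributeType": "string"},
    ],
}

# Matching order gives an empty list:
print(schema_column_mismatches(example_schema, ["timestamp", "temperature", "item_id"]))
# Swapped columns are reported by position:
print(schema_column_mismatches(example_schema, ["timestamp", "item_id", "temperature"]))
```

An empty result means the column names line up with the schema; it does not check the data types of the values themselves.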

As before, this schema together with a **name**, **frequency** and **domain** will define the dataset entity in the Forecast service:

In [None]:
response = forecast.create_dataset(
    Domain="CUSTOM",
    DatasetType="RELATED_TIME_SERIES",
    DatasetName=project + "_rts",
    DataFrequency="H", 
    Schema=related_schema,
)

rts_arn = response["DatasetArn"]
%store rts_arn
print(f"Created dataset {rts_arn}")

In [None]:
forecast.describe_dataset(DatasetArn=rts_arn)

...And we must **attach** our new dataset to the dataset group, to associate it and have it show in the console:

In [None]:
forecast.update_dataset_group(
    # If you don't have your dataset group ARN (perhaps because you created it via the console), you can set it given
    # your AWS Account ID, region, and dataset group name - something like this:
    # DatasetGroupArn="arn:aws:forecast:ap-southeast-1:123456789012:dataset-group/forecast_poc"
    DatasetGroupArn=dataset_group_arn,
    # If you'd like to temporarily un-link the RTS from your dataset group and then bring it back again, this command
    # can be a useful tool!
    DatasetArns=[tts_arn, rts_arn],
)
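
If you need to rebuild the dataset group ARN by hand, as the comment in the cell above suggests, a small helper along these lines may be convenient. This is a sketch only: `build_dataset_group_arn` is a hypothetical name, and the `aws` partition is an assumption that would not hold in every partition (for example, the China regions use a different one).

```python
def build_dataset_group_arn(region: str, account_id: str, dataset_group_name: str) -> str:
    """Assemble a dataset group ARN from its parts (assumes the standard `aws` partition)"""
    return f"arn:aws:forecast:{region}:{account_id}:dataset-group/{dataset_group_name}"


# Placeholder values copied from the comment in the cell above:
print(build_dataset_group_arn("ap-southeast-1", "123456789012", "forecast_poc"))

# The account ID could be looked up rather than typed, for example with:
#   boto3.client("sts").get_caller_identity()["Account"]
```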

## Importing the RTS Data

We're now ready to populate our Amazon Forecast dataset with the contents of our CSV file from S3.

Since this requires the Amazon Forecast service to access the Amazon S3 bucket, this is where we need the service role created in notebook 0, which has access to the target bucket and trusts the Forecast service. If you don't have such a role set up in your account yet, refer to notebook 0 for details:

In [None]:
%store -r forecast_role_arn
assert isinstance(forecast_role_arn, str), "`forecast_role_arn` must be an IAM role ARN (string)"

Below we trigger a **dataset import job**: a batch process that **overwrites** any pre-existing data in the dataset, rather than appending to existing records.

Triggering the import requires:

- **Naming** the import job, which will be trackable as an entity in the console
- Identifying the **target dataset** by its Amazon Resource Name (ARN)
- Configuring the **data source**, including the S3 location and also the IAM role used to grant access
- Specifying the **timestamp format**, since some variations are permitted according to the [dataset guidelines](https://docs.aws.amazon.com/forecast/latest/dg/dataset-import-guidelines-troubleshooting.html)

In [None]:
rts_import_job_response = forecast.create_dataset_import_job(
    # You might append a timestamp to the import name in practice, to keep it unique... But here we choose a
    # *static* value deliberately, to avoid accidentally & unnecessarily re-importing the PoC data!
    DatasetImportJobName="poc_import_rts",
    DatasetArn=rts_arn,
    DataSource={
        "S3Config": {
            "Path": related_s3uri,
            "RoleArn": forecast_role_arn,
        },
    },
    # (e.g. daily data might omit the hh:mm:ss component)
    TimestampFormat="yyyy-MM-dd hh:mm:ss",
)

rts_import_job_arn = rts_import_job_response["DatasetImportJobArn"]
print(rts_import_job_arn)
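
Because a mismatched timestamp format is a common reason for imports to fail, it can be worth checking a few raw values locally before triggering a job like the one above. The sketch below assumes that the Python `strptime` pattern `%Y-%m-%d %H:%M:%S` (24-hour clock) is the right local equivalent of the format passed to Forecast; the helper name `find_bad_timestamps` and the sample values are hypothetical.

```python
from datetime import datetime

# Assumed Python equivalent of the timestamp format given to the import job:
PYTHON_FORMAT = "%Y-%m-%d %H:%M:%S"


def find_bad_timestamps(values, fmt=PYTHON_FORMAT):
    """Return the values that cannot be parsed with the given strptime format"""
    bad = []
    for value in values:
        try:
            datetime.strptime(value, fmt)
        except ValueError:
            bad.append(value)
    return bad


# Made-up sample values: two well-formed, two that do not match the pattern
sample = ["2018-09-01 00:00:00", "2018-09-01 13:00:00", "2018/09/01 14:00", "2018-09-01"]
print(find_bad_timestamps(sample))
```

In a real check you might pass the timestamp column of your prepared CSV, read as strings, instead of the `sample` list.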

> ⏰ The import process can **take a little time** (on the order of ~10-15 minutes for our sample dataset) because of validation, filling & aggregation, and the overhead of spinning up infrastructure to execute the import

On small datasets like this, overheads can dominate the run-time and you should expect much better-than-linear scaling as dataset size is increased from this level.

As before with TTS, we'll set up a poll to check the status of the import and wait for it to complete:

In [None]:
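def is_import_status_finished(desc):
    """Return True once the import is ACTIVE, raise if it failed, otherwise keep polling"""
    status = desc["Status"]
    if status == "ACTIVE":
        return True
    elif status == "CREATE_FAILED":
        raise ValueError(f"Data import failed!\n{desc}")
    return False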

util.progress.polling_spinner(
    fn_poll_result=lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=rts_import_job_arn),
    fn_is_finished=is_import_status_finished,
    fn_stringify_result=lambda d: d["Status"],
    poll_secs=30,  # Poll every 30s
    timeout_secs=60*60,  # Max 1 hour
)
print("Data imported")
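
The `util.progress.polling_spinner` helper is local to this repository, so its internals aren't shown here. For readers who want to see the general pattern, the following is a minimal stand-alone sketch of poll-until-finished with a timeout. It is not the helper's actual implementation: `poll_until` and its parameters are hypothetical, and the status sequence is simulated rather than fetched from the Forecast API.

```python
import time


def poll_until(fn_poll, fn_is_finished, poll_secs=30, timeout_secs=3600):
    """Call fn_poll repeatedly until fn_is_finished(result) is truthy, or time out"""
    deadline = time.monotonic() + timeout_secs
    while True:
        result = fn_poll()
        if fn_is_finished(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Gave up waiting after {timeout_secs} seconds")
        time.sleep(poll_secs)


# Simulated sequence of statuses standing in for repeated describe calls:
fake_statuses = iter(["CREATE_PENDING", "CREATE_IN_PROGRESS", "ACTIVE"])

final = poll_until(
    fn_poll=lambda: {"Status": next(fake_statuses)},
    fn_is_finished=lambda desc: desc["Status"] == "ACTIVE",
    poll_secs=0,  # No waiting needed for the simulation
)
print(final["Status"])  # ACTIVE
```

Separating the "fetch" function from the "is it finished?" function, as the notebook's helper also does, lets the same loop be reused for import jobs, predictors, and forecasts.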

As an added check, we may use the `DescribeDatasetImportJob` API to verify the results of the import:

In [None]:
forecast.describe_dataset_import_job(DatasetImportJobArn=rts_import_job_arn)

## Creating and Training Predictors

As before, we'll fix most of the parameters for our forecast and create models to compare the performance of different algorithms. This time we'll test only Prophet and DeepAR+ (adding CNN-QR if you like), since ARIMA is not capable of utilizing extra RTS information.

We'll first re-define the RTS-aware Algorithm ARNs for reference:

In [None]:
prophet_algorithm_arn = "arn:aws:forecast:::algorithm/Prophet"
deeparp_algorithm_arn = "arn:aws:forecast:::algorithm/Deep_AR_Plus"
cnnqr_algorithm_arn = "arn:aws:forecast:::algorithm/CNN-QR"

...and declare other static forecast configurations the same as we did for notebook 2:

In [None]:
forecast_frequency = "H"
forecast_horizon = 240

evaluation_parameters = {
    "NumberOfBacktestWindows": 1,
    "BackTestWindowOffset": 240,
}

input_data_config = {
    "DatasetGroupArn": dataset_group_arn,
    "SupplementaryFeatures": [
        { "Name": "holiday", "Value": "US" },
    ],
}

featurization_config = {
    "ForecastFrequency": forecast_frequency,
    "Featurizations": [
        {
            "AttributeName": "target_value",
            "FeaturizationPipeline": [
                {
                    "FeaturizationMethodName": "filling",
                    "FeaturizationMethodParameters": {
                        "frontfill": "none",
                        "middlefill": "zero",
                        "backfill": "zero",
                    },
                },
            ],
        },
    ],
}
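
To make the `filling` settings above more concrete, the sketch below imitates them locally with pandas on a tiny made-up series. It is only an approximation of the idea, since the real filling happens inside the Forecast service: gaps inside an item's history (`middlefill`) and between the item's last point and the dataset's global end (`backfill`) become zero, while nothing is added before the item's first point (`frontfill` set to `none`). The values and the `global_end` timestamp are hypothetical.

```python
import pandas as pd

# Made-up hourly observations for one item: 02:00 is missing, and the
# item stops at 03:00 although the wider dataset runs until 05:00.
observed = pd.Series(
    [10.0, 12.0, 9.0],
    index=pd.to_datetime(["2018-09-01 00:00", "2018-09-01 01:00", "2018-09-01 03:00"]),
)
global_end = pd.Timestamp("2018-09-01 05:00")

# Start at the item's own first timestamp (no front filling), end at the global end:
full_index = pd.date_range(observed.index.min(), global_end, freq=pd.Timedelta(hours=1))

# Missing hours in the middle and at the back are filled with zero:
filled = observed.reindex(full_index).fillna(0.0)
print(filled)
```

Zero filling suits targets where "no record" really means "nothing happened". For other data, a different fill value may be more appropriate.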

Our `results` dictionary should already have been declared in notebook 2b (but we'll create a new one if not), so we're now ready to create our new RTS-aware predictors:

In [None]:
try:
    if results:
        print("Found existing 'results' dictionary")
except NameError:
    results = {}
    print("No existing results found - created new dict")

### Prophet

In [None]:
prophet_create_predictor_response = forecast.create_predictor(
    PredictorName=f"{project}_prophet_rts",
    AlgorithmArn=prophet_algorithm_arn,
    ForecastHorizon=forecast_horizon,
    PerformAutoML=False,
    PerformHPO=False,
    EvaluationParameters=evaluation_parameters,
    InputDataConfig=input_data_config,
    FeaturizationConfig=featurization_config,
)
results["Prophet with RTS"] = SimpleNamespace(predictor_arn=prophet_create_predictor_response["PredictorArn"])

### DeepAR+

In [None]:
deeparp_create_predictor_response = forecast.create_predictor(
    PredictorName=f"{project}_deeparp_rts",
    AlgorithmArn=deeparp_algorithm_arn,
    ForecastHorizon=forecast_horizon,
    PerformAutoML=False,
    PerformHPO=False,
    EvaluationParameters=evaluation_parameters,
    InputDataConfig=input_data_config,
    FeaturizationConfig=featurization_config,
)
results["DeepAR+ with RTS"] = SimpleNamespace(predictor_arn=deeparp_create_predictor_response["PredictorArn"])

### CNN-QR

In [None]:
# cnnqr_create_predictor_response = forecast.create_predictor(
#     PredictorName=f"{project}_cnnqr_rts",
#     AlgorithmArn=cnnqr_algorithm_arn,
#     ForecastHorizon=forecast_horizon,
#     PerformAutoML=False,
#     PerformHPO=False,
#     EvaluationParameters=evaluation_parameters,
#     InputDataConfig=input_data_config,
#     FeaturizationConfig=featurization_config,
# )
# results["CNN-QR with RTS"] = SimpleNamespace(predictor_arn=cnnqr_create_predictor_response["PredictorArn"])

Again we'll need to **wait** for our predictors to finish training before we can compare results. The cell below polls their status:

In [None]:
in_progress_predictors = [results[r].predictor_arn for r in results]
failed_predictors = []

def check_status():
    """Check and update in_progress_predictors"""
    just_stopped = []  # Can't edit the in_progress list directly inside the loop!
    for arn in in_progress_predictors:
        predictor_desc = forecast.describe_predictor(PredictorArn=arn)
        status = predictor_desc["Status"]
        if status == "ACTIVE":
            print(f"\nBuild succeeded for {arn}")
            just_stopped.append(arn)
        elif "FAILED" in status:
            print(f"\nBuild failed for {arn}")
            just_stopped.append(arn)
            failed_predictors.append(arn)
    for arn in just_stopped:
        in_progress_predictors.remove(arn)
    return in_progress_predictors

util.progress.polling_spinner(
    fn_poll_result=check_status,
    fn_is_finished=lambda l: len(l) == 0,
    fn_stringify_result=lambda l: f"{len(l)} predictor builds in progress",
    poll_secs=60,  # Poll every minute
    timeout_secs=3*60*60,  # Max 3 hours
)

if len(failed_predictors):
    raise RuntimeError(f"The following predictors failed to train:\n{failed_predictors}")

> ⏰ Predictor training can **take some time**: Simpler algorithms like ARIMA or ETS will typically train faster (may be ready in ~20mins on this example dataset), whereas more complex algorithms like DeepAR+ will usually take longer (may be approx 1hr on this example dataset)

## Examining the Predictors

Once each of the Predictors is in an `ACTIVE` state, we can get metrics about it to better understand its accuracy and behavior. These are computed on the hold-out (backtest) periods we defined when building the Predictor. The metrics are meant to guide our decisions when we use a particular Predictor to generate a forecast.
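
As a reminder of what the weighted quantile loss (wQL) figures mean, here is a small stand-alone sketch of the calculation as we understand it from the Amazon Forecast documentation: under-forecasts are penalized by the quantile `tau`, over-forecasts by `1 - tau`, and the total is scaled by the sum of absolute actual values. The function name and the sample numbers are made up for illustration, and the service's own implementation may differ in details such as aggregation across items.

```python
def weighted_quantile_loss(actuals, quantile_forecasts, tau):
    """wQL[tau] = 2 * sum(tau * max(y - q, 0) + (1 - tau) * max(q - y, 0)) / sum(|y|)"""
    numerator = sum(
        tau * max(y - q, 0.0) + (1.0 - tau) * max(q - y, 0.0)
        for y, q in zip(actuals, quantile_forecasts)
    )
    denominator = sum(abs(y) for y in actuals)
    return 2.0 * numerator / denominator


# Made-up actual values and forecasts, purely to exercise the formula:
actuals = [100.0, 200.0, 300.0]
forecasts = [110.0, 190.0, 330.0]

for tau in (0.1, 0.5, 0.9):
    print(f"wQL[{tau}] = {weighted_quantile_loss(actuals, forecasts, tau):.4f}")

# At tau = 0.5 the loss reduces to total absolute error over total absolute actuals:
abs_error_ratio = sum(abs(y - q) for y, q in zip(actuals, forecasts)) / sum(actuals)
print(f"Absolute error ratio = {abs_error_ratio:.4f}")
```

Lower values are better for every quantile, which is how the leaderboard later in this notebook should be read.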

As before, we'll define a utility function below to retrieve the raw accuracy metrics response and build up our leaderboard. In the following cells, we'll run the function against each trained predictor.

In [None]:
def evaluate_trial_metrics(trial_name=None) -> pd.DataFrame:
    """Utility to fetch the accuracy metrics for a predictor and output the leaderboard so far"""
    if (trial_name):
        # Print the raw API response:
        metrics_response = forecast.get_accuracy_metrics(PredictorArn=results[trial_name].predictor_arn)
        print(f"Raw metrics for {trial_name}:")
        prettyprint(metrics_response)

        # Save the payload section to results:
        evaluation_results = metrics_response["PredictorEvaluationResults"]
        results[trial_name].evaluation_results = evaluation_results

        # Construct simplified version for our comparison:
        try:
            summary_metrics = next(
                w for w in evaluation_results[0]["TestWindows"] if w["EvaluationType"] == "SUMMARY"
            )["Metrics"]
        except StopIteration:
            raise ValueError("Couldn't find SUMMARY metrics in Forecast API response")
        results[trial_name].summary_metrics = {
            "RMSE": summary_metrics["RMSE"],
            "10% wQL": next(
                l["LossValue"] for l in summary_metrics["WeightedQuantileLosses"] if l["Quantile"] == 0.1
            ),
            "50% wQL (MAPE)": next(
                l["LossValue"] for l in summary_metrics["WeightedQuantileLosses"] if l["Quantile"] == 0.5
            ),
            "90% wQL": next(
                l["LossValue"] for l in summary_metrics["WeightedQuantileLosses"] if l["Quantile"] == 0.9
            ),
        }
    # Render the leaderboard:
    return pd.DataFrame([
        { "Predictor": name, **results[name].summary_metrics } for name in results
        if "summary_metrics" in results[name].__dict__
    ]).set_index("Predictor")
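
If you'd like to see the response handling above in isolation, the sketch below runs the same "find the SUMMARY window" logic against a mock payload, with no call to the service. The helper name `summarize_metrics`, the shortened dictionary keys, and every number in `mock_results` are placeholders rather than real results; only the nesting mirrors the fields used in the function above.

```python
def summarize_metrics(evaluation_results: list) -> dict:
    """Pick out RMSE and the quantile losses from the SUMMARY test window"""
    windows = evaluation_results[0]["TestWindows"]
    summary = next((w for w in windows if w["EvaluationType"] == "SUMMARY"), None)
    if summary is None:
        raise ValueError("Couldn't find SUMMARY metrics in the evaluation results")
    metrics = summary["Metrics"]
    # Index the losses by quantile so the order in the response doesn't matter:
    losses = {l["Quantile"]: l["LossValue"] for l in metrics["WeightedQuantileLosses"]}
    return {
        "RMSE": metrics["RMSE"],
        "10% wQL": losses[0.1],
        "50% wQL": losses[0.5],
        "90% wQL": losses[0.9],
    }


# Mock payload with placeholder numbers (not real model results):
mock_results = [
    {
        "TestWindows": [
            {
                "EvaluationType": "SUMMARY",
                "Metrics": {
                    "RMSE": 1.0,
                    "WeightedQuantileLosses": [
                        {"Quantile": 0.9, "LossValue": 0.3},
                        {"Quantile": 0.5, "LossValue": 0.2},
                        {"Quantile": 0.1, "LossValue": 0.1},
                    ],
                },
            },
            {"EvaluationType": "COMPUTED", "Metrics": {"RMSE": 1.0, "WeightedQuantileLosses": []}},
        ],
    },
]

print(summarize_metrics(mock_results))
```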

### Prophet

Let's compare the metrics for Prophet with RTS added to the previous predictors:

In [None]:
evaluate_trial_metrics("Prophet with RTS")

In our test (your results may vary), the MAPE/wQL0.5, RMSE, and wQL0.9 scores were all slightly improved for the Prophet predictor by incorporating the RTS data. However, its performance still appeared generally worse than the DeepAR+ algorithm fitted on TTS data alone.

### DeepAR+

Let's add the metrics from our new RTS-aware DeepAR+ model into the view:

In [None]:
evaluate_trial_metrics("DeepAR+ with RTS")

In our test, all quantile losses improved slightly for the DeepAR+ model when RTS data was added - although the RMSE did increase a little.

In general, we could conclude that the models were improved by the addition of weather data but there could still be further work to do in finding additional significant factors that contribute to traffic volume, or possibly consolidating/combining the weather features to extract clearer signals for forecasting. 

### CNN-QR

In [None]:
# evaluate_trial_metrics("CNN-QR with RTS")

## All Done!

In this notebook, we updated our dataset group with a *Related Time-Series* dataset of additional (weather) data to try and improve the forecast from the initial baseline using the traffic volume history alone.

You can refer to the previous notebooks 2a and 2b for guidance on visualizing the forecasts in the console and exporting + downloading them to compare against actual validation data.

Identifying important related data such as stock availability, holiday & promotion calendars, pricing and similar can have dramatic impacts on real-world forecasting use cases; but it's just as important to understand any gaps, errors, or aggregations in your datasets to check that your models are interpreting your data as you expect.

Check out the [Cleanup notebook](Cleanup.ipynb) for guidance on cleaning up your Amazon Forecast environment and also your Amazon S3 and AWS IAM setup from these experiments.

If you've prepared your own data for import to Amazon Forecast, you might also be interested in the [Data Diagnostic notebook](Data%20Diagnostic.ipynb) which can help run some basic checks and graphs on your data.