# Bike-Share Demand Forecasting 2b: SageMaker DeepAR Algorithm

We'll look at 3 ways to tackle the bike-share demand forecasting problem set up previously in the data preparation notebook:

1. Applying an AWS "Managed AI" service ([Amazon Forecast](https://aws.amazon.com/forecast/)), to tackle the scenario as a common/commodity business problem
2. Using a SageMaker built-in algorithm ([DeepAR](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)), to approach it as a common/commodity algorithm in our own data science workbench
3. Using a custom SageMaker algorithm, to take on the core modelling as a value-added differentiator working in our data science workbench.

These approaches represent different cost/control trade-offs that we might make as a business.

**This notebook shows how to apply the SageMaker DeepAR built-in algorithm.**

## Dependencies and configuration

As usual, we start by loading libraries, defining configuration, and connecting to the AWS SDKs:

In [None]:
# Basic data configuration is initialised and stored in the Data Preparation notebook
# ...We just retrieve it here:
%store -r
assert bucket, "Variable `bucket` missing from IPython store"

assert data_prefix, "Variable `data_prefix` missing from IPython store"
assert target_train_filename, "Variable `target_train_filename` missing from IPython store"
assert target_test_filename, "Variable `target_test_filename` missing from IPython store"
assert related_filename, "Variable `related_filename` missing from IPython store"

sm_train_filename = "train.json"
sm_test_filename = "test.json"
sm_inference_filename = "predict_input.json"

In [None]:
%load_ext autoreload
%autoreload 1

# Built-Ins:
from datetime import datetime, timedelta
import json

# External Dependencies:
import boto3
import matplotlib as mpl
import matplotlib.pyplot as plt
import pandas as pd
import sagemaker

# Local Dependencies:
%aimport util

Now we connect to our AWS SDKs, and initialise our access role (which may wait a little while to ensure any newly created permissions propagate):

In [None]:
session = boto3.Session()
region = session.region_name
smsession = sagemaker.Session()
s3 = session.client(service_name="s3")

In [None]:
sm_role_arn = sagemaker.get_execution_role()

## Overview

**TODO**

## Step 1: Determine your algorithm details

Having chosen the SageMaker DeepAR algorithm for this problem, we need to configure the algorithm and provide data in the format it expects.

In particular, some but not all built-in algorithms have support in the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/index.html)...

```python
# So while we can do this...
estimator = sagemaker.KMeans(...)
# We can't yet do this...
estimator = sagemaker.DeepAR(...)
```

This means we'll need to provide the URL for the container image, as listed on the [SageMaker built-in algorithms common parameters doc](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html).

Luckily, there's a programmatic way (below) to fetch the image path instead of copy-pasting it. It's still worth checking out the Common Parameters page for details on whether the algorithm supports GPU acceleration, distributed training, various input/output formats, etc.

It's also worth checking out the [DeepAR algorithm documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) for more details.

In [None]:
training_image = sagemaker.amazon.amazon_estimator.get_image_uri(
    region,
    "forecasting-deepar",
    repo_version="latest"
)
print(training_image)

## Step 2: Prepare training data

As mentioned in the common docs, and further detailed in the [DeepAR algorithm docs](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html), the DeepAR algorithm expects training data in the following format, delivered either in JSONLines or Parquet format:

```json
{"start": "2009-11-01 00:00:00", "target": [target timeseries...], "cat": [categorical features...], "dynamic_feat": [[related TS 1...], [related TS 2...]]}
{"start": "2009-11-02 00:00:00", "target": [target timeseries...], "cat": [categorical features...], "dynamic_feat": [[related TS 1...], [related TS 2...]]}
...
```
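To make the record format concrete, here's a minimal sketch of building and serialising one training record in plain Python. The helper `make_training_record` is hypothetical (not part of any SDK), and it assumes the training-time rule from the DeepAR docs that every `dynamic_feat` series has the same length as `target`. `cat` is optional, and we don't use it in this notebook.

```python
import json

def make_training_record(start, target, dynamic_feat=None, cat=None):
    """Hypothetical helper: build one DeepAR training record as a dict."""
    record = {"start": start, "target": list(target)}
    if cat is not None:
        record["cat"] = list(cat)
    if dynamic_feat is not None:
        # Assumption: at training time each dynamic feature spans exactly the target's time range
        for series in dynamic_feat:
            if len(series) != len(target):
                raise ValueError(
                    f"dynamic_feat length {len(series)} != target length {len(target)}"
                )
        record["dynamic_feat"] = [list(series) for series in dynamic_feat]
    return record

# Made-up values: 3 hourly demand points and two related features (e.g. holiday, workingday)
record = make_training_record(
    "2011-01-01 00:00:00",
    target=[16, 40, 32],
    dynamic_feat=[[0, 0, 0], [1, 1, 1]],
)
line = json.dumps(record)  # one record serialises to one line of JSON
print(line)
```

Validating lengths before writing files is cheaper than discovering a mismatch partway into a training job.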

We'll start out by loading our generic data files from the Data Prep notebook:

In [None]:
target_train_df = pd.read_csv(f"./data/{target_train_filename}")
target_test_df = pd.read_csv(f"./data/{target_test_filename}")
related_df = pd.read_csv(f"./data/{related_filename}")

related_df.head()

First up: DeepAR only accepts numeric `dynamic_feat` values, so we'll need to convert our binary related fields to integers:

In [None]:
related_df["holiday"] = related_df["holiday"].astype(int)
related_df["workingday"] = related_df["workingday"].astype(int)

related_df.head()
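If there were more boolean columns, converting them one by one would get tedious. A minimal pandas sketch that converts every boolean column at once, on a made-up stand-in for `related_df` (it assumes the columns were parsed as booleans, which `pd.read_csv` does for `True`/`False` strings):

```python
import pandas as pd

# Toy stand-in for related_df (made-up values, not the workshop data)
related = pd.DataFrame({
    "timestamp": ["2011-01-01 00:00:00", "2011-01-01 01:00:00"],
    "holiday": [False, True],
    "workingday": [True, False],
    "temp": [9.84, 9.02],
})

# Find every boolean column and convert them all to 0/1 integers in one go
bool_cols = related.select_dtypes(include="bool").columns
related[bool_cols] = related[bool_cols].astype(int)
print(related.dtypes)
```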

Next: we previously split the **target** time series into separate training and test data sets, but kept the **related** data set as one big list, which was great for Amazon Forecast.

SageMaker DeepAR wants something a little different though, as documented in the [best practices](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar_best_practices):

* For the `train` data channel in training, training period data only (*target* + *related*)
* For the `test` data channel in training, whole data set (*target* + *related*)
* For the **inference** data set, training period *target* series + whole period *related* series
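A toy illustration of how long each of the three record variants ends up, using made-up numbers (3 training hours, 2 test hours, one related feature). The point to notice is that the inference record's related features run past the end of its target by the length of the test period, so the model sees known future values such as holidays across the forecast horizon:

```python
# Made-up toy numbers: 3 training hours, 2 test hours, one related feature
toy_target_train, toy_target_test = [5, 7, 6], [8, 9]
toy_feat_train, toy_feat_test = [0, 0, 1], [1, 0]

toy_train_record = {"target": toy_target_train, "dynamic_feat": [toy_feat_train]}
toy_test_record = {
    "target": toy_target_train + toy_target_test,
    "dynamic_feat": [toy_feat_train + toy_feat_test],
}
toy_inference_record = {
    "target": toy_target_train,  # history only: the model forecasts the rest
    "dynamic_feat": [toy_feat_train + toy_feat_test],  # related data also covers the horizon
}

for name, rec in [
    ("train", toy_train_record),
    ("test", toy_test_record),
    ("inference", toy_inference_record),
]:
    print(name, "target:", len(rec["target"]), "dynamic_feat:", len(rec["dynamic_feat"][0]))
```

This prints lengths 3/3 for train, 5/5 for test, and 3/5 for inference.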

In [None]:
first_test_ts = target_test_df["timestamp"][0]
related_train_df = related_df[related_df["timestamp"] < first_test_ts]
related_test_df = related_df[related_df["timestamp"] >= first_test_ts]
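The split above compares timestamps as *strings*. That works because zero-padded `YYYY-MM-DD HH:MM:SS` strings sort in chronological order, but it would silently break for a non-padded format. A small demonstration with made-up timestamps:

```python
# Zero-padded "YYYY-MM-DD HH:MM:SS" strings sort chronologically,
# so plain string comparison is enough to split train from test:
timestamps = ["2012-12-18 23:00:00", "2012-12-19 00:00:00", "2012-12-19 01:00:00"]
toy_cutoff = "2012-12-19 00:00:00"
train_ts = [ts for ts in timestamps if ts < toy_cutoff]
test_ts = [ts for ts in timestamps if ts >= toy_cutoff]
print(train_ts, test_ts)

# Without zero-padding the trick breaks: as text, October sorts *before* September
print("2012-10-01" < "2012-9-30")  # → True, even though October is later
```

If you're ever unsure of the format, converting with `pd.to_datetime()` before comparing is the safer choice.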

Now we need to convert our training and test sets into DeepAR's JSON record format:

In [None]:
# We'll loop through our customer types creating a record for each:
customer_types = target_train_df["customer_type"].unique()

# Related time series are shared across customer types (not per-customer-type), so we can format them once, outside the loop:
dynamic_feats_train = related_train_df.drop(columns="timestamp")
dynamic_feats_train = [dynamic_feats_train[col].to_list() for col in dynamic_feats_train.columns]
dynamic_feats_test = related_test_df.drop(columns="timestamp")
dynamic_feats_test = [dynamic_feats_test[col].to_list() for col in dynamic_feats_test.columns]

# Training data set (training timestamps only):
train_lines = []
# Test data set (training + test timestamps):
test_lines = []
# Inference data set (training target + full related series):
inference_lines = []

for customer_type in customer_types:
    ctmr_target_train_df = target_train_df[target_train_df["customer_type"] == customer_type]
    target_train = ctmr_target_train_df["demand"].to_list()
    target_test = target_test_df[target_test_df["customer_type"] == customer_type]["demand"].to_list()
    
    train_lines.append({
        "start": ctmr_target_train_df["timestamp"].iloc[0],
        "target": target_train,
        "dynamic_feat": dynamic_feats_train
    })
    test_lines.append({
        "start": ctmr_target_train_df["timestamp"].iloc[0],
        "target": target_train + target_test,
        "dynamic_feat": [
            dynamic_feats_train[ixf] + dynamic_feats_test[ixf] for ixf in range(len(dynamic_feats_train))
        ]
    })
    inference_lines.append({
        "start": ctmr_target_train_df["timestamp"].iloc[0],
        "target": target_train,
        "dynamic_feat": [
            dynamic_feats_train[ixf] + dynamic_feats_test[ixf] for ixf in range(len(dynamic_feats_train))
        ]
    })

*JSON Lines* format is JSON, but with newline-separated records instead of a parent `[... , ...]` array. So a whole JSON Lines file is *not* valid JSON, but each line of the file *is*.
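A quick standard-library demonstration of that distinction, using two made-up records: the joined text fails to parse as a single JSON document, while each line parses on its own.

```python
import json

records = [
    {"start": "2011-01-01 00:00:00", "target": [16, 40, 32]},
    {"start": "2011-01-01 00:00:00", "target": [3, 8, 5]},
]

# Write: one json.dumps() per record, separated by newlines
jsonl_text = "\n".join(json.dumps(record) for record in records)

# The text as a whole is not a single JSON document...
try:
    json.loads(jsonl_text)
    whole_file_is_json = True
except json.JSONDecodeError:
    whole_file_is_json = False

# ...but every individual line is
parsed = [json.loads(line) for line in jsonl_text.splitlines()]
print(whole_file_is_json, parsed == records)  # → False True
```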

Write our training and test files in JSON Lines, and upload to S3:

In [None]:
print("Writing data sets to file...")
!mkdir -p ./data/smdeepar

# Map each output filename to its list of records:
datasets = {
    sm_train_filename: train_lines,
    sm_test_filename: test_lines,
    sm_inference_filename: inference_lines,
}

for filename, records in datasets.items():
    # JSON Lines: one JSON object per line, newline-separated
    with open(f"./data/smdeepar/{filename}", "w") as f:
        f.write("\n".join(json.dumps(record) for record in records))

print("Uploading data sets to S3...")
for filename in datasets:
    s3.upload_file(
        Filename=f"./data/smdeepar/{filename}",
        Bucket=bucket,
        Key=f"{data_prefix}smdeepar/{filename}"
    )
    print(f"s3://{bucket}/{data_prefix}smdeepar/{filename}")

## Step 3: Set up the SageMaker estimator and train the model

Now for the useful part: we'll use the [Python SageMaker SDK](https://sagemaker.readthedocs.io/en/stable/index.html) to:

1. Create our [Estimator](https://sagemaker.readthedocs.io/en/stable/estimators.html) defining the algorithm and fitting/hyper-parameters
2. Define our [data channels](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-inputdataconfig) to fit and validate on
3. [Fit](https://sagemaker.readthedocs.io/en/stable/estimators.html#sagemaker.estimator.Estimator.fit) a model to the data

In [None]:
estimator = sagemaker.estimator.Estimator(
    # As discussed above, we need to provide the Docker image location because there's no specific
    # `sagemaker.estimator.DeepAR` implemented (yet!):
    image_name=training_image,
    role=sm_role_arn,
    # Per the docs, DeepAR *can* use distributed training and GPUs, but probably doesn't really need
    # either for this configuration (a CPU type like ml.c4.2xlarge would also work). We use one GPU instance:
    train_instance_count=1,
    train_instance_type="ml.p3.2xlarge",
    output_path=f"s3://{bucket}/output/smdeepar/",
    base_job_name="bike-demo-deepar",
    hyperparameters={
        "context_length": 24*14, # 2 weeks, same as our target forecast window
        "epochs": 100,
        "prediction_length": 24*14,
        "time_freq": "1H",
        "early_stopping_patience": 20,
        "num_eval_samples": 24*14,
        #"mini_batch_size": 128
    },
    train_max_run=3*60*60,
    # SageMaker managed spot training (which can reduce training cost by up to 90%!) is as easy as
    # requesting it when setting up the estimator:
    #train_use_spot_instances=True,
    #train_max_wait=5*60*60
    # (We only avoid it here to make sure this reliably runs on-demand for large group workshops!)
)

# Training channels can be specified simply as an S3 path string, or using the s3_input API like 
# this for more control over distribution and format parameters:
train_channel = sagemaker.session.s3_input(
    f"s3://{bucket}/{data_prefix}smdeepar/{sm_train_filename}",
    content_type="json", # (The correct MIME type for JSON lines is still in community debate...)
    s3_data_type="S3Prefix"
)
test_channel = sagemaker.session.s3_input(
    f"s3://{bucket}/{data_prefix}smdeepar/{sm_test_filename}",
    content_type="json", 
    s3_data_type="S3Prefix"
)

In [None]:
# This will block until training is complete, showing console output below:
estimator.fit({ "train": train_channel, "test": test_channel })

## Step 4: While the model trains...

DeepAR is a deep neural network model, so it can take a little while to train. Why not use this time to check back on how our Amazon Forecast models are doing?

## Step 5: Kick off a Hyperparameter Optimization to improve performance

The hyperparameters above are a bit of a guess, guided by the [DeepAR tuning documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar-tuning.html) and the characteristics of our data set.

Tuning these parameters by hand is a pain because every trial takes so long! Naive strategies like grid search and manual tuning get expensive fast, whether in compute cost or in labour.
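To put a number on "expensive fast": an exhaustive grid over just the four integer ranges passed to the tuner in this step would need one training job per combination. The arithmetic, as a quick sketch:

```python
import math

# Integer search ranges passed to HyperparameterTuner in this step (inclusive bounds)
search_ranges = {
    "mini_batch_size": (1, 500),
    "context_length": (10, round(24 * 14 * 1.1)),  # upper bound works out to 370
    "num_cells": (30, 200),
    "num_layers": (1, 8),
}

# Number of distinct values in each range, multiplied together
grid_points = math.prod(high - low + 1 for low, high in search_ranges.values())
print(f"{grid_points:,} training jobs for an exhaustive grid")  # → 246,924,000 training jobs for an exhaustive grid
```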

Instead, we let SageMaker's [Automatic Model Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html) take care of it, using a Bayesian optimization strategy designed for problems like this one, where every evaluation is expensive.

Note that the [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/tuner.html)'s `fit()` method **doesn't** block by default like `Estimator`'s does (because HPO jobs usually run for a long time).

The "Hyperparameter Tuning Jobs" section of the SageMaker console provides a nice UI for checking the detailed status and metrics of ongoing jobs. **You don't need to wait for this job to finish to move on to the next section.**
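If you'd rather check on the job from code without locking up the notebook, a minimal polling sketch is below. The helper `wait_for_status` is hypothetical and takes any status-returning function, so it's shown here against a fake source with no AWS calls. In the notebook, `get_status` could plausibly wrap boto3's SageMaker client call `describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=...)` and read its `HyperParameterTuningJobStatus` field; taking the job name from `tuner.latest_tuning_job.name` is an assumption about the SDK version in use.

```python
import time

# Terminal states of a SageMaker HPO job ("InProgress" and "Stopping" are not terminal)
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}

def wait_for_status(get_status, poll_seconds=60.0, sleep=time.sleep):
    """Hypothetical helper: poll get_status() until it returns a terminal status."""
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)

# Assumed real-world usage (not run here):
# sm = boto3.client("sagemaker")
# get_status = lambda: sm.describe_hyper_parameter_tuning_job(
#     HyperParameterTuningJobName=tuner.latest_tuning_job.name
# )["HyperParameterTuningJobStatus"]

# Demo against a fake status source, recording sleeps instead of waiting:
sleeps = []
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final = wait_for_status(lambda: next(fake_statuses), poll_seconds=0, sleep=sleeps.append)
print(final)  # → Completed
```

Injecting `sleep` keeps the loop testable; in real use you'd leave the default and a polling interval of a minute or more.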

In [None]:
tuner = sagemaker.tuner.HyperparameterTuner(
    estimator,
    #Alternative e.g. objective_metric_name="test:RMSE",
    objective_metric_name="test:mean_wQuantileLoss",
    hyperparameter_ranges={
        "mini_batch_size": sagemaker.tuner.IntegerParameter(1, 500),
        "context_length": sagemaker.tuner.IntegerParameter(10, round(24*14*1.1)),
        "num_cells": sagemaker.tuner.IntegerParameter(30, 200),
        "num_layers": sagemaker.tuner.IntegerParameter(1, 8),
    },
    objective_type="Minimize",
    # Defining the maximum number and parallelism of HPO training jobs:
    # Note that accounts have protective limits on number of GPU instances by default.
    # For Event Engine accounts, default max ml.p3.2xlarge = 2
    # Set max_parallel_jobs = (limit / train_instance_count) - 1
    # (minus one lets you run HPO and non-HPO in parallel)
    max_jobs=12,  # TODO: Ideally 12 or more
    max_parallel_jobs=1,  # TODO: Maybe only 1 for Event Engine, 2-3 if possible
    base_tuning_job_name="bike-demo-deepar-tuning"
)

tuner.fit({ "train": train_channel, "test": test_channel })

# Uncomment if you like locking up your notebook for hours:
# tuner.wait()
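The rule of thumb in the tuner comments for `max_parallel_jobs` is simple integer arithmetic. As a sketch (the helper name and the second example's limit of 8 are hypothetical):

```python
def max_parallel_jobs_for(instance_limit, train_instance_count, reserved_jobs=1):
    """Rule of thumb from the tuner comments: (limit / train_instance_count) - 1.

    reserved_jobs leaves headroom for a non-HPO training job running alongside.
    A result below 1 means there's no headroom left for parallel runs.
    """
    return instance_limit // train_instance_count - reserved_jobs

print(max_parallel_jobs_for(2, 1))  # Event Engine default of 2 x ml.p3.2xlarge → 1
print(max_parallel_jobs_for(8, 2))  # hypothetical limit of 8 with 2-instance jobs → 3
```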

## Step 6: Generating predictions with SageMaker Batch Transform

There are two primary ways to use trained SageMaker models: [`deploy()`](https://sagemaker.readthedocs.io/en/stable/estimators.html#sagemaker.estimator.Estimator.deploy) them as endpoints for real-time inference, or apply them to stored data sets as a batch transform.

Since our test scenarios are already stored in S3, we'll take the batch route for this example. You might instead choose to set up an endpoint if, for example, you were providing an interactive service where users could run "what-if" forecasts with different weather data.

In this batch transform job, SageMaker will:

* Deploy our model on temporary instance(s) - similarly to how it would be deployed for a real-time endpoint
* Read our input data from S3
* Submit the input data to the model instance(s)
* Collect the result data into S3
* Clean up the temporary instances

So we *don't* have to worry about deploying and deleting instances, but we *do* need to help SageMaker understand how to split input data out to model instances and how to collect the results together:
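To build intuition for the `split_type`, `strategy` and `assemble_with` settings below, here's a toy local simulation of what they mean for our JSON Lines file. It's purely illustrative: `fake_model` and `simulate_batch_transform` are hypothetical stand-ins, not SageMaker internals, and this mirrors only the simple single-instance case.

```python
import json

def fake_model(record):
    """Hypothetical stand-in for the DeepAR container: 'forecasts' the last observed value."""
    return {"mean": [record["target"][-1]] * 2}

def simulate_batch_transform(input_text, model):
    """Toy simulation of split_type="Line" + strategy="SingleRecord" + assemble_with="Line"."""
    # split_type="Line": each non-empty line of the input file is one record
    records = [json.loads(line) for line in input_text.splitlines() if line.strip()]
    # strategy="SingleRecord": the model sees exactly one record per request
    responses = [json.dumps(model(record)) for record in records]
    # assemble_with="Line": responses are joined with newlines, in input order
    return "\n".join(responses)

input_text = "\n".join([
    json.dumps({"start": "2012-12-01 00:00:00", "target": [3, 4, 5]}),
    json.dumps({"start": "2012-12-01 00:00:00", "target": [7, 8, 9]}),
])
output_text = simulate_batch_transform(input_text, fake_model)
print(output_text)
```

Each input line produces exactly one output line, which is why Step 7 can match results back to `customer_types` by position.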

In [None]:
transformer = estimator.transformer(
    instance_count=1, # We only have 2 "records" (customer types) - multi-instance would be overkill!
    instance_type="ml.c4.2xlarge", # Per the docs, DeepAR only uses CPU at inference time
    strategy="SingleRecord", # Send records to the model one at a time
    assemble_with="Line", # Join results with a newline in the output file (JSONLines)
    output_path=f"s3://{bucket}/results/smdeepar/",
    env={
        # We only need the p10, p50 and p90 quantiles (plus the mean) to compare with Amazon Forecast, so we override the default output:
        "DEEPAR_INFERENCE_CONFIG": json.dumps({
            "num_samples": 100,
            "output_types": ["mean", "quantiles"],
            "quantiles": ["0.1", "0.5", "0.9"]
        })
    }
)

transformer.transform(
    f"s3://{bucket}/{data_prefix}smdeepar/{sm_inference_filename}",
    split_type="Line", # Records are separated by a newline in the input file (JSONLines)
    logs=True,
    wait=True
)

## Step 7: Download and reformat the results

Our batch transform results are stored in S3, so first let's download them to this notebook:

In [None]:
batch_results = transformer.output_path

!mkdir -p results/smdeepar
!aws s3 cp --recursive $batch_results results/smdeepar/
print("SMDeepAR results folder contents:")
!ls results/smdeepar/

...then load the JSON Lines file into memory:

In [None]:
results_local_filename = f"results/smdeepar/{sm_inference_filename}.out"
results_raw = []
with open(results_local_filename) as f:
    for line in f:
        results_raw.append(json.loads(line))

# Note the order of records (hence the correspondence to customer_types) is preserved in SM batch:
assert (
    len(results_raw) == len(customer_types)
), "Mismatch: Batch transform should return one prediction per customer type!"

...and convert those predictions into the same standardized form as we did with Amazon Forecast:

In [None]:
model_id = "sagemaker-deepar"

first_test_ts = target_test_df["timestamp"].iloc[0]
# Parse the "YYYY-MM-DD HH:MM:SS" string into a timestamp:
test_start_dt = pd.to_datetime(first_test_ts)
test_end_dt = test_start_dt + timedelta(days=14)

# Timestamps aren't listed in the DeepAR output, so we synthesize them from the test data:
test_df = target_test_df.copy()
test_df["timestamp"] = pd.to_datetime(test_df["timestamp"])
test_df = test_df[test_df["timestamp"] < test_end_dt]
ctype_test_dfs = {
    ctype: test_df[test_df["customer_type"] == ctype]
    for ctype in customer_types
}

result_dfs = []
for ix, customer_type in enumerate(customer_types):
    # Batch output records are in the same order as customer_types (asserted above):
    prediction = results_raw[ix]

    df = pd.DataFrame()
    df["timestamp"] = ctype_test_dfs[customer_type]["timestamp"]
    df["model"] = model_id
    df["customer_type"] = customer_type
    df["mean"] = prediction["mean"]
    df["p10"] = prediction["quantiles"]["0.1"]
    df["p50"] = prediction["quantiles"]["0.5"]
    df["p90"] = prediction["quantiles"]["0.9"]

    result_dfs.append(df)

# DataFrame.append() was deprecated and then removed in pandas 2.0, so we combine with pd.concat():
clean_results_df = pd.concat(result_dfs)

clean_results_df.to_csv(
    f"./results/smdeepar/results_clean.csv",
    index=False
)
print("Clean results saved to ./results/smdeepar/results_clean.csv")
clean_results_df.head()
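The cell above borrows its timestamps from the test data, which works because our test set covers the whole 14-day horizon. If it didn't, one alternative is to generate them: DeepAR returns one value per forecast step, starting immediately after the last target value (here, the first test timestamp). A sketch assuming hourly data, with a hypothetical helper `forecast_frame` and a made-up 3-step prediction in the same shape as the batch transform output:

```python
import pandas as pd

def forecast_frame(prediction, first_forecast_ts, model_id, customer_type):
    """Hypothetical helper: turn one DeepAR output record into a tidy hourly DataFrame."""
    timestamps = pd.date_range(
        start=pd.to_datetime(first_forecast_ts),
        periods=len(prediction["mean"]),  # one timestamp per forecast step
        freq=pd.Timedelta(hours=1),  # assumes hourly data, matching time_freq="1H"
    )
    return pd.DataFrame({
        "timestamp": timestamps,
        "model": model_id,
        "customer_type": customer_type,
        "mean": prediction["mean"],
        "p10": prediction["quantiles"]["0.1"],
        "p50": prediction["quantiles"]["0.5"],
        "p90": prediction["quantiles"]["0.9"],
    })

# Made-up 3-step prediction, not real model output
toy_prediction = {
    "mean": [10.0, 12.5, 9.0],
    "quantiles": {"0.1": [6.0, 8.0, 5.0], "0.5": [10.0, 12.0, 9.0], "0.9": [14.0, 17.0, 13.0]},
}
toy_df = forecast_frame(toy_prediction, "2012-12-19 00:00:00", "sagemaker-deepar", "registered")
print(toy_df)
```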

## Step 8: Plot the performance

Now that the results are in our standardized format, we can plot them using the same utility function as in the Amazon Forecast notebook:

In [None]:
first_plot_dt = test_end_dt - timedelta(days=21)
actuals_plot_df = pd.concat([target_train_df, target_test_df])  # DataFrame.append() was removed in pandas 2.0
actuals_plot_df["timestamp"] = pd.to_datetime(actuals_plot_df["timestamp"])
actuals_plot_df = actuals_plot_df[
    (actuals_plot_df["timestamp"] >= first_plot_dt)
    & (actuals_plot_df["timestamp"] < test_end_dt)
]
util.plot_fcst_results(actuals_plot_df, clean_results_df)

These plots should be directly comparable to the figures in the Amazon Forecast notebook.

**How does "SageMaker DeepAR" compare to Amazon Forecast's "DeepAR Plus"?**

With the settings we've used here, the two are likely to be broadly comparable. We haven't extensively optimized DeepAR's hyperparameters, and neither have we used much of the structure of the Amazon Forecast **retail domain schema**, so there should be room for improvement in both results!

## Step 9: Using our hyperparameter tuning job

Check out the status of our hyperparameter tuning job in the "Hyperparameter Tuning Jobs" section of the SageMaker console: Hopefully yours is "Completed" like the screenshot below, though it might take up to a few hours!

Note that it's possible to "create models" (register the model artifacts with SageMaker) and, from those, deploy real-time endpoints or start batch transform jobs directly from within the console.

The UI parameters correspond pretty directly to the code used earlier in this notebook, but the SDK's `fit()` and `deploy()` methods short-cut/simplify the Model and Endpoint Configuration parts of the flow.

**Exercise: Using the already existing `Inference > Models` and `Inference > Batch transform jobs` as a guide, and referring to our code in steps 3 and 6, can you use the console to run your best HPO-tuned model against the same inference data set?**

Note:

* You want to create a batch transform job, *not* deploy an endpoint
* Choose a different output path for the new transform job to avoid overwriting our previous model's results, but put it **in a subfolder** of the previous path, so that re-running Step 7's download (as part of Step 10 below) picks it up

<img src="BlogImages/HPOComplete.png"/>

## Step 10: Comparing HPO-tuned and best-guess performance

If you managed the last step successfully and your batch transform job has completed, you should have a new `.out` file in your bucket, under the subfolder you chose.

First, let's keep copies of our previous results:

In [None]:
untuned_results_filename = results_local_filename
untuned_clean_results_df = clean_results_df
!mv results/smdeepar/results_clean.csv results/smdeepar/results_clean_untuned.csv

Now **re-run Step 7 making these edits:**

* Update the definition of `results_local_filename` to point at your new, HPO-tuned result file.
* Update the `model_id` to `sagemaker-deepar-hpo` (adding `-hpo` on the end)

In [None]:
raise ValueError("I will delete this exception when I've done the above!")

Finally we'll combine the untuned and HPO results files together, and plot the comparison as we did with Amazon Forecast:

In [None]:
!mv results/smdeepar/results_clean.csv results/smdeepar/results_clean_hpo.csv

comparison_results_df = pd.concat([untuned_clean_results_df, clean_results_df])  # DataFrame.append() was removed in pandas 2.0

comparison_results_df.to_csv(
    f"./results/smdeepar/results_clean.csv",
    index=False
)
print("Full results saved to ./results/smdeepar/results_clean.csv")
comparison_results_df.head()

In [None]:
util.plot_fcst_results(actuals_plot_df, comparison_results_df)

How do the updated graphs compare? Has the model qualitatively improved?

What about the metrics reported in the console on your hyperparameter tuning job, and the normal training job? Do they suggest any quantitative changes?

## Extension exercises and exploring further

As with Amazon Forecast, both of our models' (untuned and HPO) results should now be stored in a standard format.

Can you calculate RMSE and weighted quantile loss metrics to characterise the test data set performance?
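As a starting point, here's a minimal pure-Python sketch of both metrics. The weighted quantile loss follows the Amazon Forecast-style definition, assumed here: $\mathrm{wQL}[\tau] = 2\,\frac{\sum_t \left[\tau \max(y_t - q_t, 0) + (1 - \tau)\max(q_t - y_t, 0)\right]}{\sum_t |y_t|}$. To apply it, you'd first join `clean_results_df` to the actual `demand` values on `timestamp` and `customer_type` (not shown); the numbers below are made up.

```python
import math

def rmse(actuals, predictions):
    """Root mean squared error."""
    squared = [(a - p) ** 2 for a, p in zip(actuals, predictions)]
    return math.sqrt(sum(squared) / len(squared))

def weighted_quantile_loss(actuals, quantile_preds, tau):
    """wQL at quantile tau (Amazon Forecast-style definition, assumed here)."""
    loss = sum(
        tau * max(y - q, 0) + (1 - tau) * max(q - y, 0)
        for y, q in zip(actuals, quantile_preds)
    )
    return 2 * loss / sum(abs(y) for y in actuals)

# Made-up actuals and p50 forecasts
actuals = [10, 20, 30]
p50 = [12, 18, 30]
print(rmse(actuals, p50))                         # sqrt(8/3) ≈ 1.633
print(weighted_quantile_loss(actuals, p50, 0.5))  # 4/60 ≈ 0.0667
```

Under-forecasts are penalised by $\tau$ and over-forecasts by $1 - \tau$, so p90 forecasts are expected to sit above most actuals, while p10 forecasts sit below.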

What are the relative strengths and weaknesses of the Amazon Forecast and SageMaker DeepAR approaches?

Does SageMaker DeepAR have the same sensitivity as Amazon Forecast to shifting timestamps by one calendar day? Or removing the `holiday` and `workingday` features?

## Thanks for joining in! (Clean-up time)

As with Amazon Forecast, we didn't deploy any real-time predictor endpoints in this workshop but did still create some artifacts whose continued storage is (while relatively inexpensive) chargeable. Don't forget to check through all the sidebar tabs of the Amazon SageMaker console, and your S3 bucket, and consider cleaning up anything you don't want to keep!

As always, remember also to stop this SageMaker notebook when no longer using it.

We hope you've enjoyed this section and any others you're still working on. If you have any feedback for this workshop, please do get in touch via the GitHub or workshop facilitators!