# AWS SageMaker DeepAR on store sales forecasting

For more information see the DeepAR [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) or [paper](https://arxiv.org/abs/1704.04110), 

In [1]:
from __future__ import print_function

%matplotlib inline

import sys
import zipfile
from dateutil.parser import parse
import json
from random import shuffle
import random
import datetime
import os

import boto3
import s3fs
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import timedelta

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

In [2]:
# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

In [3]:
# sagemaker_session = sagemaker.Session()
sagemaker_session = sagemaker.Session(boto3.Session(region_name='us-east-1'))

In [4]:
sagemaker_session

<sagemaker.session.Session at 0x7f145b668c10>

Before starting, we can override the default values for the following:
- The S3 bucket and prefix to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role arn used to give training and hosting access to your data. 

In [5]:
s3_bucket = "spring23-fts"                    # Set a default S3 bucket
# s3_bucket = sagemaker.Session.default_bucket()  # replace with an existing bucket if needed
s3_prefix = "deepar-exp"  # prefix used for all data stored within the bucket

role = sagemaker.get_execution_role()  # IAM role to use by SageMaker

In [6]:
region = sagemaker_session.boto_region_name

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)
print(s3_data_path)
print(s3_output_path)
print(region)

s3://spring23-fts/deepar-exp/data
s3://spring23-fts/deepar-exp/output
us-east-1


Next, we configure the container image to be used for the region that we are running in.

In [7]:
image_name = sagemaker.image_uris.retrieve("forecasting-deepar", region)

Load and parse the dataset and convert it to a collection of Pandas time series, which makes common time series operations such as indexing by time periods or resampling much easier. Here we want to forecast longer periods (~two weeks).

In [8]:
# write dictionary in json file
def write_dicts_to_file(path, data):
  with open(path, "wb") as fp:
      for d in data:
          fp.write(json.dumps(d).encode("utf-8"))
          fp.write("\n".encode("utf-8"))

In [9]:
columns_to_skip = ['id','evt_type', 'evt_locale', 'evt_desc', 'evt_desc_gran', 'evt_selected', 'imp_event', 'imp_evt_enc']
train_path = 's3://{}/{}'.format(s3_bucket, "train_cleaned_crs.csv")
test_path = 's3://{}/{}'.format(s3_bucket, "test_cleaned_crs.csv")

def prepare_data(train_path, test_path, columns_to_skip, out_path, out_sf_path,
                 process_test = False, use_rows = None, min_day = -1):
  if process_test:
    df = pd.concat([
        pd.read_csv(
            train_path,
            usecols=lambda x: x not in columns_to_skip,
            dtype={
                "store_nbr": "category", "family": "category",
                "store_type": "category", "cluster": "category",
                "city": "category", "state": "category"
            },
            nrows = use_rows,
            index_col=['date'], parse_dates=['date']
        ),
        pd.read_csv(
            test_path,
            usecols=lambda x: x not in columns_to_skip,
            dtype={
                "store_nbr": "category", "family": "category",
                "store_type": "category", "cluster": "category",
                "city": "category", "state": "category"
            },
            nrows = use_rows,
            index_col=['date'], parse_dates=['date']
        )
    ])
  else:
    df = pd.read_csv(        
        train_path,
        usecols=lambda x: x not in columns_to_skip,
        dtype={
            "store_nbr": "category", "family": "category",
            "store_type": "category", "cluster": "category",
            "city": "category", "state": "category"
        },
        nrows = use_rows,
        index_col=['date'], parse_dates=['date']
      )

  # store - family combo in training
  st_fm = (
      df
      .value_counts(['store_nbr', 'family'], ascending=True)
      .reset_index()
      .rename(columns={0: "days"})
  )
  if process_test:
    st_fm[st_fm.days > min_day].to_csv(out_sf_path)

  # role of variables
  target_col = "sales"
  cat_cols = ["store_nbr", "family", "store_type", "cluster", "city", "state"]
  feat_cols = (
      ["first_jan", "liquor_sunday", "onpromotion_log", "crs_str_prm_rat", "dcoilwtico"]
      + [c for c in df.columns if ("onpromotion_log_" in c) or ("evt_sel_hot_" in c)]
  )
  # Store the data in json format as required by AWS DeepAR
  out_data = [
      {
          "start": str(min(df[(df.store_nbr==s)&(df.family==f)].index)),
          "target": np.log1p(df[(df.store_nbr==s)&(df.family==f)][target_col].dropna()).tolist(),
          "dynamic_feat": [df[(df.store_nbr==s)&(df.family==f)][x].tolist() for x in feat_cols],
          "cat": [int(df[(df.store_nbr==s)&(df.family==f)][x].cat.codes[0]) for x in cat_cols],
      }
      for (s,f) in zip(
          st_fm[st_fm.days > min_day].store_nbr[:],
          st_fm[st_fm.days > min_day].family[:]
      )
  ]
  print(f"Number of time series in processed data {len(out_data)}")
  # write to disk
  write_dicts_to_file(out_path, out_data)


In [10]:
%%time
# prepare_data(train_path, test_path, columns_to_skip, 
#              "train.json", "st_fm.csv",
#              process_test = False, use_rows = 100000, min_day = -1)

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 6.44 µs


In [11]:
%%time
# prepare_data(train_path, test_path, columns_to_skip,
#              "test.json", "st_fm.csv",
#              process_test = True, use_rows = None, min_day = -1)


CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 6.44 µs


In [12]:
s3 = boto3.resource("s3")


def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith("s3://")
    split = s3_path.split("/")
    bucket = split[2]
    path = "/".join(split[3:])
    buk = s3.Bucket(bucket)

    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print(
                "File s3://{}/{} already exists.\nSet override to upload anyway.\n".format(
                    s3_bucket, s3_path
                )
            )
            return
        else:
            print("Overwriting existing file")
    with open(local_file, "rb") as data:
        print("Uploading file to {}".format(s3_path))
        buk.put_object(Key=path, Body=data)

In [13]:
%%time
# copy_to_s3("train.json", s3_data_path + "/train/train.json", override=True)
# copy_to_s3("test.json", s3_data_path + "/test/test.json", override=True)

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.72 µs


Let's have a look to what we just wrote to S3.

In [14]:
s3_sample = s3.Object(s3_bucket, s3_prefix + "/data/train/train.json").get()["Body"].read()
StringVariable = s3_sample.decode("UTF-8", "ignore")
lines = StringVariable.split("\n")
print(lines[0][:100] + "...")

{"start": "2017-07-10 00:00:00", "target": [1.6094379124341003, 1.0986122886681096, 0.0, 1.098612288...


In [15]:
# we use 1 day frequency for the time series
freq = "D"

# we predict for 16 days
prediction_length = 16

# we can use 16 days as context length, this is the number of state updates accomplished before making predictions
context_length = 25 #16

### Train a model

Here we define the estimator that will launch the training job.

In [16]:
estimator = sagemaker.estimator.Estimator(
    image_uri=image_name,
    sagemaker_session=sagemaker_session,
    role=role,
    train_instance_count=1,
#     train_instance_type="ml.c4.2xlarge",
    train_instance_type="ml.m5.xlarge",
    use_spot_instances=True,
    max_wait=3600,
    max_run=3600,
    base_job_name="deepar-exp",
    output_path=s3_output_path,
)

train_instance_count has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
train_instance_type has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


Next we need to set the hyperparameters for the training job. For example frequency of the time series used, number of data points the model will look at in the past, number of predicted data points. The other hyperparameters concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options (number of epochs, batch size, learning rate...). We use default parameters for every optional parameter in this case (we can always use [Sagemaker Automated Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune them).

In [17]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "197", #"100", opt: "197"
    "early_stopping_patience": "40",
    "mini_batch_size": "108", #64, opt: 108
    "learning_rate": "0.000669277", #5E-4, opt: 0.000669277
    "context_length": str(context_length), # opt: 25
    "prediction_length": str(prediction_length),
}

estimator.set_hyperparameters(**hyperparameters)


We are ready to launch the training job. SageMaker will start an EC2 instance, download the data from S3, start training the model and save the trained model.

If we would have provided the `test` data channel then DeepAR will also calculate accuracy metrics for the trained model on this test. This is done by predicting the last `prediction_length` points of each time-series in the test set and comparing this to the actual value of the time-series. 

**Note:** the next cell may take a some time to complete, depending on data size, model complexity, training options.


In [18]:
%%time
# data_channels = {"train": "{}/train/".format(s3_data_path), "test": "{}/test/".format(s3_data_path)}
data_channels = {"train": "{}/train/".format(s3_data_path)}
estimator.fit(inputs=data_channels, wait=True)

INFO:sagemaker:Creating training-job with name: deepar-exp-2023-04-24-22-20-55-488


2023-04-24 22:20:56 Starting - Starting the training job...
2023-04-24 22:21:11 Starting - Preparing the instances for training...
2023-04-24 22:21:58 Downloading - Downloading input data......
2023-04-24 22:22:38 Training - Downloading the training image...
2023-04-24 22:23:09 Training - Training image download completed. Training in progress.[34mDocker entrypoint called with argument(s): train[0m
[34mRunning default environment configuration script[0m
[34mRunning custom environment configuration script[0m
  from collections import Mapping, MutableMapping, Sequence[0m
[34m[04/24/2023 22:23:21 INFO 140561760122688] Reading default configuration from /opt/amazon/lib/python3.7/site-packages/algorithm/resources/default-input.json: {'_kvstore': 'auto', '_num_gpus': 'auto', '_num_kv_servers': 'auto', '_tuning_objective_metric': '', 'cardinality': 'auto', 'dropout_rate': '0.10', 'early_stopping_patience': '', 'embedding_dimension': '10', 'learning_rate': '0.001', 'likelihood': 'stude



Training seconds: 3641
Billable seconds: 2206
Managed Spot Training savings: 39.4%
CPU times: user 20.9 s, sys: 456 ms, total: 21.4 s
Wall time: 1h 2min 12s


**Batch Transform** using `estimator.transformer`, high level sdk

In [19]:
job_name = estimator.latest_training_job.name
print(job_name)
estimator.training_job_analytics.export_csv("training_summary.csv")

deepar-exp-2023-04-24-22-20-55-488




In [20]:
%%time

# ---- Use below if want to reuse prexisting training job -----
# estimator = sagemaker.estimator.Estimator.attach("deepar-exp-2023-04-24-01-12-58-190")

deepar_transformer = estimator.transformer(
    instance_count=1,
#     instance_type="ml.c5.9xlarge",
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/{}/transform".format(s3_bucket, s3_prefix),
    strategy="SingleRecord",
    assemble_with="Line",
)
deepar_transformer.transform(
    data = s3_data_path + "/test/test.json", content_type="application/jsonlines", split_type="Line",
#     join_source = "Input"
)
deepar_transformer.wait()

INFO:sagemaker:Creating model with name: deepar-exp-2023-04-24-23-23-08-361
INFO:sagemaker:Creating transform job with name: deepar-exp-2023-04-24-23-23-08-959


....................................[34mDocker entrypoint called with argument(s): serve[0m
[34mRunning default environment configuration script[0m
[34mRunning custom environment configuration script[0m
[34mFailed to set debug level to 20, using INFO[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] Estimated memory required per model 15.984892845153809MB.[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] Estimated available memory 14941.698183059692MB.[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] Estimated maximum number of workers for the available memory is 934.[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] Using 4 workers[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] loading entry points[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] Prediction endpoint operating in batch mode[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] loaded request iterator application/jsonlines[0m
[34m[04/24/2023 23:29:16 INFO 139978782734144] loaded response encoder

**Let's look at the output of batch transform!**

In [21]:
s3_sample = s3.Object(s3_bucket, s3_prefix + "/transform/test.json.out").get()["Body"].read()
StringVariable = s3_sample.decode("UTF-8", "ignore")
lines = StringVariable.split("\n")
print(lines[0][:500] + "...")

{"mean":[2.8673315048,2.3319144249,2.2663228512,2.2955989838,2.2561569214,2.375431776,2.5436377525,2.7615175247,2.7619309425,3.0046942234,3.019310236,3.0666129589,3.2672317028,3.0390007496,3.3445601463,3.2858316898],"quantiles":{"0.1":[1.5224021673,1.3472399712,1.2359864712,1.1427799463,1.146871686,1.1890159845,1.363530159,1.5294101238,1.4539458752,1.7713561058,1.4912682772,1.5076121092,1.6815936565,1.6383969784,1.9736554623,1.8269717693],"0.2":[2.0961575508,1.7298599482,1.6315221786,1.508690357...


**Get the predictions**

In [22]:
# json.loads(StringVariable.split("\n")[0])

# Extract 10, 50 and 90th percentile forecast

qs = ['0.1', '0.5', '0.9']

predictions = (
    pd.melt(
        pd.DataFrame([json.loads(l)['quantiles'][qs[0]] for l in StringVariable.splitlines()]),
        var_name='period', value_name=qs[0], ignore_index=False
    )
    .reset_index()
    .merge(
        pd.melt(
            pd.DataFrame([json.loads(l)['quantiles'][qs[1]] for l in StringVariable.splitlines()]),
            var_name='period', value_name=qs[1], ignore_index=False
        )
        .reset_index(),
        on = ['index', 'period']
    )
    .merge(
        pd.melt(
            pd.DataFrame([json.loads(l)['quantiles'][qs[2]] for l in StringVariable.splitlines()]),
            var_name='period', value_name=qs[2], ignore_index=False
        )
        .reset_index(),
        on = ['index', 'period']
    )
    .rename(columns = {'index': 'st_fm_index'})
)

# Reverse log1p transformation on sales
predictions['0.1'] = np.exp(predictions['0.1']) - 1
predictions['0.5'] = np.exp(predictions['0.5']) - 1
predictions['0.9'] = np.exp(predictions['0.9']) - 1

predictions[predictions < 0] = 0

predictions

Unnamed: 0,st_fm_index,period,0.1,0.5,0.9
0,0,0,3.583222,17.660732,63.671560
1,1,0,3.922411,24.449983,108.082121
2,2,0,4.426658,22.656217,86.585022
3,3,0,1.955716,9.645138,32.705269
4,4,0,1.682634,14.355573,62.049729
...,...,...,...,...,...
28507,1777,15,110.548919,206.365049,564.710685
28508,1778,15,121.838461,161.098070,252.796287
28509,1779,15,60.474930,85.147618,134.012823
28510,1780,15,44.812384,71.261815,114.303210


In [23]:
# Get the store and family details based on the store-family index
# This is to identify the correct order of the predicted data
st_fm_index = (
    pd.read_csv("st_fm.csv", index_col=0)
    .reset_index()
    .rename(columns = {'index': 'st_fm_index'})
    .drop(columns=['days'])
)
st_fm_index

Unnamed: 0,st_fm_index,store_nbr,family
0,0,40,BOOKS
1,1,30,BOOKS
2,2,17,BOOKS
3,3,1,BABY CARE
4,4,43,BOOKS
...,...,...,...
1777,1777,25,"LIQUOR,WINE,BEER"
1778,1778,25,MEATS
1779,1779,25,PERSONAL CARE
1780,1780,25,FROZEN FOODS


In [24]:
# Merge the store-family index with prediction
final_predictions = st_fm_index.merge(predictions, on=['st_fm_index']).sort_values(by=['st_fm_index', 'period'])

final_predictions.to_csv("deep_pred.csv")

final_predictions

Unnamed: 0,st_fm_index,store_nbr,family,period,0.1,0.5,0.9
0,0,40,BOOKS,0,3.583222,17.660732,63.671560
1,0,40,BOOKS,1,2.846794,10.154344,21.873889
2,0,40,BOOKS,2,2.441772,7.531136,29.325595
3,0,40,BOOKS,3,2.135473,9.015605,23.382294
4,0,40,BOOKS,4,2.148329,8.647682,30.238159
...,...,...,...,...,...,...,...
28507,1781,25,PREPARED FOODS,11,29.792719,46.261498,75.335735
28508,1781,25,PREPARED FOODS,12,25.178310,33.686935,57.368467
28509,1781,25,PREPARED FOODS,13,16.702669,27.035960,43.530136
28510,1781,25,PREPARED FOODS,14,22.288656,33.257516,47.831939


# Hyperparameter Autotuner

In [74]:
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

# epochs (1-1000, 100), context_length (1-200, 16), mini_batch_size (32-1028, 64)
# learning_rate (10^-5 to 10^-1, 5*10^-4), and num_cells (30-200, 40)

hyperparameter_ranges = {
    "epochs": IntegerParameter(100, 200),
    "context_length": IntegerParameter(16, 30),
    "mini_batch_size": IntegerParameter(64, 128),
    "learning_rate": ContinuousParameter(0.0005, 0.001),
}

objective_metric_name = "train:final_loss" # test:RMSE

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    objective_type="Minimize",
    max_jobs=5,
    max_parallel_jobs=2,
    early_stopping_type ="Auto"
)

Now we can launch a hyperparameter tuning job by calling fit in tuner

In [None]:
tuner.fit(
    {"train": "{}/train/".format(s3_data_path)},
    include_cls_metadata=False,
)
tuner.wait()

INFO:sagemaker:Creating hyperparameter tuning job with name: forecasting-deepar-230423-2226


........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

After the tuning finished, the top 5 performing hyperparameters can be listed below

In [None]:
print(tuner.latest_tuning_job.job_name)

In [25]:
boto3_sm.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName="forecasting-deepar-230423-2226"#tuner.latest_tuning_job.job_name
)["HyperParameterTuningJobStatus"]


'Stopped'

In [27]:
# tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuner.latest_tuning_job.job_name)
# tuner_metrics.dataframe().sort_values(["FinalObjectiveValue"], ascending=True).head(5)