####  Workflow
1. Loading the data
2. Creating training and test sets of time series
3. Formatting data as JSON files and uploading to S3
4. Instantiating and training a DeepAR estimator
5. Apply the estimator with Batch Transform on the input data
6. Evaluating the predictions

In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [5]:
# load data 
df = pd.read_csv('prepared_data_all.csv')
df.head()

Unnamed: 0.1,Unnamed: 0,Store,DayOfWeek,Date,Sales,Open,Promo,StateHoliday,SchoolHoliday,ID,StoreType,Assortment,CompetitionDistance,Promo_2_active,Open_sunday
0,0,1,5,2015-07-31,5263,1,1,1,1,0,2,0,1270,0,0
1,1,2,5,2015-07-31,6064,1,1,1,1,1,0,0,570,1,0
2,2,3,5,2015-07-31,8314,1,1,1,1,2,0,0,14130,1,0
3,3,4,5,2015-07-31,13995,1,1,1,1,3,2,2,620,0,0
4,4,5,5,2015-07-31,4822,1,1,1,1,4,0,0,29910,0,0


In [6]:
df = df.drop(columns=['Unnamed: 0'])

In [7]:
# change order of df so latest date comes first
df = df.iloc[::-1]

In [8]:
# for the DeepAr Algorithm, Categorical features must be encoded as a 0-based sequence of positive integers
# thus the stores (which is the category I want to distinguish in this time series) needs to start from zero and not from 1

# reduce all store numbers by one
df ['Store'] =  df.Store.apply(lambda x: (x-1))
df.head()

Unnamed: 0,Store,DayOfWeek,Date,Sales,Open,Promo,StateHoliday,SchoolHoliday,ID,StoreType,Assortment,CompetitionDistance,Promo_2_active,Open_sunday
1017208,1114,2,2013-01-01,0,0,0,1,1,1017208,3,2,5350,0,0
1017207,1113,2,2013-01-01,0,0,0,1,1,1017207,0,2,870,0,0
1017206,1112,2,2013-01-01,0,0,0,1,1,1017206,0,2,9260,0,0
1017205,1111,2,2013-01-01,0,0,0,1,1,1017205,2,2,1880,0,0
1017204,1110,2,2013-01-01,0,0,0,1,1,1017204,0,0,1900,0,0


In [9]:
# filter into train/test before running transformation

# as defined in Proposal: 
# train range 07.01.2013 – 07.06.2015 (94,7% of data)
# test range 08.06.2015-26.07.2015 (5,3% of data) 

# TRAIN
# Filter out all rows with a date past 07.06.2015
df_train = df[df['Date']<'2015-06-08']
# Filter out all rows with a date before 07.01.2013
df_train = df_train[df_train['Date']>='2013-07-01']

# TEST
# !!!!! for this algorithm, the test set contains the complete range of each time series.!!!!!
# Filter out all rows with a date before 08.06.2015
df_test = df[df['Date']>='2013-07-01']
# Filter out all rows with a date past 26.07.2015
df_test = df_test[df_test['Date']<'2015-07-27']

In [10]:
df_train['Date'] = pd.to_datetime(df_train.Date)
df_test['Date'] = pd.to_datetime(df_test.Date)

In [11]:
df_train = df_train.set_index('Date')
df_test = df_test.set_index('Date')

#### Convert to JASON 

In [12]:
# subset a list of Stores to iterate over
store_nr = list(df_train['Store'].unique())

In [13]:
# import json for formatting data and os for saving
import json
import os 

# transforming df

def write_json_dataset(df, filename): 
    with open(filename, 'wb') as f:
        # for each of our times series, there is one JSON line
        for store in store_nr:
            df_store = df.loc[df['Store'] == store]
            obj = {"start": str(df_store.index[0]), "target": list(df_store.Sales), "cat": [int(store)], "dynamic_feat": [list(df_store.DayOfWeek),list(df_store.Open),list(df_store.Promo),list(df_store.StateHoliday),list(df_store.SchoolHoliday),list(df_store.StoreType),list(df_store.Assortment),list(df_store.CompetitionDistance),list(df_store.Promo_2_active),list(df_store.Open_sunday)]}
            json_line = json.dumps(obj) + '\n'
            json_line = json_line.encode('utf-8')
            f.write(json_line)
    print(filename + ' saved.')

In [14]:
# save this data to a local directory
data_dir = 'json_rossmann'

# make data dir, if it does not exist
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

In [15]:
# directories to save train/test data
train_key = os.path.join(data_dir, 'train.json')
test_key = os.path.join(data_dir, 'test.json')

# write train/test JSON files
write_json_dataset(df_train, train_key)        
write_json_dataset(df_test, test_key)

json_rossmann/train.json saved.
json_rossmann/test.json saved.


#### Store to S3

In [16]:
import boto3
import sagemaker
from sagemaker import get_execution_role

In [17]:
# session, role, bucket
sagemaker_session = sagemaker.Session()
role = get_execution_role()

bucket = sagemaker_session.default_bucket()

In [18]:
# general prefix
prefix='deepar-rossmann'

# *unique* train/test prefixes
train_prefix   = '{}/{}'.format(prefix, 'train')
test_prefix    = '{}/{}'.format(prefix, 'test')

# uploading data to S3, and saving locations
train_path  = sagemaker_session.upload_data(train_key, bucket=bucket, key_prefix=train_prefix)
test_path   = sagemaker_session.upload_data(test_key,  bucket=bucket, key_prefix=test_prefix)

In [19]:
# check locations
print('Training data is stored in: '+ train_path)
print('Test data is stored in: '+ test_path)

Training data is stored in: s3://sagemaker-eu-central-1-395339144106/deepar-rossmann/train/train.json
Test data is stored in: s3://sagemaker-eu-central-1-395339144106/deepar-rossmann/test/test.json


checking a preview of the json the data at s3 through the aws console shows, that all the data is in the right format as indicated in the aws documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html

## Modelling DeepAR

#### 1. Setup

In [20]:
from sagemaker.amazon.amazon_estimator import get_image_uri

image_name = get_image_uri(boto3.Session().region_name, # get the region
                           'forecasting-deepar') # specify image


In [21]:
from sagemaker.estimator import Estimator

# dir to save model artifacts
s3_output_path = "s3://{}/{}/output".format(bucket, prefix)

# instantiate a DeepAR estimator
estimator = Estimator(sagemaker_session=sagemaker_session,
                      image_name=image_name,
                      role=role,
                      train_instance_count=1,
                      train_instance_type='ml.p2.xlarge',
                      output_path=s3_output_path
                      )

In [22]:
freq='D'
prediction_length= 49 # number of days in test data set (7 weeks x 7 days)
context_length= 490 # less then number of days in train data set (126 weeks x 7 days); 
# "a model can look further back in the time series than the value specified for context_length"
epochs = 50 # the maximum number of times to pass over the data when training
# Further parameter explenation: https://docs.aws.amazon.com/forecast/latest/dg/aws-forecast-recipe-deeparplus.html 

hyperparameters = {
    "epochs": str(epochs),
    "time_freq": freq,
    "prediction_length": str(prediction_length),
    "context_length": str(context_length),
    "num_cells": "50",
    "num_layers": "3",
    "mini_batch_size": "128",
    "learning_rate": "0.001",
    "early_stopping_patience": "10"
}

In [23]:
# set the hyperparams
estimator.set_hyperparameters(**hyperparameters)

#### 2. Training

In [24]:
%%time
# train and test channels
data_channels = {
    "train": train_path,
    "test": test_path
}

# fit the estimator
estimator.fit(inputs=data_channels)

2020-04-09 10:17:38 Starting - Starting the training job...
2020-04-09 10:17:40 Starting - Launching requested ML instances......
2020-04-09 10:19:05 Starting - Preparing the instances for training......
2020-04-09 10:19:59 Downloading - Downloading input data...
2020-04-09 10:20:10 Training - Downloading the training image...
2020-04-09 10:21:00 Training - Training image download completed. Training in progress.[34mArguments: train[0m
[34m[04/09/2020 10:21:01 INFO 140402030520128] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-input.json: {u'num_dynamic_feat': u'auto', u'dropout_rate': u'0.10', u'mini_batch_size': u'128', u'test_quantiles': u'[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]', u'_tuning_objective_metric': u'', u'_num_gpus': u'auto', u'num_eval_samples': u'100', u'learning_rate': u'0.001', u'num_cells': u'40', u'num_layers': u'2', u'embedding_dimension': u'10', u'_kvstore': u'auto', u'_num_kv_servers': u'auto', u'c

[34m[04/09/2020 10:22:31 INFO 140402030520128] Epoch[2] Batch[5] avg_epoch_loss=6.071476[0m
[34m[04/09/2020 10:22:31 INFO 140402030520128] #quality_metric: host=algo-1, epoch=2, batch=5 train loss <loss>=6.0714764595[0m
[34m[04/09/2020 10:22:31 INFO 140402030520128] Epoch[2] Batch [5]#011Speed: 51.02 samples/sec#011loss=6.071476[0m
[34m[04/09/2020 10:22:43 INFO 140402030520128] Epoch[2] Batch[10] avg_epoch_loss=5.752288[0m
[34m[04/09/2020 10:22:43 INFO 140402030520128] #quality_metric: host=algo-1, epoch=2, batch=10 train loss <loss>=5.36926279068[0m
[34m[04/09/2020 10:22:43 INFO 140402030520128] Epoch[2] Batch [10]#011Speed: 54.37 samples/sec#011loss=5.369263[0m
[34m[04/09/2020 10:22:43 INFO 140402030520128] processed a total of 1292 examples[0m
[34m#metrics {"Metrics": {"update.time": {"count": 1, "max": 27725.69489479065, "sum": 27725.69489479065, "min": 27725.69489479065}}, "EndTime": 1586427763.153605, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algor

[34m[04/09/2020 10:24:57 INFO 140402030520128] Epoch[7] Batch[10] avg_epoch_loss=6.237179[0m
[34m[04/09/2020 10:24:57 INFO 140402030520128] #quality_metric: host=algo-1, epoch=7, batch=10 train loss <loss>=6.54689407349[0m
[34m[04/09/2020 10:24:57 INFO 140402030520128] Epoch[7] Batch [10]#011Speed: 54.41 samples/sec#011loss=6.546894[0m
[34m[04/09/2020 10:24:57 INFO 140402030520128] processed a total of 1285 examples[0m
[34m#metrics {"Metrics": {"update.time": {"count": 1, "max": 27705.15489578247, "sum": 27705.15489578247, "min": 27705.15489578247}}, "EndTime": 1586427897.155901, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "AWS/DeepAR"}, "StartTime": 1586427869.450299}
[0m
[34m[04/09/2020 10:24:57 INFO 140402030520128] #throughput_metric: host=algo-1, train throughput=46.3810658896 records/second[0m
[34m[04/09/2020 10:24:57 INFO 140402030520128] #progress_metric: host=algo-1, completed 16 % of epochs[0m
[34m[04/09/2020 10:24:57 INFO 14040203052

[34m#metrics {"Metrics": {"get_graph.time": {"count": 1, "max": 10388.356924057007, "sum": 10388.356924057007, "min": 10388.356924057007}}, "EndTime": 1586428011.531126, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "AWS/DeepAR"}, "StartTime": 1586428001.141846}
[0m
[34m[04/09/2020 10:26:52 INFO 140402030520128] Number of GPUs being used: 1[0m
[34m#metrics {"Metrics": {"finalize.time": {"count": 1, "max": 11429.579973220825, "sum": 11429.579973220825, "min": 11429.579973220825}}, "EndTime": 1586428012.572312, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "AWS/DeepAR"}, "StartTime": 1586428011.531211}
[0m
[34m[04/09/2020 10:26:52 INFO 140402030520128] Serializing to /opt/ml/model/model_algo-1[0m
[34m[04/09/2020 10:26:52 INFO 140402030520128] Saved checkpoint to "/opt/ml/model/model_algo-1-0000.params"[0m
[34m#metrics {"Metrics": {"model.serialize.time": {"count": 1, "max": 166.12005233764648, "sum": 166.12005233764648, "min": 

#### 3. create predictor (by deploying estimator) 

In [33]:
%%time

# create a predictor
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.p2.xlarge',
    content_type="application/json" # specify that it will accept/produce JSON
)

---------------!CPU times: user 250 ms, sys: 5.66 ms, total: 255 ms
Wall time: 7min 32s


### 4. generating predictions

Calling the preictor directly after transformation of the input data caused a runtime error. Therefore I changed the code to a Batch Transform Prediction.  

#### function to change the df to a json format (similar to the actions for training the model)
each time series (here each store) is one line of json code after transformation
here in addition to the json_object, the configuration is also attached to each time series

In [50]:
def write_predictor_input(df, filename): 
    with open(filename, 'wb') as f:
        # for each of our times series, there is one JSON line
        instances = []
        for store in store_nr:
            df_store = df.loc[df['Store'] == store]
            l = list(df_store.Sales)
            del l[-49:]
            json_obj = {"start": str(df_store.index[0]), "target": l , "cat": [int(store)], "dynamic_feat": [list(df_store.DayOfWeek),list(df_store.Open),list(df_store.Promo),list(df_store.StateHoliday),list(df_store.SchoolHoliday),list(df_store.StoreType),list(df_store.Assortment),list(df_store.CompetitionDistance),list(df_store.Promo_2_active),list(df_store.Open_sunday)]}
            instances.append(json_obj)
            
        configuration = {"num_samples": 10, 
                         "output_types": ["mean","quantiles"], 
                         "quantiles": ['0.1', '0.5', '0.9']}
        request_data = {"instances": instances, 
                        "configuration": configuration}
        json_request = json.dumps(request_data) + '\n'
        json_request = json_request.encode('utf-8')
        f.write(json_request)
    print(filename + ' saved.')

#### Apply function to save data in the right format for the DeppAr Algorithm

In [51]:
# save this data to a local directory
data_dir = 'json_rossmann'

# directories to save train/test data
input_key = os.path.join(data_dir, 'prediction_input.json')

# write train/test JSON files       
write_predictor_input(df_test, input_key)

json_rossmann/prediction_input.json saved.


#### saving the data to S3

In [52]:
sagemaker_session = sagemaker.Session()
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
prefix='deepar-rossmann'

# *unique* prefix
input_prefix   = '{}/{}'.format(prefix, 'input')

# uploading data to S3, and saving locations
predicton_input_path  = sagemaker_session.upload_data(input_key, bucket=bucket, key_prefix=input_prefix)

# check locations
print('Prediction input data is stored in: '+ predicton_input_path)

Prediction input data is stored in: s3://sagemaker-eu-central-1-395339144106/deepar-rossmann/input/prediction_input.json


#### Batch Transform job (since the amount of data would cause a timeout otherwise)

You can deploy a model in Amazon SageMaker in one of two ways:

1. Create a persistent HTTPS endpoint where the model provides real-time inference.
2. Run an Amazon SageMaker batch transform job that starts an endpoint, generates inferences on the stored dataset, outputs the inference predictions, and then shuts down the endpoint.

Ressource: https://idk.dev/kinect-energy-uses-amazon-sagemaker-to-forecast-energy-prices-with-machine-learning/

In [58]:
# Batch Transform

import boto3
# Create the SageMaker Boto3 client
boto3_sm = boto3.client('sagemaker')

import time
from time import gmtime, strftime

batch_job_name = 'Batch-Transform-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
input_location = 's3://sagemaker-eu-central-1-395339144106/deepar-rossmann/input/prediction_input.json'
output_location = 's3://{}/{}/output/{}'.format(bucket, prefix, batch_job_name)
# ModelName is copied from the finished training job at aws console

request = \
{
    "BatchStrategy": "SingleRecord",
    "MaxPayloadInMB": 100,
    "TransformJobName": batch_job_name,
    "ModelName": 'forecasting-deepar-2020-04-09-10-17-38-593',
    "TransformOutput": {
        "S3OutputPath": output_location,
        "Accept": "application/jsonlines",
        "AssembleWith": "Line"
    },
    "TransformInput": {
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": input_location 
            }
        },
        "ContentType": "application/jsonlines",
        "SplitType": "Line",
        "CompressionType": "None"
    },
    "TransformResources": {
            "InstanceType": "ml.m4.xlarge",
            "InstanceCount": 1
    }
}

boto3_sm.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name)

# Wait until the job finishes
try:
    boto3_sm.get_waiter('transform_job_completed_or_stopped').wait(TransformJobName=batch_job_name)
finally:
    response = boto3_sm.describe_transform_job(TransformJobName=batch_job_name)
    status = response['TransformJobStatus']
    print("Transform job ended with status: " + status) 
    if status == 'Failed':
        message =response['FailureReason']
        print('Transform failed with the following error: {}'.format(message))
        raise Exception('Transform job failed')

Created Transform job with name:  Batch-Transform-2020-04-09-15-19-26
Transform job ended with status: Completed


#### inspect output

In [64]:
import json
import io
from urllib.parse import urlparse

def get_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')

In [65]:
output = get_output_from_s3(output_location, '{}.out'.format('prediction_input.json'))
#https://sagemaker-eu-central-1-395339144106.s3.eu-central-1.amazonaws.com/deepar-rossmann/output/Batch-Transform-2020-04-09-15-19-26/prediction_input.json.out
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
0,"{""error"":""'start' is a required property\n\nFa...",\n 'properties': {'cat': {'anyOf': [{'type...,\n {'item...,\n 'type...,\n 'dynamic_feat': {'items'...,\n ...,\n 'type':...,\n 'start': {'type': 'string'},\n 'target': {'items': {'an...,\n ...,\n ...,\n 'type': 'arra...,\n 'required': ['start','target'],\n 'type': 'object'}\n\nOn instance['insta...


### Conclusion

The output looks horrible and is definetly nothing I can continue using. At this point I am stuck and will stop the DeepAr approach. It seems to me that the algorithms documentation and application is very rare when it comes to using it with multiple categories and especially features. Also for the Batch Transform application I had to spent a lot of time to figure out a way that seemed to work. I think in every project there is a time for debugging and trying different approaches and there is a time for calling the project off. There sure are other possibilites to solve this use case, but for me the goal of this udacity capstone project was not to predict sales no matter what, but to apply the aws sagemaker DeepAr Algorithm.   

#### not used code snippets for decoding

In [None]:
def __decode_response(self, response, prediction_times, encoding="utf-8"):
    response_data = json.loads(response.decode(encoding))
    list_of_df = []
    for k in range(len(prediction_times)):
        prediction_index = pd.DatetimeIndex(start=prediction_times[k], freq=self.freq, periods=self.prediction_length)
        list_of_df.append(pd.DataFrame(data=response_data['predictions'][k]['quantiles'], index=prediction_index))
    return list_of_df

In [None]:
# display results 
def plot(
    predictor, 
    target_ts, 
    cat=None, 
    dynamic_feat=None, 
    forecast_date=end_training, 
    show_samples=False, 
    plot_history=7 * 12,
    confidence=80
):
    print("calling served model to generate predictions starting from {}".format(str(forecast_date)))
    assert(confidence > 50 and confidence < 100)
    low_quantile = 0.5 - confidence * 0.005
    up_quantile = confidence * 0.005 + 0.5
        
    # we first construct the argument to call our model
    args = {
        "ts": target_ts[:forecast_date],
        "return_samples": show_samples,
        "quantiles": [low_quantile, 0.5, up_quantile],
        "num_samples": 100
    }


    if dynamic_feat is not None:
        args["dynamic_feat"] = dynamic_feat
        fig = plt.figure(figsize=(20, 6))
        ax = plt.subplot(2, 1, 1)
    else:
        fig = plt.figure(figsize=(20, 3))
        ax = plt.subplot(1,1,1)
    
    if cat is not None:
        args["cat"] = cat
        ax.text(0.9, 0.9, 'cat = {}'.format(cat), transform=ax.transAxes)

    # call the end point to get the prediction
    prediction = predictor.predict(**args)

    # plot the samples
    if show_samples: 
        for key in prediction.keys():
            if "sample" in key:
                prediction[key].plot(color='lightskyblue', alpha=0.2, label='_nolegend_')
                
                
    # plot the target
    target_section = target_ts[forecast_date-plot_history:forecast_date+prediction_length]
    target_section.plot(color="black", label='target')
    
    # plot the confidence interval and the median predicted
    ax.fill_between(
        prediction[str(low_quantile)].index, 
        prediction[str(low_quantile)].values, 
        prediction[str(up_quantile)].values, 
        color="b", alpha=0.3, label='{}% confidence interval'.format(confidence)
    )
    prediction["0.5"].plot(color="b", label='P50')
    ax.legend(loc=2)    
    
    # fix the scale as the samples may change it
    ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)
    
    if dynamic_feat is not None:
        for i, f in enumerate(dynamic_feat, start=1):
            ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)
            feat_ts = pd.Series(
                index=pd.DatetimeIndex(start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)),
                data=f
            )
            feat_ts[forecast_date-plot_history:forecast_date+prediction_length].plot(ax=ax, color='g')