# SageMaker/DeepAR demo on sample price dataset

This notebook complements the [DeepAR introduction notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/deepar_synthetic/deepar_synthetic.ipynb). 

Here, we consider a real use case and show how to use DeepAR on SageMaker for predicting hourly stock prices (the `tsla.csv` dataset).
In particular, we will see how to:
* Prepare the dataset
* Use the SageMaker Python SDK to train a DeepAR model and deploy it
* Make requests to the deployed model to obtain forecasts interactively
* Illustrate advanced features of DeepAR: missing values, additional time features, non-regular frequencies and category information

Running this notebook takes around 20 min on an ml.c4.2xlarge instance for training, and inference is done on an ml.m4.xlarge instance (the usage time depends on how long you leave the endpoint running).

For more information, see the DeepAR [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) or [paper](https://arxiv.org/abs/1704.04110).

In [None]:
%matplotlib inline

import sys
from urllib.request import urlretrieve
import zipfile
from dateutil.parser import parse
import json
from random import shuffle
import random
import datetime
import os

import boto3
import s3fs
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

In [None]:
# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

In [None]:
# input data to train and test model
# Important step for the workshop: set input_bucket to the S3 bucket that holds the data
input_bucket = 'mod-da6d820750784dd7-simplebucket-1jeg10o4329yx' 
input_csv_location = 's3://' + input_bucket + '/data/tsla/tsla.csv'

In [None]:
sagemaker_session = sagemaker.Session()

Before starting, we can override the default values for the following:
- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these.

In [None]:
s3_bucket = sagemaker_session.default_bucket()  # replace with an existing bucket if needed
s3_prefix = 'deepar-fdi-demo-notebook'    # prefix used for all data stored within the bucket

role = sagemaker.get_execution_role()             # IAM role used by SageMaker

In [None]:
region = sagemaker_session.boto_region_name

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)

Next, we configure the container image to be used for the region that we are running in.

In [None]:
image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

### Import dataset and upload it to S3 to make it available for SageMaker

Then, we load and parse the dataset and convert it to a collection of pandas time series, which makes common time series operations such as indexing by time periods or resampling much easier. The data is originally recorded at 15-minute intervals, which we could use directly. Here we instead resample it to a granularity of 1 hour, aggregating each hour into high, low, open, close and volume values.
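To make the aggregation rules concrete, here is a minimal sketch on a few synthetic 15-minute bars (made-up values, not taken from `tsla.csv`). It applies the same per-column rules as the resampling cell: highest high, lowest low, first open, last close and summed volume per hour. The `"60min"` alias is used here only because it is accepted by both older and newer pandas versions.

```python
import pandas as pd

# Hypothetical 15-minute bars covering two hours (synthetic values).
idx = pd.date_range("2018-04-25 09:00", periods=8, freq="15min")
bars = pd.DataFrame({
    "open":   [10, 11, 12, 13, 20, 21, 22, 23],
    "high":   [11, 12, 13, 14, 21, 22, 23, 24],
    "low":    [9, 10, 11, 12, 19, 20, 21, 22],
    "close":  [11, 12, 13, 14, 21, 22, 23, 24],
    "volume": [100, 100, 100, 100, 50, 50, 50, 50],
}, index=idx)

# Same rules as the notebook: max of highs, min of lows,
# first open, last close and summed volume within each hour.
hourly = bars.resample("60min").agg(
    {"high": "max", "low": "min", "open": "first", "close": "last", "volume": "sum"}
)
print(hourly)
```

Each hourly row summarizes four 15-minute bars, so the 09:00 row has high 14, low 9, open 10, close 14 and volume 400.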

In [None]:
data1 = pd.read_csv(input_csv_location, index_col=0, parse_dates=True, decimal=',')
data1

In [None]:
# exclude the bars stamped 20:00
data1 = data1[data1.index.hour != 20]
# aggregate to 1-hour OHLCV bars and drop hours without any data
data_norm = data1.resample('1H').agg({'high': 'max', 'low': 'min', 'open': 'first', 'close': 'last', 'volume': 'sum'}).dropna()

# make sure every column is numeric
for col in ['high', 'low', 'open', 'close', 'volume']:
    data_norm[col] = pd.to_numeric(data_norm[col])
data_norm.index.name = None
num_timeseries1 = data_norm.shape[1]

# one series per price column (high, low, open, close); the last column, volume, is not modeled
timeseries1 = []
for i in range(num_timeseries1 - 1):
    timeseries1.append(np.trim_zeros(data_norm.iloc[:, i], trim='f'))
timeseries1[1].index[-1]

In [None]:
num_timeseries1

In [None]:
data_norm

Let us plot the resulting time series.

In [None]:
fig, axs1 = plt.subplots(2, 2, figsize=(20, 20), sharex=True)
axx1 = axs1.ravel()
for i in range(0, 4):
    timeseries1[i].loc["2018-04-25":"2018-06-26"].interpolate().plot(ax=axx1[i])
    axx1[i].set_xlabel("date")    
    axx1[i].set_ylabel("Value")   
    axx1[i].grid(which='minor', axis='x')

### Train and Test splits

Often, one wants to evaluate the model or tune its hyperparameters by looking at error metrics on a hold-out test set. Here we split the available data into train and test sets for evaluating the trained model. For standard machine learning tasks such as classification and regression, one typically obtains this split by randomly separating examples into train and test sets. In forecasting, however, the split must be made along the time axis (holding out the most recent observations) rather than by assigning whole time series to one set or the other.

In this example, we reserve the last section of each time series for evaluation purposes and use only the first part as training data.

In [None]:
# we use 1 hour frequency for the time series
freq = '1H'

# prediction horizon: 7 * 12 = 84 hourly time steps
prediction_length = 7 * 12

# the context length is also 84 steps: the number of state updates performed before making predictions
context_length = 7 * 12

We specify here the portion of the data that is used for training: the model sees data from 2018-04-25 up to, but not including, 2018-06-30 00:00.
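Pandas label-based slicing includes both endpoints, which is why the training data cell subtracts one time step from `end_training`. The sketch below demonstrates this on a synthetic hourly series (values 0 to 7); the one-hour step is an assumption that matches `freq = '1H'`.

```python
import pandas as pd

# Synthetic hourly series spanning the training cutoff (values 0..7).
idx = pd.date_range("2018-06-29 20:00", periods=8, freq="60min")
ts = pd.Series(range(8), index=idx)
end_training = pd.Timestamp("2018-06-30 00:00:00")
one_step = pd.Timedelta(hours=1)

# Label slicing includes the upper bound, so end_training itself is kept here...
inclusive = ts[:end_training]
# ...whereas subtracting one step keeps the cutoff out of the training part.
train = ts[:end_training - one_step]
# The hold-out part starts exactly at the cutoff.
holdout = ts[end_training:]

print(len(inclusive), list(train), list(holdout))
```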

In [None]:
start_dataset = pd.Timestamp("2018-04-25 00:00:00")
end_training = pd.Timestamp("2018-06-30 00:00:00")

The DeepAR JSON input format represents each time series as a JSON object. In the simplest case, each time series consists of a start timestamp (``start``) and a list of values (``target``). For more complex cases, DeepAR also supports the fields ``dynamic_feat`` for time-series features and ``cat`` for categorical features.
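A single line of the input file might look like the sketch below. The values are made up; `cat` and `dynamic_feat` are included only to illustrate the format (the training data built here uses `start` and `target` alone). The `"NaN"` string for a missing value mirrors what the notebook's `encode_target` helper does, and, per the DeepAR documentation, each dynamic feature in the training data must have the same length as `target`.

```python
import json

# Hypothetical record: four hourly observations, one of them missing,
# one categorical feature and one dynamic feature as long as the target.
record = {
    "start": "2018-04-25 00:00:00",
    "target": [10.5, 11.0, "NaN", 12.25],
    "cat": [0],
    "dynamic_feat": [[0, 1, 1, 0]],
}

line = json.dumps(record)   # one line of a JSON Lines file
parsed = json.loads(line)
print(line)
```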

In [None]:
training_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training - pd.Timedelta(freq)].tolist()  # subtract one step, because pandas label slicing includes the upper bound
    }
    for ts in timeseries1
]
print(len(training_data))
# training_data

As test data, we consider time series extending beyond the training range: they are used to compute test scores by forecasting their trailing `prediction_length` points with the trained model and comparing the predictions with the actual values.
To evaluate the model on more than one forecast window, we generate test data that extends 1, 2, 3 and 4 prediction windows beyond the training range. This way we perform *rolling evaluation* of our model.
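The cutoffs of the rolling test series can be computed by hand. This standard-library sketch mirrors the `end_training + k * prediction_length` expression in the test-data cell, assuming one step equals one hour (`freq = '1H'`); it makes explicit that each 84-step window spans 3.5 days of calendar time.

```python
from datetime import datetime, timedelta

end_training = datetime(2018, 6, 30)
prediction_length = 7 * 12        # 84 hourly steps, as in the notebook
step = timedelta(hours=1)         # one step at freq = '1H'
num_test_windows = 4

# Upper bound of the k-th test series: k prediction windows past the cutoff.
window_ends = [end_training + k * prediction_length * step
               for k in range(1, num_test_windows + 1)]
for end in window_ends:
    print(end)
```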

Let's now write the dictionaries to the `jsonlines` file format that DeepAR understands (it also supports gzipped JSON Lines and Parquet).
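Because DeepAR also accepts gzipped JSON Lines, a writer can switch between plain and compressed output with one flag. The sketch below is an illustrative variant of the `write_dicts_to_file` helper; the function names `write_jsonlines` and `read_jsonlines` are hypothetical, and the files go to a temporary directory rather than to `train.json`.

```python
import gzip
import json
import os
import tempfile

def write_jsonlines(path, records, compress=False):
    """Write one JSON object per line; gzip-compress when compress=True."""
    opener = gzip.open if compress else open
    with opener(path, "wt", encoding="utf-8") as fp:
        for record in records:
            fp.write(json.dumps(record) + "\n")

def read_jsonlines(path):
    """Read a (possibly gzipped) JSON Lines file back into a list of dicts."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt", encoding="utf-8") as fp:
        return [json.loads(line) for line in fp if line.strip()]

records = [
    {"start": "2018-04-25 00:00:00", "target": [1.0, 2.0, 3.0]},
    {"start": "2018-04-25 00:00:00", "target": [4.0, 5.0]},
]

with tempfile.TemporaryDirectory() as tmp:
    plain_path = os.path.join(tmp, "train.json")
    gz_path = os.path.join(tmp, "train.json.gz")
    write_jsonlines(plain_path, records)
    write_jsonlines(gz_path, records, compress=True)
    plain_back = read_jsonlines(plain_path)
    gz_back = read_jsonlines(gz_path)

print(plain_back == gz_back == records)
```

Opening the files in text mode (`"wt"`/`"rt"`) lets `json.dumps` output be written directly, without the manual `.encode("utf-8")` calls.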

In [None]:
num_test_windows = 4

test_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training + k * prediction_length * pd.Timedelta(freq)].tolist()
    }
    for k in range(1, num_test_windows + 1) 
    for ts in timeseries1
]
print(len(test_data))

In [None]:
test_data[-1]

In [None]:
timeseries1

In [None]:
def write_dicts_to_file(path, data):
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode('utf-8'))

In [None]:
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

Now that we have the data files locally, let us copy them to S3 where DeepAR can access them. Depending on your connection, this may take a couple of minutes.

In [None]:
s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)
    
    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print('File {} already exists.\nSet override to upload anyway.\n'.format(s3_path))
            return
        else:
            print('Overwriting existing file')
    with open(local_file, 'rb') as data:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=data)
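`copy_to_s3` splits the S3 URI by hand with `str.split('/')`. A slightly more robust alternative, sketched below with the standard library's `urllib.parse.urlparse`, rejects non-S3 URIs explicitly. `split_s3_uri` and the bucket name are hypothetical and only illustrate the parsing.

```python
from urllib.parse import urlparse

def split_s3_uri(uri):
    """Split 's3://bucket/key/parts' into ('bucket', 'key/parts')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")

bucket, key = split_s3_uri("s3://my-example-bucket/deepar-fdi-demo-notebook/data/train/train.json")
print(bucket, key)
```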

In [None]:
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json", override=True)
copy_to_s3("test.json", s3_data_path + "/test/test.json", override=True)

Let's have a look at what we just wrote to S3.

In [None]:
s3filesystem = s3fs.S3FileSystem()
with s3filesystem.open(s3_data_path + "/train/train.json", 'rb') as fp:
    print(fp.readline().decode("utf-8")[:100] + "...")

### Train a model

Here we define the estimator that will launch the training job.

In [None]:
estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_name=image_name,
    role=role,
    train_instance_count=1,
    train_instance_type='ml.c4.2xlarge',
    base_job_name='deepar-fdi-demo',
    output_path=s3_output_path
)

Our dataset is ready, so we can now use DeepAR to train a model and generate predictions.

Next, we set the hyperparameters for the training job. Some describe the data and the forecasting task: the frequency of the time series, the number of past data points the model looks at (`context_length`) and the number of data points to predict (`prediction_length`). The others concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options (number of epochs, batch size, learning rate, ...). Here we set the number of epochs, the early stopping patience, the mini-batch size and the learning rate explicitly, and keep the defaults for every other optional hyperparameter (you can always use [SageMaker Automatic Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune them).

In [None]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length)
}

In [None]:
estimator.set_hyperparameters(**hyperparameters)

We are ready to launch the training job. SageMaker will start an EC2 instance, download the data from S3, start training the model and save the trained model.

If you provide the `test` data channel, as we do in this example, DeepAR also calculates accuracy metrics for the trained model on this test set. It does so by predicting the last `prediction_length` points of each time series in the test set and comparing them with the actual values.

**Note:** the next cell may take a few minutes to complete, depending on data size, model complexity, training options.

In [None]:
%%time
data_channels = {
    "train": "{}/train/".format(s3_data_path),
    "test": "{}/test/".format(s3_data_path)
}

estimator.fit(inputs=data_channels, wait=True)

Since we pass a test set in this example, accuracy metrics for the forecast are computed and logged (see the bottom of the log).
You can find the definition of these metrics from [our documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html). You can use these to optimize the parameters and tune your model or use SageMaker's [Automated Model Tuning service](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune the model for you.
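To build intuition for the logged numbers, here is a standard-library sketch of two of them, RMSE and weighted quantile loss, written from our reading of the definitions in the DeepAR documentation. Treat it as an approximation of what the algorithm reports, not its actual code; the observed and forecast values are made up.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between observations and point forecasts."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def weighted_quantile_loss(actual, quantile_forecast, tau):
    """Pinball loss at quantile tau, scaled by 2 / sum(|actual|)."""
    loss = sum(
        tau * max(a - q, 0.0) + (1.0 - tau) * max(q - a, 0.0)
        for a, q in zip(actual, quantile_forecast)
    )
    return 2.0 * loss / sum(abs(a) for a in actual)

actual = [10.0, 12.0]
median_forecast = [11.0, 12.0]   # hypothetical 0.5-quantile forecast

print(rmse(actual, median_forecast))
print(weighted_quantile_loss(actual, median_forecast, 0.5))
```

Scaling by the sum of absolute observations makes the quantile loss comparable across series of different magnitudes.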

### Create endpoint and predictor

Now that we have a trained model, we can use it to perform predictions by deploying it to an endpoint.

**Note: Remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end.**

To query the endpoint and perform predictions, we can define the following utility class: this allows making requests using `pandas.Series` objects rather than raw JSON strings.

In [None]:
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, content_type=sagemaker.content_types.CONTENT_TYPE_JSON, **kwargs)
        
    def predict(self, ts, cat=None, dynamic_feat=None, 
                num_samples=200, return_samples=False, quantiles=["0.1", "0.5", "0.9"]):
        """Requests a forecast for the time series `ts`, optionally with the
        categorical features `cat` and the dynamic features `dynamic_feat`.
        
        ts -- `pandas.Series` object with a DatetimeIndex whose `freq` is set, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        dynamic_feat -- list of lists, dynamic features that also cover the prediction range (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 200)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of quantiles (strings or floats) to compute (default: ["0.1", "0.5", "0.9"])
        
        Return value: `pandas.DataFrame` with one column per quantile (and per sample if requested), indexed by the forecast timestamps
        """
        # the forecast starts one time step after the last observation
        prediction_time = ts.index[-1] + ts.index.freq
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)
    
    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(ts, cat, dynamic_feat if dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles
        }
        
        http_request_data = {
            "instances": [instance],
            "configuration": configuration
        }
        
        return json.dumps(http_request_data).encode('utf-8')
    
    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode('utf-8'))['predictions'][0]
        prediction_length = len(next(iter(predictions['quantiles'].values())))
        prediction_index = pd.date_range(start=prediction_time, freq=freq, periods=prediction_length)
        if return_samples:
            dict_of_samples = {'sample_' + str(i): s for i, s in enumerate(predictions['samples'])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(data={**predictions['quantiles'], **dict_of_samples}, index=prediction_index)

    def set_frequency(self, freq):
        self.freq = freq
        
def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]        

def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pandas.Series object with the target time series
    cat -- an integer indicating the time series category
    dynamic_feat -- list of lists with the dynamic features (optional)

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat        
    return obj
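Stripped of SageMaker, the request that `__encode_request` builds and the response that `__decode_response` parses are plain JSON. The standard-library sketch below reproduces that round trip offline. The helper names, the input values and the response body are hypothetical (a real response comes from the endpoint), and hourly steps are assumed.

```python
import json
import math
from datetime import datetime, timedelta

def build_request(start, target, num_samples=50, quantiles=("0.1", "0.5", "0.9")):
    """Build a request body shaped like DeepARPredictor.__encode_request."""
    instance = {
        "start": str(start),
        # Non-finite values are sent as the string "NaN", like encode_target.
        "target": [x if math.isfinite(x) else "NaN" for x in target],
    }
    configuration = {
        "num_samples": num_samples,
        "output_types": ["quantiles"],
        "quantiles": list(quantiles),
    }
    return json.dumps({"instances": [instance], "configuration": configuration})

def decode_response(body, prediction_time, step):
    """Attach timestamps to the quantile forecasts of the first (only) series."""
    predictions = json.loads(body)["predictions"][0]
    quantiles = predictions["quantiles"]
    horizon = len(next(iter(quantiles.values())))
    times = [prediction_time + i * step for i in range(horizon)]
    return times, quantiles

start = datetime(2018, 6, 30, 20)
target = [10.0, float("nan"), 11.0, 12.0]
step = timedelta(hours=1)
request = json.loads(build_request(start, target))

# Hypothetical response for a two-step horizon.
fake_body = json.dumps({"predictions": [{"quantiles": {
    "0.1": [11.0, 11.5], "0.5": [12.0, 12.5], "0.9": [13.0, 13.5]}}]})
prediction_time = start + len(target) * step   # one step after the last observation
times, quantiles = decode_response(fake_body, prediction_time, step)
print(times, quantiles["0.5"])
```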

Now we can deploy the model and create an endpoint that can be queried using our custom DeepARPredictor class.

In [None]:
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    predictor_cls=DeepARPredictor)

### Make predictions

Now we can use the `predictor` object to generate predictions.

In [None]:
# same aggregation as above, but without dropna(): hours with no data stay NaN (sent to DeepAR as "NaN"),
# so the index remains a regular hourly grid with its freq set, which DeepARPredictor relies on
data_norm2 = data1.resample('1H').agg({'high': 'max', 'low': 'min', 'open': 'first', 'close': 'last', 'volume': 'sum'})
for col in ['high', 'low', 'open', 'close', 'volume']:
    data_norm2[col] = pd.to_numeric(data_norm2[col])
data_norm2.index.name = None

timeseries2 = []
num_timeseries2 = data_norm2.shape[1]

# one series per price column (high, low, open, close); trim leading and trailing zeros
for i in range(num_timeseries2 - 1):
    timeseries2.append(np.trim_zeros(data_norm2.iloc[:, i], trim='fb'))
timeseries2[1].index[-1]

In [None]:
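# observed values; timeseries2[1] is the 'low' column (column order: high, low, open, close, volume)
left = pd.DataFrame(data=timeseries2[1].dropna())
left.reset_index(inplace=True)
left.columns = ['time', 'Price']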

In [None]:
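tspred = timeseries2[1]
# forecast the window that starts 14 days (14 * 24 hourly points) before the end of the series
right = predictor.predict(ts=tspred[:-14*24], num_samples=50, quantiles=[0.10, 0.5, 0.90])
right.reset_index(inplace=True)
right.columns = ['time', '10thPercentile', '50thPercentile', '90thPercentile']

# forecast the window that starts 7 days (7 * 24 hourly points) before the end of the series
right2 = predictor.predict(ts=tspred[:-7*24], num_samples=50, quantiles=[0.10, 0.5, 0.90])
right2.reset_index(inplace=True)
right2.columns = ['time', '10thPercentile', '50thPercentile', '90thPercentile']

# DataFrame.append was removed in pandas 2.0; pd.concat stacks the two forecasts
right = pd.concat([right, right2], ignore_index=True)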
print(right)
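`tspred[:-n]` drops the last `n` hourly points, so each forecast starts at the first dropped timestamp and covers `prediction_length` steps. The sketch below works out both windows for a hypothetical last timestamp (2018-07-31 23:00, not taken from the data). With `prediction_length = 84`, each forecast covers 3.5 days and the two windows are separated by an 84-step gap.

```python
from datetime import datetime, timedelta

def forecast_window(last_obs, holdout_steps, prediction_length, step):
    """First and last timestamp forecast after dropping holdout_steps points."""
    # After ts[:-holdout_steps] the last kept point is last_obs - holdout_steps * step,
    # so the forecast begins one step later.
    first = last_obs - (holdout_steps - 1) * step
    last = first + (prediction_length - 1) * step
    return first, last

last_obs = datetime(2018, 7, 31, 23)   # hypothetical end of the hourly series
step = timedelta(hours=1)
prediction_length = 7 * 12

window_14d = forecast_window(last_obs, 14 * 24, prediction_length, step)
window_7d = forecast_window(last_obs, 7 * 24, prediction_length, step)
print(window_14d)
print(window_7d)
```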

In [None]:
dfp = pd.merge(left, right, how='outer', on='time')
dfp
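An outer merge on the `time` column keeps every timestamp that appears in either frame, so observed prices without a forecast (and forecasts without an observation) show up with `NaN` in the missing columns. A minimal sketch with made-up values:

```python
import pandas as pd

# Made-up observed prices and a forecast that overlaps them by one hour.
observed = pd.DataFrame({
    "time": pd.to_datetime(["2018-07-01 00:00", "2018-07-01 01:00"]),
    "Price": [10.0, 11.0],
})
forecast = pd.DataFrame({
    "time": pd.to_datetime(["2018-07-01 01:00", "2018-07-01 02:00"]),
    "50thPercentile": [10.5, 11.5],
})

merged = pd.merge(observed, forecast, how="outer", on="time").sort_values("time")
print(merged)
```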

### Save predictions to S3 bucket

In [None]:
from io import StringIO
import boto3

# Output location for the predictions
output_bucket_name = input_bucket  ## using the same bucket as input but different key to write output.
output_key = 'data/predicted/tslaPredict.csv'

csv_buffer = StringIO()
dfp.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource('s3')
s3_resource.Object(output_bucket_name, output_key).put(Body=csv_buffer.getvalue())

### Delete the endpoint

In [None]:
predictor.delete_endpoint()