# SageMaker/DeepAR Model Training Demo
This notebook:
1. Initializes the SageMaker environment
2. Loads the data from SageMaker Feature Store
3. Prepares the data for the model training
4. Trains an Amazon DeepAR time series forecasting model
5. Deploys a SageMaker Endpoint for testing the model
6. Cleans up the SageMaker model and endpoint

__Credits:__
* This notebook is largely inspired by the [SageMaker/DeepAR demo notebook on the electricity dataset](https://github.com/aws/amazon-sagemaker-examples/blob/main/introduction_to_amazon_algorithms/deepar_electricity/DeepAR-Electricity.ipynb)

## Model Training
### Target Variable
By default, this notebook uses the `avg_fee_1min` aggregate (average transaction fee over a 1-minute period), generated by Kinesis Analytics, as the target variable for the model. The `total_nb_trx_1min` (total number of transactions during a 1-minute period) and `total_fee_1min` (total amount of transaction fees during a 1-minute period) aggregates can also be used by changing `target_col`.
### Time Series Period
In this demo, the data are aggregated in near real time by the Kinesis Analytics stream over 1-minute periods, and the model is trained to predict 30 steps (30 minutes) into the future. Aggregating data over such a short period and predicting only 30 minutes ahead may not be useful in a real-world scenario. This is done on purpose: it generates enough data quickly enough to train the model, without letting the demo run for weeks or months to collect data (which would be costly).
### Why Use the Amazon DeepAR Model?
The [Amazon DeepAR model](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html) is a deep learning model used to forecast one-dimensional (scalar) time series. It is best suited when *"you have many similar time series across a set of cross-sectional units"*, e.g. the electricity consumption of many households, the CPU utilization of a server farm, or temperatures from multiple weather stations.

In this demo we have a single time series of blockchain transactions, from which some metrics are aggregated in near real time over a 1-minute period by default. Our data is therefore not an ideal fit for the Amazon DeepAR model.

So why did we choose Amazon DeepAR?

Although it is not the optimal model for our data, we chose Amazon DeepAR for the following reasons:
* The objective of this demo is not to train the best model but to demonstrate MLOps on AWS
* According to the AWS documentation, Amazon DeepAR can be used with a single time series, and in basic comparison tests we ran against forecasting models such as ARIMA, it gave better initial results
* Because DeepAR is developed by AWS, SageMaker provides out-of-the-box containers to train and deploy it, so we do not have to build our own, which simplifies the demo

In [None]:
from __future__ import print_function

%matplotlib inline

import json
import os

import boto3
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.serializers import IdentitySerializer

## Initialisation

__IMPORTANT__: Update the stack prefix and suffix to match your deployment. These values are displayed in the console when you deploy the stack with the CDK CLI. They can also be found in the stack resource names in the AWS console.

In [None]:
# Naming prefixes coming from the CDK Stack
stack_prefix = "mlops-rdi-deve274c85"
stack_suffix = "afb6f5f2"

### SageMaker Initialisation

In [None]:
# Set S3 Buckets variables
offline_feature_store_bucket = f"{stack_prefix}-sagemaker-feature-store-bucket-{stack_suffix}"
model_artifacts_bucket = f"{stack_prefix}-sagemaker-model-artifacts-{stack_suffix}"
# Set prefix used for all data stored within the bucket
s3_model_prefix = "deepar-model" 
train_test_data_folder_prefix = "model-testing"
s3_data_path = f"s3://{model_artifacts_bucket}/data/{train_test_data_folder_prefix}"
s3_output_path = f"s3://{model_artifacts_bucket}/{s3_model_prefix}/output"

In [None]:
# Set session variables
sagemaker_session = sagemaker.Session(default_bucket=model_artifacts_bucket)
region = sagemaker_session.boto_region_name
account_id = sagemaker_session.account_id()
role = sagemaker.get_execution_role()
boto_session = boto3.Session(region_name=region)

In [None]:
# Set the boto3 S3 resource, bound to the same region as the SageMaker session
s3 = boto_session.resource("s3")
# Set feature store session
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

## Load Data From FeatureStore
The notebook uses Athena to query the data stored in the offline store (S3 bucket) of the SageMaker Feature Store.

In [None]:
transactions_feature_group_name = f"{stack_prefix}-agg-feature-group"
transactions_feature_group = FeatureGroup(
    name=transactions_feature_group_name, 
    sagemaker_session=feature_store_session
)

In [None]:
transactions_feature_group.describe()

Query the Data from FeatureStore using Athena

In [None]:
transactions_data_query = transactions_feature_group.athena_query()
transactions_data_table = transactions_data_query.table_name

query_string = f'SELECT * FROM "{transactions_data_table}"'

# Run the Athena query; the results are written to S3 and then loaded into a Pandas DataFrame
transactions_data_query.run(
    query_string=query_string,
    output_location=f"s3://{model_artifacts_bucket}/query_results/",
)
transactions_data_query.wait()
df = transactions_data_query.as_dataframe()

Simple dataset ETL:
- convert the time column from string to Pandas `Timestamp`
- sort the values by time
- use the time column as the index

In [None]:
# Parse the timestamps before sorting so the order is chronological, not lexicographic
df["tx_minute"] = pd.to_datetime(df["tx_minute"])
df.sort_values(by="tx_minute", ascending=True, inplace=True)
df.set_index("tx_minute", drop=True, inplace=True)
df.head()

In [None]:
print(f"Number of data points in the dataset: {len(df)}")

## Train, Test and Validation Splits
As mentioned earlier, by default the data are aggregated over a 1 minute period and the model is trained to predict 30 data points into the future.

According to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html):
>"Because lags are used, a model can look further back in the time series than the value specified for context_length. Therefore, you don't need to set this parameter to a large value. We recommend starting with the value that you used for prediction_length."

As a result, we set `prediction_length` (how far into the future we predict) to 30 and `context_length` (how far into the past the model looks) to the same value.

### Amount of Data in each Dataset
By default we use 4 test windows, meaning that `4*30=120` data points are removed from the training dataset for testing. We also keep 1 prediction window (`30` data points) to validate the model at the end.

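In that case, the minimum amount of data is `context_length + prediction_length * (num_test_windows + 1)`: the history the model uses for prediction (`context_length`) plus one prediction window (`prediction_length`) for each test window and for the validation window. With the default values this is `30 + 30*(4+1) = 180` data points, i.e. about 3 hours of 1-minute aggregates.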

If there is not enough data, we set the prediction window size (`prediction_length`) and the validation window size (`validation_length`) to 5% of the number of data points. How far into the past the model looks (`context_length`) is then set to the number of data points left after setting aside the test and validation windows.

We recommend running the data ingestion long enough to collect at least the minimum number of data points.
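The window-sizing rule described above can be restated as a small pure-Python function. This is a minimal sketch for illustration only: `compute_window_sizes` is a hypothetical helper that the notebook does not call, and it assumes the notebook's defaults (30-point windows, 4 test windows, 5% fallback).

```python
def compute_window_sizes(total_nb_data_points, default_length=30, num_test_windows=4):
    """Return (prediction_length, validation_length, context_length).

    Hypothetical helper mirroring the notebook's rule: keep the default window
    sizes when there is enough data, otherwise fall back to 5% windows.
    """
    prediction_length = default_length
    context_length = default_length
    # Context + one prediction window per test window + one validation window
    min_data_length = context_length + prediction_length * (num_test_windows + 1)
    if total_nb_data_points < min_data_length:
        prediction_length = int(total_nb_data_points * 0.05)
        # Whatever is left after the test and validation windows becomes context
        context_length = total_nb_data_points - prediction_length * (num_test_windows + 1)
    validation_length = prediction_length
    return prediction_length, validation_length, context_length


print(compute_window_sizes(1000))  # -> (30, 30, 30): enough data, default windows
print(compute_window_sizes(100))   # -> (5, 5, 75): not enough data, 5% windows
```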

### Train, Test and Validation Data
Based on the above and on how the DeepAR model works, the datasets are split as follows:
* The training dataset contains all data points except the last `4 * prediction_length + validation_length`, which by default is `4*30+30=150` data points
* The test datasets, like the training dataset, start at the beginning of the dataset, but each test dataset is one `prediction_length` window longer than the previous one. DeepAR evaluates a test series by forecasting its last `prediction_length` data points, so each test dataset evaluates a different window
* The validation dataset covers one prediction window at the end of the dataset. For validation, the model is given all data except the last `validation_length` data points (`validation_length` equals `prediction_length`) and predicts the future values, which are then compared to the validation data to check the model quality

__Example:__
If the dataset has 100 data points, with `prediction_length = context_length = validation_length = 5` data points and 4 test windows, the datasets are as follows:
* training data: [0:75]
* test data #1: [0:80]
* test data #2: [0:85]
* test data #3: [0:90]
* test data #4: [0:95]
* validation data: [95:100]
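The split boundaries in the example can be reproduced with plain integer arithmetic. The sketch below is hypothetical (`split_boundaries` is not part of the notebook); it returns half-open `[start, end)` bounds, matching Python slice semantics, and assumes the same window length for prediction and validation, as in the notebook.

```python
def split_boundaries(total, window, num_test_windows=4):
    """Return the [start, end) bounds of each dataset for a series of `total` points."""
    train_test_end = total - window  # everything except the validation window
    train = (0, train_test_end - num_test_windows * window)
    # Each test series starts at 0 and is one window longer than the previous one
    tests = [
        (0, train_test_end - (num_test_windows - k) * window)
        for k in range(1, num_test_windows + 1)
    ]
    validation = (train_test_end, total)
    return train, tests, validation


train, tests, validation = split_boundaries(100, 5)
print("training data:", train)
for i, bounds in enumerate(tests, start=1):
    print(f"test data #{i}:", bounds)
print("validation data:", validation)
```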

In [None]:
start_dataset = df.index.min()
end_dataset = df.index.max()
freq = "1min"
dataset_period = end_dataset - start_dataset
num_test_windows = 4
target_col = "avg_fee_1min"

In [None]:
total_nb_data_points = len(df)
prediction_length = 30
context_length = prediction_length
validation_length = prediction_length
min_data_length = context_length + prediction_length * (num_test_windows + 1)

In [None]:
# Fall back to smaller windows when there are not enough data points
if total_nb_data_points < min_data_length:
    prediction_length = int(total_nb_data_points * 0.05)
    validation_length = prediction_length
    context_length = total_nb_data_points - prediction_length * (num_test_windows + 1)
test_windows_length = num_test_windows * prediction_length
training_test_ts_length = context_length + test_windows_length

### Data Formatting
The data have to be formatted in the DeepAR input format described in [this documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar-inputoutput).

The data are in JSON Lines format: each line is a JSON object with the following fields:
* `start`: the timestamp of the first value (the timestamps of the following values are derived from this start timestamp and the model's frequency parameter)
* `target`: the target values, starting at the `start` timestamp

We do not use DeepAR's optional fields (such as `cat` and `dynamic_feat`), so no other fields are needed.
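For illustration, here is what one record of the training file looks like. This is a minimal sketch with made-up values and a hypothetical helper `to_deepar_record`; it uses only the standard library and encodes missing values as the string `"NaN"`, one of the encodings the DeepAR documentation accepts for gaps in `target`. The `YYYY-MM-DD HH:MM:SS` start format is also what `str()` returns for a timezone-naive pandas `Timestamp`, as used in the next cell.

```python
import json
import math
from datetime import datetime


def to_deepar_record(start, values):
    """Build one DeepAR JSON Lines record from a start time and a list of floats."""
    # Non-finite values (NaN, inf) are written as the string "NaN" to mark them as missing
    target = [v if math.isfinite(v) else "NaN" for v in values]
    return {"start": start.strftime("%Y-%m-%d %H:%M:%S"), "target": target}


# Hypothetical values, one per minute starting at 12:00
record = to_deepar_record(datetime(2023, 1, 1, 12, 0), [0.52, float("nan"), 0.61])
line = json.dumps(record)
print(line)  # {"start": "2023-01-01 12:00:00", "target": [0.52, "NaN", 0.61]}
```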

In [None]:
df_validation = df[-validation_length:]
df_train_test = df[:-validation_length]
df_train = df_train_test[:-test_windows_length]

# .tolist() converts NumPy scalars to native Python types so json.dumps can serialize them
# (NumPy integers, e.g. in total_nb_trx_1min, are not JSON serializable)
training_data = [
    {
        "start": str(start_dataset),
        "target": df_train[target_col].tolist(),
    }
]
# Each test series starts at the beginning of the dataset and is one prediction window
# longer than the previous one; the last one covers all train/test data
target_series = df_train_test[target_col]
test_data = [
    {
        "start": str(start_dataset),
        "target": target_series.iloc[: len(target_series) - (num_test_windows - k) * prediction_length].tolist(),
    }
    for k in range(1, num_test_windows + 1)
]
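Before uploading, it can be worth checking that the test series follow the layout described earlier: same start as the training series, each one exactly one prediction window longer than the previous. The function below is a hypothetical sanity check, not part of the original notebook; it only inspects the `start` and `target` fields of the records.

```python
def check_test_series(training_data, test_data, prediction_length):
    """Raise ValueError if the test series do not grow by one prediction window each."""
    previous = training_data[0]
    for i, series in enumerate(test_data, start=1):
        if series["start"] != previous["start"]:
            raise ValueError(f"test series #{i} does not start with the training series")
        growth = len(series["target"]) - len(previous["target"])
        if growth != prediction_length:
            raise ValueError(
                f"test series #{i} is {growth} points longer than the previous one, "
                f"expected {prediction_length}"
            )
        previous = series
    return True
```

In the notebook, it would be called as `check_test_series(training_data, test_data, prediction_length)`.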

We write the train and test datasets to local JSON Lines files, then copy them to S3.

In [None]:
def write_dicts_to_file(path, data):
    with open(path, "wb") as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode("utf-8"))
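A quick way to confirm the files are valid JSON Lines is to read them back, one JSON object per line. `read_dicts_from_file` is a hypothetical counterpart to `write_dicts_to_file`, shown as a minimal sketch; the demo writes to a temporary file so it does not touch the notebook's data folder.

```python
import json
import os
import tempfile


def read_dicts_from_file(path):
    """Read a JSON Lines file back into a list of dictionaries, skipping blank lines."""
    with open(path, "r", encoding="utf-8") as fp:
        return [json.loads(line) for line in fp if line.strip()]


# Round trip with a hypothetical record, written in the same one-object-per-line layout
records = [{"start": "2023-01-01 00:00:00", "target": [1.0, 2.0, 3.0]}]
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "train.json")
    with open(path, "w", encoding="utf-8") as fp:
        for record in records:
            fp.write(json.dumps(record) + "\n")
    loaded = read_dicts_from_file(path)

print(loaded == records)  # True
```

Once the files are written, `read_dicts_from_file(f"{train_test_data_folder_prefix}/test.json")` should return `num_test_windows` records.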

In [None]:
%%time
os.makedirs(train_test_data_folder_prefix, exist_ok=True)
write_dicts_to_file(f"{train_test_data_folder_prefix}/train.json", training_data)
write_dicts_to_file(f"{train_test_data_folder_prefix}/test.json", test_data)

In [None]:
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith("s3://")
    split = s3_path.split("/")
    bucket = split[2]
    path = "/".join(split[3:])
    buk = s3.Bucket(bucket)

    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print(f"File {s3_path} already exists.\nSet override to upload anyway.\n")
            return
        else:
            print("Overwriting existing file")
    with open(local_file, "rb") as data:
        print(f"Uploading file to {s3_path}")
        buk.put_object(Key=path, Body=data)
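`copy_to_s3` extracts the bucket and key by splitting the URI on `/`. The same parsing can be done with the standard library's `urllib.parse.urlparse`, as in the hypothetical helper below (the bucket name is made up). Note also that `buk.objects.filter(Prefix=path)` matches any key that *starts with* `path` (for example `train.json.bak` for `train.json`), so the existence check is a prefix check; comparing `obj.key == path` would make it exact.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split an "s3://bucket/key" URI into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {s3_uri}")
    # urlparse keeps the leading "/" in the path, but S3 keys do not start with one
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://my-bucket/data/model-testing/train/train.json")
print(bucket, key)  # my-bucket data/model-testing/train/train.json
```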

In [None]:
%%time
copy_to_s3(f"{train_test_data_folder_prefix}/train.json", f"{s3_data_path}/train/train.json", override=True)
copy_to_s3(f"{train_test_data_folder_prefix}/test.json", f"{s3_data_path}/test/test.json", override=True)

## Train a Model
### Initialize the Model

In [None]:
image_name = sagemaker.image_uris.retrieve("forecasting-deepar", region)
estimator = sagemaker.estimator.Estimator(
    image_uri=image_name,
    sagemaker_session=sagemaker_session,
    role=role,
    instance_count=1,
    instance_type="ml.c5.2xlarge",
    base_job_name="deepar-transaction-single",
    use_spot_instances=True,
    max_run=60*60-1,
    max_wait=60*60,
    output_path=s3_output_path,
)

data_channels = {"train": f"{s3_data_path}/train/", "test": f"{s3_data_path}/test/"}

### Train a Model
The hyperparameters below come from a hyperparameter tuning job we ran once. If you run a tuning job yourself, you may get different values depending on your data. Again, the objective here is not to train the best model but to have a model for testing MLOps on Amazon SageMaker.

In [None]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "182",
    "early_stopping_patience": "40",
    "mini_batch_size": "77",
    "learning_rate": "4.5e-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length),
    "likelihood": "negative-binomial"
}
estimator.set_hyperparameters(**hyperparameters)

In [None]:
%%time
estimator.fit(inputs=data_channels, wait=True)

## Create Endpoint and Predictor
A utility class is created to query the endpoint and perform predictions. This class is mainly copied from the AWS [DeepAR demo notebook](https://github.com/aws/amazon-sagemaker-examples/blob/main/introduction_to_amazon_algorithms/deepar_electricity/DeepAR-Electricity.ipynb)

__Note: Remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end.__

In [None]:
class DeepARPredictor(sagemaker.predictor.Predictor):
    def __init__(self, *args, **kwargs):
        super().__init__(
            *args,
            serializer=IdentitySerializer(content_type="application/json"),
            **kwargs,
        )

    def predict(
        self,
        ts,
        freq=pd.Timedelta(1, "min"),
        cat=None,
        dynamic_feat=None,
        num_samples=100,
        return_samples=False,
        quantiles=["0.1", "0.5", "0.9"],
    ):
        """Requests the prediction for the time series `ts`, with the (optional)
        corresponding category `cat` and dynamic features `dynamic_feat`.

        ts -- `pandas.Series` object, the time series to predict
        freq -- `pandas.Timedelta`, the time step between two data points (default: 1 minute)
        cat -- integer, the group associated to the time series (default: None)
        dynamic_feat -- list of lists, the dynamic features of the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of quantiles to compute, converted to strings (default: ["0.1", "0.5", "0.9"])

        Return value: a `pandas.DataFrame` with one column per quantile (and per sample if requested)
        """
        prediction_time = ts.index[-1] + freq
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, freq, prediction_time, return_samples)

    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(ts, cat, dynamic_feat if dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles,
        }

        http_request_data = {"instances": [instance], "configuration": configuration}

        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series, so we only receive one in return
        # (sending several time series in a single request makes predictions faster)
        predictions = json.loads(response.decode("utf-8"))["predictions"][0]
        prediction_length = len(next(iter(predictions["quantiles"].values())))
        prediction_index = pd.date_range(
            start=prediction_time, freq=freq, periods=prediction_length
        )
        if return_samples:
            dict_of_samples = {"sample_" + str(i): s for i, s in enumerate(predictions["samples"])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(
            data={**predictions["quantiles"], **dict_of_samples}, index=prediction_index
        )

    def set_frequency(self, freq):
        self.freq = freq


def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]


def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pandas.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat
    return obj
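The request and response formats handled by `DeepARPredictor` can be exercised without an endpoint. The sketch below rebuilds the request body produced by `__encode_request` (without `cat` or `dynamic_feat`) and decodes a *made-up* response the way `__decode_response` does, using only the standard library; the helper names `build_request` and `parse_response` and the response values are hypothetical.

```python
import json
from datetime import datetime, timedelta


def build_request(start, target, num_samples=100, quantiles=(0.1, 0.5, 0.9)):
    """Encode one time series as a DeepAR inference request body (bytes)."""
    instance = {"start": start.strftime("%Y-%m-%d %H:%M:%S"), "target": list(target)}
    configuration = {
        "num_samples": num_samples,
        "output_types": ["quantiles"],
        "quantiles": [str(q) for q in quantiles],
    }
    return json.dumps({"instances": [instance], "configuration": configuration}).encode("utf-8")


def parse_response(body, last_timestamp, freq=timedelta(minutes=1)):
    """Map each forecast timestamp to its {quantile: value} dictionary."""
    quantiles = json.loads(body.decode("utf-8"))["predictions"][0]["quantiles"]
    horizon = len(next(iter(quantiles.values())))
    # Forecasts start one time step after the last observed timestamp
    timestamps = [last_timestamp + freq * (i + 1) for i in range(horizon)]
    return {t: {q: values[i] for q, values in quantiles.items()} for i, t in enumerate(timestamps)}


request = build_request(datetime(2023, 1, 1, 12, 0), [0.52, 0.55, 0.61])
# Made-up response body with a 2-step forecast, for illustration only
fake_body = json.dumps(
    {"predictions": [{"quantiles": {"0.1": [0.4, 0.41], "0.5": [0.5, 0.52], "0.9": [0.7, 0.72]}}]}
).encode("utf-8")
forecast = parse_response(fake_body, last_timestamp=datetime(2023, 1, 1, 12, 2))
```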

In [None]:
predictor = estimator.deploy(
    initial_instance_count=1, instance_type="ml.m5.large", predictor_cls=DeepARPredictor
)

## Make Predictions and Plot Results
We take the train-test data (all data except the validation window) as a Pandas Series and use it to predict the next `prediction_length` data points.
The predicted values are then compared to the validation dataset to evaluate the model.

In [None]:
# Convert the context data to time series
ts=df.iloc[:-validation_length, df.columns.get_loc(target_col)]
# Convert the validation dataframe to a time series
target_ts=df.iloc[-validation_length:, df.columns.get_loc(target_col)]

df_predictions = predictor.predict(ts=ts, quantiles=[0.10, 0.5, 0.90])
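To turn the comparison with the validation data into numbers, a few standard metrics can be computed from the forecast quantiles. This is a minimal sketch, not part of the original notebook: `evaluate_forecast` is hypothetical, and the choice of RMSE, MAPE and interval coverage is ours. It works on plain lists so it can be tested without an endpoint.

```python
import math


def evaluate_forecast(actual, p50, lower, upper):
    """Compare a quantile forecast with held-out values.

    Returns the RMSE and MAPE of the median (P50) forecast and the fraction of
    actual values that fall inside the [lower, upper] quantile band.
    """
    if not (len(actual) == len(p50) == len(lower) == len(upper)):
        raise ValueError("all inputs must have the same length")
    n = len(actual)
    rmse = math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, p50)) / n)
    # MAPE is undefined for zero actual values, so those points are skipped
    nonzero = [(a, p) for a, p in zip(actual, p50) if a != 0]
    if nonzero:
        mape = 100 * sum(abs((a - p) / a) for a, p in nonzero) / len(nonzero)
    else:
        mape = float("nan")
    coverage = sum(lo <= a <= hi for a, lo, hi in zip(actual, lower, upper)) / n
    return {"rmse": rmse, "mape": mape, "coverage": coverage}


# Hypothetical values: the second actual value falls outside its band
metrics = evaluate_forecast([10.0, 20.0], [12.0, 18.0], [8.0, 21.0], [13.0, 25.0])
print(metrics)
```

Applied to this notebook, it would be called as `evaluate_forecast(target_ts.tolist(), df_predictions["0.5"].tolist(), df_predictions["0.1"].tolist(), df_predictions["0.9"].tolist())`; with the 10% and 90% quantiles, a well-calibrated model should show a coverage of roughly 0.8.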

In [None]:
def plot(
    predictor,
    ts,
    target_ts,
    freq=pd.Timedelta(1, "min"),
    cat=None,
    show_samples=False,
    confidence=80,
):
    forecast_start_date = target_ts.index[0]
    print(
        "calling served model to generate predictions starting from {}".format(str(forecast_start_date))
    )
    assert 50 < confidence < 100
    # round() avoids float artifacts (e.g. 0.5 - 0.4 == 0.09999999999999998) in the quantile keys
    low_quantile = round(0.5 - confidence / 200, 4)
    up_quantile = round(0.5 + confidence / 200, 4)

    # we first construct the argument to call our model
    args = {
        "ts": ts,
        "return_samples": show_samples,
        "quantiles": [low_quantile, 0.5, up_quantile],
        "num_samples": 100,
    }

    fig = plt.figure(figsize=(20, 3))
    ax = plt.subplot(1, 1, 1)

    if cat is not None:
        args["cat"] = cat
        ax.text(0.9, 0.9, "cat = {}".format(cat), transform=ax.transAxes)

    # call the end point to get the prediction
    prediction = predictor.predict(**args)

    # plot the samples
    if show_samples:
        for key in prediction.keys():
            if "sample" in key:
                prediction[key].plot(color="lightskyblue", alpha=0.2, label="_nolegend_")

    # plot the target
    target_ts.plot(color="black", label="target")

    # plot the confidence interval and the median predicted
    ax.fill_between(
        prediction[str(low_quantile)].index,
        prediction[str(low_quantile)].values,
        prediction[str(up_quantile)].values,
        color="b",
        alpha=0.3,
        label="{}% confidence interval".format(confidence),
    )
    prediction["0.5"].plot(color="b", label="P50")
    ax.legend(loc=2)

    # fix the scale as the samples may change it
    ax.set_ylim(target_ts.min() * 0.5, target_ts.max() * 1.5)

In [None]:
plot(
    predictor,
    ts=ts,
    target_ts=target_ts,
    show_samples=False,
    confidence=60,
)

## Cleanup
Delete the model & endpoint used for testing.

In [None]:
predictor.delete_model()
predictor.delete_endpoint()