# Training memory forecasting model on bitbrains traces
This notebook trains a memory-forecasting model with DeepAR on AWS SageMaker.
It follows Jessie's example here: https://github.com/manifoldco/Time-Series-Forecasting-/blob/master/Manifold_AWS_DeepAR.py.ipynb

The data comes from the Grid Workloads Archive: the Bitbrains business-critical workload traces (GWA-T-12), specifically the `rnd` subset.

This notebook downloads the required data, munges it into time series, trains a model, and tests it. Just run all the cells.

In [None]:
import glob
import json
import os

import numpy as np
import pandas as pd
import s3fs
import sagemaker

In [None]:
bucket = 'manifoldco-sagemaker'
prefix = 'hlnr-o/memory-forecasting/poc/'
# S3 URIs always use '/' as separator, so build them explicitly rather than with
# os.path.join, whose separator depends on the local OS (backslash on Windows).
S3_MODEL_PATH = 's3://{}/{}'.format(bucket, prefix.lstrip('/'))
S3_DATA_PATH = S3_MODEL_PATH.rstrip('/') + '/data'

DEEPAR_IMAGE = '522234722520.dkr.ecr.us-east-1.amazonaws.com/forecasting-deepar:latest'

In [None]:
!wget http://gwa.ewi.tudelft.nl/fileadmin/pds/trace-archives/grid-workloads-archive/datasets/gwa-t-12/rnd.zip
!unzip rnd.zip -d ./data

## Load Data and Transform it to Timeseries

In [None]:
# Read every per-VM CSV from the ./data/rnd/2013-* folders into one frame and tag each
# row with a VM id taken from the file name (text before the first '.'). Files with
# the same name in different month folders therefore share a VM id.
# Sorting makes the concatenation order deterministic (glob order is unspecified).
trace_files = sorted(glob.glob(os.path.join('./data/rnd/2013-*', "*.csv")))
if not trace_files:
    raise FileNotFoundError(
        "No trace CSVs found under ./data/rnd/2013-*; run the download cell first."
    )
traces = pd.concat(
    [
        # The separator is ';' followed by a tab. A multi-character separator is
        # only supported by the python parser engine, so request it explicitly.
        pd.read_csv(fp, sep=';\t', engine='python').assign(VM=os.path.basename(fp).split('.')[0])
        for fp in trace_files
    ],
    ignore_index=True,
)
traces.head()

In [None]:
freq = '1min'
context_length = 30
prediction_length = 30

# Build a DatetimeIndex from the raw timestamps.
# NOTE(review): the column header says "[ms]" but the values are decoded as Unix
# *seconds* (unit='s'); confirm against the raw files before changing the unit.
traces['Timestamp'] = pd.to_datetime(traces['Timestamp [ms]'], unit='s')
traces.set_index('Timestamp', inplace=True)

# Keep the start time and define the forecasting target (memory usage in KB).
traces["start"] = traces.index
traces['target'] = traces['Memory usage [KB]']

# Resample every VM onto a regular `freq` grid, averaging the samples in each bin
# (`resample(how=...)` no longer exists in pandas, so aggregate explicitly).
# Empty bins are forward-filled *within* each VM only, so one VM's last value can
# never leak into the start of the next VM's series. The result has a
# (VM, Timestamp) index until VM is moved back into a column below.
trace_timeseries = (
    traces.groupby('VM')['target']
    .resample(freq)
    .mean()
    .groupby(level='VM')
    .ffill()
    .to_frame('target')
)
trace_timeseries.reset_index(level=0, inplace=True)

In [None]:
# build train / test data sets by VM
# Build one series per VM. `timeseries_test` keeps each full series;
# `timeseries_training` drops the last `prediction_length` points, which are the
# points the model is later asked to forecast.
timeseries_test, timeseries_training = [], []
vm_index_range = trace_timeseries['VM'].unique()
# groupby(sort=False) visits VMs in the same first-appearance order as `unique()`
# without re-scanning the whole frame with a boolean mask for every VM.
for _, vm_frame in trace_timeseries.groupby('VM', sort=False):
    # Drop the 'Timestamp' index name. `del index.name` fails on current pandas,
    # where `name` is a property without a deleter.
    newseries = vm_frame['target'].rename_axis(None)
    newseries.index = pd.to_datetime(newseries.index)
    if len(newseries) <= prediction_length:
        # Nothing would remain for training, and an empty series cannot be
        # exported (each JSON record needs a first timestamp), so skip this VM.
        continue
    timeseries_test.append(newseries)
    timeseries_training.append(newseries.iloc[:-prediction_length])


## Upload data to S3

In [None]:
# Text encoding used for the JSON Lines payloads, and an S3 filesystem handle for writing them.
encoding = 'utf-8'
s3filesystem = s3fs.S3FileSystem()

def series_to_jsonline(ts):
    """Serialize one time series as a single JSON Lines record for DeepAR.

    Parameters:
    ts -- non-empty pandas.Series indexed by timestamps; its first index value
          becomes "start" and its values become "target"

    Return value: JSON string {"start": ..., "target": [...]} with no trailing newline.
    Missing values (NaN) are written as JSON null. Otherwise json.dumps would emit a
    bare NaN token, which is not valid JSON. Values are cast to float so integer-dtype
    series serialize too. Raises ValueError if the series contains infinite values.
    """
    target = [None if pd.isna(value) else float(value) for value in ts]
    obj = {"start": str(ts.index[0]), "target": target}
    return json.dumps(obj, allow_nan=False)

def _write_jsonlines(s3_uri, series_list):
    """Write `series_list` to the S3 object `s3_uri` as JSON Lines, one series per line.

    Parameters:
    s3_uri -- full S3 URI of the object to (over)write
    series_list -- iterable of pandas.Series accepted by `series_to_jsonline`
    """
    with s3filesystem.open(s3_uri, 'wb') as fp:
        for ts in series_list:
            fp.write((series_to_jsonline(ts) + '\n').encode(encoding))


# The old layout of this cell wrote the FULL series to train/test_data.json and the
# truncated series to test/train_data.json (swapped). The training channels point at
# the whole train/ and test/ prefixes, so remove those stale objects. Otherwise the
# held-out points would still reach the training job.
for stale_uri in (
    "{}/train/test_data.json".format(S3_DATA_PATH),
    "{}/test/train_data.json".format(S3_DATA_PATH),
):
    if s3filesystem.exists(stale_uri):
        s3filesystem.rm(stale_uri)

# The "train" channel must hold only the truncated series. The "test" channel holds
# the full series, so the last `prediction_length` points can be scored against the
# forecast.
_write_jsonlines("{}/train/train_data.json".format(S3_DATA_PATH), timeseries_training)
_write_jsonlines("{}/test/test_data.json".format(S3_DATA_PATH), timeseries_test)

## Train the model

In [None]:
# Launch a DeepAR training job on the train/test channels under S3_DATA_PATH.
sagemaker_session = sagemaker.Session()

# DeepAR hyperparameters for this run (forecast horizon and context come from the
# notebook-level settings).
hyperparameters = dict(
    time_freq=freq,
    context_length=context_length,
    prediction_length=prediction_length,
    num_cells="32",
    num_layers="2",
    likelihood="student-t",
    epochs="20",
    mini_batch_size="32",
    learning_rate="0.001",
    dropout_rate="0.05",
    early_stopping_patience="10",
)

# One S3 prefix per channel: ".../data/train/" and ".../data/test/".
data_channels = {
    channel: "{}/{}/".format(S3_DATA_PATH, channel) for channel in ("train", "test")
}

# NOTE(review): image_name / train_instance_count / train_instance_type are SageMaker
# Python SDK v1 argument names; confirm the installed SDK version before upgrading.
estimator = sagemaker.estimator.Estimator(
    image_name=DEEPAR_IMAGE,
    role='arn:aws:iam::223261615538:role/terraform-sagemaker-role',
    train_instance_count=1,
    train_instance_type='ml.m5.4xlarge',
    base_job_name='memory-forecasting-poc',
    output_path=S3_MODEL_PATH,
    sagemaker_session=sagemaker_session,
)
estimator.set_hyperparameters(**hyperparameters)
estimator.fit(inputs=data_channels)

## Create an endpoint and test the model

In [None]:
# Deploy the model produced by the most recent training job of `estimator`
# behind a real-time endpoint; `endpoint_name` is used by the cells below.
job_name = estimator.latest_training_job.name
endpoint_name = sagemaker_session.endpoint_from_job(
    job_name=job_name,
    deployment_image=DEEPAR_IMAGE,
    instance_type='ml.m4.xlarge',
    initial_instance_count=1,
    role='arn:aws:iam::223261615538:role/terraform-sagemaker-role',
)

In [None]:
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    """Real-time predictor that builds DeepAR JSON requests and parses quantile responses.

    `set_prediction_parameters` must be called before `predict`.
    """

    def set_prediction_parameters(self, freq, prediction_length):
        """Set the time frequency and prediction length parameters. This method **must** be called
        before being able to use `predict`.
        
        Parameters:
        freq -- string indicating the time frequency
        prediction_length -- integer, number of predicted time points
        
        Return value: none.
        """
        self.freq = freq
        self.prediction_length = prediction_length
        
    def predict(self, ts, cat=None, encoding="utf-8", num_samples=100, quantiles=None):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.
        
        Parameters:
        ts -- list of `pandas.Series` objects, the time series to predict
        cat -- list of integers (default: None)
        encoding -- string, encoding to use for the request (default: "utf-8")
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])
        
        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        if quantiles is None:
            # Fresh list per call instead of a shared mutable default argument.
            quantiles = ["0.1", "0.5", "0.9"]
        # Each forecast starts one `freq` step after the series' last observation.
        # `Timestamp + int` was removed from pandas, so add an explicit offset.
        step = pd.tseries.frequencies.to_offset(self.freq)
        prediction_times = [x.index[-1] + step for x in ts]
        req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, prediction_times, encoding)
    
    def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
        """Serialize the series (and optional categories) into a DeepAR JSON request body."""
        instances = [
            series_to_obj(series, cat[k] if cat else None) for k, series in enumerate(ts)
        ]
        configuration = {"num_samples": num_samples, "output_types": ["quantiles"], "quantiles": quantiles}
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode(encoding)
    
    def __decode_response(self, response, prediction_times, encoding):
        """Turn the endpoint response into one quantile DataFrame per requested series,
        indexed by the `prediction_length` forecast timestamps."""
        response_data = json.loads(response.decode(encoding))
        list_of_df = []
        for k, prediction_time in enumerate(prediction_times):
            # `pd.DatetimeIndex(start=...)` was removed from pandas; date_range is the replacement.
            prediction_index = pd.date_range(start=prediction_time, freq=self.freq, periods=self.prediction_length)
            list_of_df.append(pd.DataFrame(data=response_data['predictions'][k]['quantiles'], index=prediction_index))
        return list_of_df

    
def series_to_obj(ts, cat=None):
    """Convert one time series into a DeepAR request instance dict.

    Parameters:
    ts -- non-empty pandas.Series indexed by timestamps
    cat -- optional category value; the "cat" key is added only when it is not None

    Return value: dict {"start": str(first timestamp), "target": [...], ["cat": cat]}.
    NaN values become None (JSON null) so the request body stays valid JSON.
    Values are cast to float so integer-dtype series serialize too.
    """
    target = [None if pd.isna(value) else float(value) for value in ts]
    obj = {"start": str(ts.index[0]), "target": target}
    if cat is not None:
        obj["cat"] = cat
    return obj

In [None]:
predictor = DeepARPredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    content_type="application/json"
)
predictor.set_prediction_parameters(freq, prediction_length)

# Conform every series to an explicit regular grid (missing steps become NaN).
# Use the notebook's `freq` rather than the 'T' alias, which newer pandas deprecates,
# so the grid always matches the frequency the model was trained with.
new_time_series_training = [ts.asfreq(freq) for ts in timeseries_training]
new_time_series_test = [ts.asfreq(freq) for ts in timeseries_test]

In [None]:
sample_vms = slice(100, 101)  # a single VM, selected by position
list_of_df = predictor.predict(new_time_series_training[sample_vms])  # forecast quantiles
actual_data = new_time_series_test[sample_vms]  # full observed series for comparison


In [None]:
import matplotlib.pyplot as plt

# Extra memory (in KB) added on top of the 90th-percentile forecast to draw the
# suggested provisioning line.
PROVISION_HEADROOM_KB = 100

for forecast, actual in zip(list_of_df, actual_data):
    plt.figure(figsize=(12, 6))
    # Observed data over the context window plus the forecast horizon.
    actual.iloc[-prediction_length - context_length:].plot(label='Actual', linewidth=2.5)
    p10 = forecast['0.1']
    p90 = forecast['0.9']
    plt.fill_between(p10.index, p10, p90, alpha=0.5, label='80% Confidence Interval')
    forecast['0.5'].plot(label='Prediction Median', color='orange', linewidth=2.5)
    plt.title("DeepAR Model Prediction", fontsize=23)
    plt.ylabel("Memory Usage [KBs]", fontsize=20)
    plt.xlabel("Time", fontsize=20)
    # Suggested capacity request: upper forecast quantile plus fixed headroom.
    (p90 + PROVISION_HEADROOM_KB).plot(label='My Suggested Provision', color='g', linewidth=2.5)
    plt.yticks(fontsize=14)
    plt.xticks(fontsize=14)
    plt.legend(fontsize=12, loc='best')
    plt.show()

In [None]:
# Tear down the real-time endpoint created for this evaluation.
sagemaker_session.delete_endpoint(endpoint_name=endpoint_name)