In [None]:
# S3 location used by the notebook: the bucket for training and model data, and the key prefix under it.
bucket, prefix = 'resource-usage-data', 'sagemaker/resource-usage-prediction'
 
# Define IAM role
import sagemaker

import sagemaker.predictor
from sagemaker.predictor import csv_serializer, json_deserializer
import boto3
import  s3fs
import re
from sagemaker import get_execution_role
import json
import math
from os import path
import sagemaker.amazon.common as smac
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io
import os
import time
import json
import glob

# Resolve the IAM execution role, the SageMaker/boto3 sessions and the AWS region once.
role = get_execution_role()
sagemaker_session = sagemaker.Session()
region = boto3.Session().region_name
smclient = boto3.Session().client('sagemaker')

# S3 locations (bucket/prefix, without the "s3://" scheme) for data and model output.
s3_data_path = "{}/{}/data".format(bucket, prefix)
s3_output_path = "{}/{}/output".format(bucket, prefix)

# DeepAR container image for the region we are running in.
# Should be the same as the region of our S3 bucket; add entries for other regions here.
containers = {
    'us-east-1': '522234722520.dkr.ecr.us-east-1.amazonaws.com/forecasting-deepar:latest',
}
if region not in containers:
    # Fail with an actionable message instead of a bare KeyError.
    raise ValueError(
        "No DeepAR container image configured for region {!r}; "
        "add it to `containers`.".format(region))
image_name = containers[region]

In [None]:
# Download the gwa-t-12 "rnd" trace archive and extract it into targetdir/.
# Each step is skipped when its output already exists, so re-running the notebook
# neither re-downloads nor re-extracts the archive.
import os
import urllib.request
import zipfile

data_url = 'http://gwa.ewi.tudelft.nl/fileadmin/pds/trace-archives/grid-workloads-archive/datasets/gwa-t-12/rnd.zip'
archive_path = 'rnd.zip'
extract_dir = 'targetdir'

if not os.path.exists(archive_path):
    # Download to a temporary name first so an interrupted transfer never leaves a
    # truncated rnd.zip that would be mistaken for a complete download next time.
    partial_path = archive_path + '.part'
    urllib.request.urlretrieve(data_url, partial_path)
    os.replace(partial_path, archive_path)

if not os.path.isdir(os.path.join(extract_dir, 'rnd')):
    with zipfile.ZipFile(archive_path, "r") as zip_ref:
        zip_ref.extractall(extract_dir)

In [None]:
# Load the traces into one data frame per month.
MAX_FILES_PER_MONTH = 300  # number of per-VM trace files read from each month directory


def load_month_traces(month_dir, max_files=MAX_FILES_PER_MONTH):
    """Read up to ``max_files`` per-VM CSV traces from ``month_dir`` into one frame.

    Files are taken in sorted name order, so the same subset is selected on every
    run (``glob`` itself returns files in arbitrary, filesystem-dependent order).
    Each row gets a ``VM`` column holding its file name without the extension.

    Raises
    ------
    FileNotFoundError
        If ``month_dir`` contains no ``.csv`` files (``pd.concat`` would otherwise
        fail with "No objects to concatenate").
    """
    paths = sorted(glob.glob(os.path.join(month_dir, "*.csv")))[:max_files]
    if not paths:
        raise FileNotFoundError("no .csv files found in {!r}".format(month_dir))
    # Fields are separated by ';' followed by a tab; a multi-character separator
    # requires pandas' python parser engine.
    frames = [
        pd.read_csv(path, sep=';\t', engine='python')
          .assign(VM=os.path.basename(path).split('.')[0])
        for path in paths
    ]
    return pd.concat(frames, ignore_index=True)


df = load_month_traces('targetdir/rnd/2013-7')
df2 = load_month_traces('targetdir/rnd/2013-8')
df3 = load_month_traces('targetdir/rnd/2013-9')

print('done!')

In [None]:
# Stack the three monthly frames into one long frame. Row labels are kept, as the
# former DataFrame.append chain did (append was removed in pandas 2.0); they are
# replaced by the timestamp index in the next step.
data_frame = pd.concat([df, df2, df3])
data_frame.head()

In [None]:
# Formatting
# NOTE(review): the column is labelled "[ms]" but is parsed with unit='s', i.e. the raw
# values are treated as epoch seconds — confirm against the trace documentation.
data_frame['Timestamp'] = pd.to_datetime(data_frame['Timestamp [ms]'], unit='s')
data_frame['weekday'] = data_frame['Timestamp'].dt.dayofweek  # Monday=0 ... Sunday=6
data_frame['weekend'] = (data_frame['weekday'] >= 5).astype(float)  # 1.0 on Saturday/Sunday

# Feature engineering with the date
data_frame['month'] = data_frame['Timestamp'].dt.month
data_frame['day'] = data_frame['Timestamp'].dt.day
data_frame = data_frame.set_index('Timestamp')

# Previous-sample values and first differences. The frame stacks many VMs back to
# back, so the lag is taken within each VM; an ungrouped shift(1) would pair the
# first row of every VM with the last row of the VM before it.
lag_features = [
    ('CPU usage [%]', "CPU usage prev", "CPU_diff"),
    ('Network received throughput [KB/s]', "received_prev", "received_diff"),
    ('Network transmitted throughput [KB/s]', "transmitted_prev", "transmitted_diff"),
]
for source_col, prev_col, diff_col in lag_features:
    # groupby().shift() preserves the original row order; assign positionally
    # because the timestamp index may contain duplicates (several VMs per time).
    data_frame[prev_col] = data_frame.groupby('VM')[source_col].shift(1).to_numpy()
    data_frame[diff_col] = data_frame[source_col] - data_frame[prev_col]

In [None]:
# Expose the timestamp index as an explicit 'start' column and alias the CPU usage
# in MHz as DeepAR's 'target' column (right-hand sides are evaluated before assignment).
data_frame['start'], data_frame['target'] = data_frame.index, data_frame['CPU usage [MHZ]']

In [None]:
# Average each VM's `target` (CPU usage in MHz) into 1-minute bins; the result is indexed
# by (VM, Timestamp). The former resample(how=...) argument no longer exists in pandas.
df2 = data_frame.groupby('VM')[['target']].resample('1min').mean()

In [None]:
# Average each VM's provisioned CPU capacity into 1-minute bins, indexed by (VM, Timestamp).
# NOTE(review): no later cell in this notebook reads df3 — kept for reference.
df3 = data_frame.groupby('VM')[['CPU capacity provisioned [MHZ]']].resample('1min').mean()

In [None]:
# Move the VM level of the (VM, Timestamp) index into a column and forward-fill empty
# 1-minute bins with the last observed value (fillna(method='ffill') is deprecated).
df3 = df3.reset_index(level=0).ffill()

In [None]:
# Move the VM level of the (VM, Timestamp) index into a column and forward-fill empty
# 1-minute bins with the last observed value (fillna(method='ffill') is deprecated).
df2 = df2.reset_index(level=0).ffill()

In [None]:
# Format data into JSON lines for DeepAR.
# Series frequency, and how many past points the model conditions on (context) and how
# many future points it forecasts (prediction) — 30 one-minute steps each.
freq, context_length, prediction_length = "1min", 30, 30

def series_to_obj(ts, cat=None):
    """Convert a time-indexed series into one DeepAR JSON record.

    Parameters
    ----------
    ts : pandas.Series
        Values indexed by timestamps; the first index entry becomes ``"start"``.
    cat : optional
        Categorical feature(s), stored under ``"cat"`` when not None.

    Returns
    -------
    dict
        ``{"start": str, "target": [float or None, ...]}`` plus ``"cat"`` if given.
        Missing values (NaN) become ``None`` (JSON ``null``) rather than the
        non-standard ``NaN`` token ``json.dumps`` would otherwise emit.

    Raises
    ------
    ValueError
        If ``ts`` is empty (a record needs a start timestamp).
    """
    if len(ts) == 0:
        raise ValueError("cannot build a DeepAR record from an empty series")
    # astype(float).tolist() yields plain Python floats, which json can always
    # serialise (numpy integer scalars are not JSON serialisable).
    values = ts.astype(float).tolist()
    obj = {"start": str(ts.index[0]),
           "target": [None if math.isnan(v) else v for v in values]}
    if cat is not None:
        obj["cat"] = cat
    return obj

def series_to_jsonline(ts, cat=None):
    """Serialise ``ts`` (with optional ``cat``) as a single DeepAR JSON-lines record.

    The returned string has no trailing newline; the caller adds line separators.
    """
    record = series_to_obj(ts, cat)
    return json.dumps(record)



# Build one 1-minute series per VM. The test set holds each full series; the training
# set drops the last `prediction_length` points, which only the test set keeps.
# VMs keep their order of first appearance in df2 (as with df2['VM'].unique()).
time_series_test = []
time_series_training = []
for _, vm_frame in df2.groupby('VM', sort=False):
    series = vm_frame['target'].copy()
    # Unnamed DatetimeIndex. `del index.name` (the previous form) fails on current
    # pandas, where Index.name is a property without a deleter.
    series.index = pd.to_datetime(series.index).rename(None)
    time_series_test.append(series)
    time_series_training.append(series[:-prediction_length])



In [None]:
# Push the JSON-lines data to the S3 bucket.
# S3FileSystem() is created without explicit credentials, so s3fs resolves them itself
# (presumably from the notebook's IAM role — confirm for your environment).

s3filesystem = s3fs.S3FileSystem()

In [None]:
# Convert each test series into a DeepAR JSON line and upload the file to S3 for SageMaker.
encoding = "utf-8"
test_data_uri = s3_data_path + "/test/test_data.json"

with s3filesystem.open(test_data_uri, 'wb') as test_file:
    for series in time_series_test:
        # One record per line: the JSON text followed by a newline.
        test_file.write((series_to_jsonline(series) + '\n').encode(encoding))


In [None]:
# Same JSON-lines format for the training series (each truncated by prediction_length).
train_data_uri = s3_data_path + "/train/train_data.json"
with s3filesystem.open(train_data_uri, 'wb') as train_file:
    for series in time_series_training:
        train_file.write((series_to_jsonline(series) + '\n').encode(encoding))

In [None]:
# Configure the DeepAR training job: container image, IAM role, one ml.c4.xlarge
# training instance, and the S3 location for model artifacts.
# (SageMaker Python SDK v1 argument names: image_name / train_instance_*.)
estimator = sagemaker.estimator.Estimator(
    image_name=image_name,
    role=role,
    sagemaker_session=sagemaker_session,
    train_instance_type='ml.c4.xlarge',
    train_instance_count=1,
    output_path="s3://{}".format(s3_output_path),
    base_job_name='test-demo-deepar',
)

In [None]:
# DeepAR hyperparameters: 2 layers of 32 cells, Student-t likelihood, up to 20 epochs
# with early stopping after 10 epochs without improvement.
hyperparameters = dict(
    time_freq=freq,
    context_length=context_length,
    prediction_length=prediction_length,
    num_cells="32",
    num_layers="2",
    likelihood="student-t",
    epochs="20",
    mini_batch_size="32",
    learning_rate="0.001",
    dropout_rate="0.05",
    early_stopping_patience="10",
)


In [None]:
# Attach the hyperparameters to the estimator. This call only records them; starting
# the training instance, downloading the data from S3, training and saving the model
# all happen when `estimator.fit(...)` is called.

estimator.set_hyperparameters(**hyperparameters)

In [None]:
# Train model: point the "train" and "test" channels at the uploaded JSON-lines
# prefixes in S3 and launch the training job.
data_channels = {
    channel: "s3://{}/{}/".format(s3_data_path, channel)
    for channel in ("train", "test")
}
estimator.fit(inputs=data_channels)


In [None]:
# Deploy the model from the latest training job to a real-time endpoint on one
# ml.m4.xlarge instance, and keep the endpoint name for the predictor.
job_name = estimator.latest_training_job.name
endpoint_config = dict(
    job_name=job_name,
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    deployment_image=image_name,
    role=role,
)
endpoint_name = sagemaker_session.endpoint_from_job(**endpoint_config)


In [None]:
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    """Real-time predictor for a deployed DeepAR endpoint.

    Encodes pandas series into DeepAR's JSON inference request and decodes the
    returned quantiles into one DataFrame per series. Call
    ``set_prediction_parameters`` before ``predict``: ``freq`` and
    ``prediction_length`` are used to build each forecast's time index.
    """

    def set_prediction_parameters(self, freq, prediction_length):
        """Store the pandas frequency string of the series and the forecast length."""
        self.freq = freq
        self.prediction_length = prediction_length

    def predict(self, ts, cat=None, encoding="utf-8", num_samples=100, quantiles=["0.1", "0.5", "0.9"]):
        """Forecast every series in ``ts``.

        Parameters
        ----------
        ts : list of pandas.Series
            Time-indexed history of each series to forecast.
        cat : list, optional
            Per-series categorical features, aligned with ``ts``.
        encoding : str
            Text encoding of the request and response bodies.
        num_samples : int
            Number of samples the endpoint draws to estimate the quantiles.
        quantiles : list of str
            Quantile levels to request (not mutated).

        Returns
        -------
        list of pandas.DataFrame
            One frame per input series, with one column per quantile key in the
            response, indexed by ``prediction_length`` timestamps at ``freq``
            starting one step after the series' last timestamp.
        """
        from pandas.tseries.frequencies import to_offset

        # One `freq` step after the last observation. `Timestamp + 1` (the previous
        # form) raises TypeError on current pandas.
        step = to_offset(self.freq)
        prediction_times = [x.index[-1] + step for x in ts]
        req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, prediction_times, encoding)

    def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
        """Build the encoded JSON body: one instance per series plus a configuration
        requesting the given quantiles."""
        instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]
        configuration = {"num_samples": num_samples, "output_types": ["quantiles"], "quantiles": quantiles}
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode(encoding)

    def __decode_response(self, response, prediction_times, encoding):
        """Turn the endpoint's JSON response into one quantile DataFrame per series."""
        response_data = json.loads(response.decode(encoding))
        list_of_df = []
        for k, start in enumerate(prediction_times):
            # pd.DatetimeIndex(start=..., periods=...) was removed from pandas;
            # date_range builds the same regular index.
            prediction_index = pd.date_range(start=start, freq=self.freq, periods=self.prediction_length)
            list_of_df.append(pd.DataFrame(data=response_data['predictions'][k]['quantiles'], index=prediction_index))
        return list_of_df


In [None]:
# Wrap the deployed endpoint; requests are sent as JSON.
predictor_kwargs = {
    "endpoint": endpoint_name,
    "sagemaker_session": sagemaker_session,
    "content_type": "application/json",
}
predictor = DeepARPredictor(**predictor_kwargs)
predictor.set_prediction_parameters(freq=freq, prediction_length=prediction_length)


In [None]:
# Visualize model predictions.
# Re-index each training series onto a regular grid at the model frequency `freq`
# (any missing step becomes NaN); this replaces the deprecated 'T' alias.
new_time_series_training = [series.asfreq(freq) for series in time_series_training]

In [None]:
# Same regular `freq` grid for the full (test) series; replaces the deprecated 'T' alias.
new_time_series_test = [series.asfreq(freq) for series in time_series_test]


In [None]:
# Forecast one example VM (position 1 in the series lists) and keep its full series,
# including the held-out tail, for comparison.
example_vms = slice(1, 2)
list_of_df = predictor.predict(new_time_series_training[example_vms])  # predicted forecast
actual_data = new_time_series_test[example_vms]  # full data set

In [None]:
# Plot actual vs. forecast for each predicted VM: the 10%-90% quantile band, the median
# forecast, and a suggested provision = 0.9 quantile + a fixed headroom.
PROVISION_HEADROOM = 100  # added to the 0.9 quantile; same units as the target (CPU MHz)

# 'seaborn-white' was renamed 'seaborn-v0_8-white' in newer matplotlib releases (the
# old name is no longer available); use whichever this installation provides.
plot_style = 'seaborn-v0_8-white' if 'seaborn-v0_8-white' in plt.style.available else 'seaborn-white'
plt.style.use(plot_style)

for forecast, actual in zip(list_of_df, actual_data):
    fig, ax = plt.subplots(figsize=(12, 6))
    # Last context + prediction window of the real series; the forecast covers its tail.
    actual[-prediction_length - context_length:].plot(ax=ax, label='Actual', linewidth=2.5)
    p10, p50, p90 = forecast['0.1'], forecast['0.5'], forecast['0.9']
    # The band between the 0.1 and 0.9 quantiles of the forecast distribution.
    ax.fill_between(p10.index, p10, p90, alpha=0.5, label='80% Prediction Interval')
    p50.plot(ax=ax, label='Prediction Median', color='orange', linewidth=2.5)
    (p90 + PROVISION_HEADROOM).plot(ax=ax, label='My Suggested Provision', color='g', linewidth=2.5)
    ax.set_title("DeepAR Model Prediction", fontsize=23)
    ax.set_xlabel("Time", fontsize=20)
    ax.set_ylabel("CPU usage [MHz]", fontsize=20)
    ax.tick_params(labelsize=14)
    ax.legend(fontsize=12, loc='best')
    plt.show()
