# Timeseries analysis on Amazon SageMaker

We will try to predict values in the series CO, NO2, NOx, O3, PM10, SO2, there are various stations and we would like to segregate the data by station. We are using the Madrid Air Quality dataset from Kaggle here: https://www.kaggle.com/decide-soluciones/air-quality-madrid#madrid.h5
Our Notebook instance comes installed with wget, which you can use, if you have an account on Kaggle.
You can issue the following shell commands from the same directory as your Notebook, to get the file from Kaggle.

`wget --user=rahulns73 --ask-password https://www.kaggle.com/decide-soluciones/air-quality-madrid/downloads/air-quality-madrid.zip`

You can then unzip the file using the unzip utility, thus,

`unzip <path/output-file-name>`

We chose to use the madrid_2001.csv file, you can choose another if you so desire. You may need to analyze the file before proceeding with the rest of this exercise.

Lets look at the records from 2001

In [None]:
import os
import boto3
from sagemaker import get_execution_role
from sagemaker import Session
from sagemaker import estimator

# Get our role setup
role = get_execution_role()
bucket = 'rnszsdemo' ## Replace with your bucket name
prefix = 'sagemaker/data/Madrid_Air_Quality' ## Replace with the folder structure inside your bucket or simply ''

Lets import all the python packages we think we might need

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
import sagemaker.amazon.common as smac

In [None]:
# Lets get the data, start with building a pandas dataframe
data = pd.read_csv('s3://{}/{}/madrid_2001.csv'.format(bucket,prefix),parse_dates=True,index_col=0)

# Copy the index column
data['processedDate'] = data.index

Take a look at the data, what have we got??

In [None]:
data.head()

Deal with the missing values and create separate dataframes for each station. We find the stations, then create a separate dataframe for the readings from each of the stations.

In [None]:
stnlist = list(data['station'].unique())
stnelem = [data.loc[data['station'] == stn] for stn in stnlist]
stnelem[3].head()

Looks like hourly data, but we also know that there are a bunch of NaNs in this data. Many of the columns contain NaNs but those aren't of much interest to us anyway, so we are going to forward fill those. Nevertheless, we don't need to do that when using DeepAR.

In [None]:
cleanstnelem = [elem.drop(columns=['BEN','EBE','MXY','NMHC','OXY','PXY','TCH','TOL']) for elem in stnelem]
eachstn = [] ## Lets keep this final data from each station here.
for clelem in cleanstnelem:
    eachstn.append(clelem.fillna(method='ffill'))
eachstn[0].head()

Now that we have our data preprocessed, lets see what it looks like, lest we need to work on it a little more.

## Lets plot this data for some of the stations

In [None]:
eachstn[0]['O_3'][:'2001-01-25'].plot(figsize=(20,10))

So lets try to predict the various series, in our case O_3 monthly. 

In [None]:
trainingSet = []
testSet = []

## Set up the start times for our series, this is 1 to 12
startTimes = eachstn[0].groupby([eachstn[0].index.month]).first()['processedDate'].tolist()

## Don't want to keep these undefined
trainingSet.append([0])
testSet.append([0])

## Lets get all the training examples
for month in range(1,9): ##choose months January to August
    trainingSet.append(eachstn[0]['O_3'][:'2001-{}-25'.format(month)].tolist())
    testSet.append(eachstn[0]['O_3']['2001-{}-25'.format(month):'2001-{}-28'.format(month)].tolist())


Now we create our input dataset and test set. Once we are done with this, we can run our training.

In [None]:
def writeData(fname,dataset):
    f = open(fname,'w')
    for i in range(1,len(trainingSet)):
        line = "\"start\":\"{}\",\"target\":{}".format(str(startTimes[i-1]),dataset[i])
        f.write('{'+line+'}\n')
    f.close()
    
writeData('trainingData.json',trainingSet)
writeData('testData.json',testSet)

Lets take a look at the file we have generated for this exercise.

In [None]:
!cat testData.json

## Now let us upload the data to S3
We will use Amazon SageMaker's own function to do that. You can take a look at it __[here](http://sagemaker.readthedocs.io/en/latest/session.html)__.

In [None]:
training_key = 'trainingData.json'
testing_key = 'testData.json'
train_prefix   = '{}/{}'.format(prefix, 'train')
test_prefix    = '{}/{}'.format(prefix, 'test')
output_prefix = '{}/{}'.format(prefix, 'output')

## Lets create the sagemaker session and upload the data from where Amazon SageMaker will pick it 
## up to put in the container
sg_sess = Session()
training_path  = sg_sess.upload_data(training_key, bucket=bucket, key_prefix=train_prefix)
testing_path = sg_sess.upload_data(testing_key, bucket=bucket, key_prefix=test_prefix)

## Lets set up the training job

We first set up the containers we intend to use and then create an estimator

In [None]:
containers = {
    'us-east-1': '522234722520.dkr.ecr.us-east-1.amazonaws.com/forecasting-deepar:latest',
    'us-east-2': '566113047672.dkr.ecr.us-east-2.amazonaws.com/forecasting-deepar:latest',
    'us-west-2': '156387875391.dkr.ecr.us-west-2.amazonaws.com/forecasting-deepar:latest',
    'eu-west-1': '224300973850.dkr.ecr.eu-west-1.amazonaws.com/forecasting-deepar:latest'
}

b3session = boto3.session.Session()
myregion = b3session.region_name

img = containers[myregion]

estimator = estimator.Estimator(
    sagemaker_session=sg_sess,
    image_name=img,
    role=role,
    train_instance_count=1,
    train_instance_type='ml.c4.8xlarge',
    base_job_name='Madrid-Air-Quality',
    output_path='s3://{}/{}'.format(bucket,output_prefix)
)

## Looking at the Hyperparameters for DeepAR, here they are,

__time_freq__ : The granularity of the time series in the dataset. Required. Use time_freq to select appropriate date features and lags. The model supports the following basic frequencies. It also supports multiples of these basic frequencies. For example, 5min specifies a frequency of 5 minutes.
 - M for monthly
 - W for weekly
 - D for daily
 - H for hourly
 - min for every minute

__prediction_length__ : The number of time-steps that the model is trained to predict, also called the forecast horizon. The trained model always generates forecasts with this length. It can't generate longer forecasts. The prediction_length is fixed when a model is trained and it cannot be changed later. 

__context_length__ : The number of time-points that the model gets to see before making the prediction. The value for this parameter should be about the same as the prediction_length. The model also receives lagged inputs from the target, so context_length can be much smaller than typical seasonalities. For example, a daily time series can have yearly seasonality. The model automatically includes a lag of one year, so the context length can be shorter than a year. The lag values that the model picks depend on the frequency of the time series. For example, lag values for daily frequency are previous week, 2 weeks, 3 weeks, 4 weeks, and year.

__likelihood__ : The model generates a probabilistic forecast, and can provide quantiles of the distribution and return samples. Depending on your data, select an appropriate likelihood (noise model) that is used for uncertainty estimates. The following likelihoods can be selected:
**gaussian** : Use for real-valued data
**beta** : Use for real-valued targets between 0 and 1 incl.
**negative-binomial** : Use for count data (non-negative integers).
**student-T** : An alternative for real-valued data that works well for bursty data.
**deterministic-L1** : A loss function that does not estimate uncertainty and only learns a point forecast.

__epochs__ : The maximum number of passes over the training data. The optimal value depends on your data size and learning rate. See also early_stopping_patience. Typical values range from 10 to 1000.

There are more...if you want to know more check __[here](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar_hyperparameters.html)__.

# There are some recommended best practices that would be good to consider when using DeepAR. You can find them __[here](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)__.

In [None]:
prediction_length = 72 # Three days ahead?!
hyperparameters = {
    "time_freq": 'H', # hourly series
    "context_length": prediction_length,
    "prediction_length": prediction_length, # number of data points to predict
    "num_cells": "40", #default
    "num_layers": "2",
    "likelihood": "gaussian",
    "epochs": "50",
    "mini_batch_size": "64",
    "learning_rate": "0.01",
    "dropout_rate": "0.05"#,
#    "early_stopping_patience": "10" # what is this ??
}

estimator.set_hyperparameters(**hyperparameters)

# Now lets train this baby!! :-)
Note that when you call `fit()`, there is no option but to see the training job name in the console, it will be preceeded with the string `INFO`.

In [None]:
data_channels = {"train": training_path, "test": testing_path}
estimator.fit(inputs=data_channels)

Next we get the name of the model we just trained to use later. 

In [None]:
h_level_api_model = estimator.latest_training_job.name

## Alternatively, you could be more sophisticated about the training,

You can use the low-level SDK for Python which provides the create_traning_job method which maps to the _CreateTrainingJob_ Amazon SageMaker API. You will also be required to define the training parameters that give you more finegrained control over the training job.
You can also use a _Waiter_ to wait until the training job is over.

In [None]:
from time import gmtime, strftime

job_name = 'MyDeepARTraining-' + strftime("%Y-%m-%d-%H-%M-%S",gmtime())
print(job_name)
                                
## We have already setup the containers above
                                
## We choose a different output location to keep these separate from our old artifacts
output_location = 's3://{}/{}/output-low-level-training'.format(bucket,prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))

create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": img,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": output_location
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.c4.8xlarge",
        "VolumeSizeInGB": 50
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "time_freq": 'H', # hourly series
        "context_length": str(prediction_length*3),
        "prediction_length": str(prediction_length), # number of data points to predict
        "num_cells": "40", #default
        "num_layers": "2",
        "likelihood": "gaussian",
        "epochs": "50",
        "mini_batch_size": "64",
        "learning_rate": "0.01",
        "dropout_rate": "0.05"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 60 * 60
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": training_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        },
        {
            "ChannelName": "test",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": testing_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        }
    ]
}

sgmaker = boto3.client('sagemaker')
sgmaker.create_training_job(**create_training_params)

## Lets check the status of the job to see if its complete, and lets wait until its done
status = sgmaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
try:
    sgmaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
finally:
    status = sgmaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print("Training job ended with status: " + status)
    if status == 'Failed':
        message = sgmaker.describe_training_job(TrainingJobName=job_name)['FailureReason']
        print('Training failed with the following error: {}'.format(message))
        raise Exception('Training job failed')

## Deploying the model to Amazon SageMaker hosting services

There are 3 steps to host a model in Amazon SageMaker,
 - Create a model in Amazon SageMaker
 - Create an endpoint configuration
 - Create an endpoint

Using the high level API, all it takes is a simple call to _deploy_, like so,
    
    my_predictor = myalgo.deploy(initial_instance_count=2,instance_type='ml.m4.xlarge')

Lets use the low level API and use the models we generated above before(see above) to do this.

### Create a model

In [None]:
modelname = job_name
print(modelname)

## Lets find out where our model artifacts have been stored, first for the model artifacts from the low level SDK
info = sgmaker.describe_training_job(TrainingJobName=job_name)
modeldata = info['ModelArtifacts']['S3ModelArtifacts']
primary_ctnr = {
    'Image': img,
    'ModelDataUrl': modeldata
}

## Create the actual model
cmr = sgmaker.create_model(
    ModelName = job_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_ctnr)

## You should now be able to get the model ARN
print(cmr['ModelArn'])

### Alternatively, if you have created a model using the high-level SDK, you can create the model like so,

In [None]:
model_name = h_level_api_model 
information = sgmaker.describe_training_job(TrainingJobName=model_name) 

## All of the rest is pretty much similar
model_data = information['ModelArtifacts']['S3ModelArtifacts']
primary_container = {
    'Image': img,
    'ModelDataUrl': model_data
}

## Create the actual model
cmrv2 = sgmaker.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

## You should now be able to get the model ARN
print(cmrv2['ModelArn'])

### So now we have two models, we would like to send 50% of the traffic to each of the endpoints, lets create an endpoint configuration for this.

In [None]:
endpoint_config_name = 'MyDeepAREP-'+strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sgmaker.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants = [{
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 2,
            "InstanceType": "ml.m4.xlarge",
            "ModelName": model_name, ## The highlevel model we trained first
            "VariantName": model_name
        },
        {
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 2,
            "InstanceType": "ml.m4.xlarge",
            "ModelName": modelname, ## The lowlevel model we trained second
            "VariantName": modelname            
        }
    ]
)
print("Endpoint Config Arn: "+create_endpoint_config_response['EndpointConfigArn'])

### Finally, create the endpoint

In [None]:
endpoint_name = 'MyDeepAREndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sgmaker.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sgmaker.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

try:
    sgmaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
finally:
    resp = sgmaker.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Arn: " + resp['EndpointArn'])
    print("Create endpoint ended with status: " + status)

    if status != 'InService':
        message = sgmaker.describe_endpoint(EndpointName=endpoint_name)['FailureReason']
        print('Training failed with the following error: {}'.format(message))
        raise Exception('Endpoint creation did not succeed')


# Application autoscaling can be configured using __[this](https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-auto-scaling-add-policy.html)__.

In [None]:
ReqSet = []
ValidationChkSet = [] 

## Lets create a helper function for the creation of the request
def createRequest(fname,dataset):
    f = open(fname,'w')
    line1 = "\"start\":\"{}\",\"target\":{}".format(str(startTimes[8]),dataset[1])
    # We want only 10 samples for this exercise
    line2 = "\"instances\": [{"+line1+"}],\"configuration\": {\"num_samples\": 50,\"output_types\": [\"mean\",\"quantiles\",\"samples\"],\"quantiles\": [\"0.1\",\"0.9\"]}"
    f.write('{'+line2+'}\n')
    f.close()

## Don't want to keep these undefined
ReqSet.append([0])
ValidationChkSet.append([0])    

## Lets get what we need for our request first, we will use month 9, remember we have trained the model on the first 8 months.
ReqSet.append(eachstn[0]['O_3'][:'2001-{}-25'.format(9)].tolist()) # You may __not__ want to **hardcode** this

## The below will be used to check how we are predicting
ValidationChkSet.append(eachstn[0]['O_3']['2001-{}-25'.format(9):'2001-{}-28'.format(9)].tolist())


In [None]:
createRequest('RequestData.json',ReqSet)
createRequest('ValidationData.json',ValidationChkSet)

Lets see how the request files have turned out, note we didn't really need validation data to be put into the request format, but its easier to look at smaller data than the larger request data that has more data points

In [None]:
!cat ValidationData.json

### Lets do the prediction
We obtained the endpoint name from above endpoint creation process

In [None]:
runtime = boto3.Session().client('sagemaker-runtime')
## Create bytes for the payload
with open("ValidationData.json","rb") as f:
    reqcontents = f.read()
payload = reqcontents

## This comes from the previous call that gives us our endpoint
response = runtime.invoke_endpoint(EndpointName=endpoint_name,ContentType='application/json',Body=payload)

### So now we have the predictions! Lets look at what we got.

In [None]:
print(response)

So we can see from the above many of the details, we are specifically interested in the `InvokedProductionVariant`. What we get is a `botocore.response.StreamingBody`, thats where your predictions are. You need to be able to read the `StreamingBody` to get to the predictions.
The Stream that we get from the `response`(above) can be used only once as it doesn't survive across calls. We get the whole thing in one read call, not really a best practice. You may want to read fewer bytes at a time by passing the `read()` a bytes parameter.
Storing the stream in `datamy` helps me access the data later, as and when I want, now we can proceed with choosing to display one of the forecasts.
Getting string data. If you want the `StreamingBody`, you would have to call invoke_endpoint again!!! Of course! :-)
since we have already read the stream.

In [None]:
import json

my_json = (response['Body']).read()
datamy = json.loads(my_json)

## Lets look at the predictions

In [None]:
import random

q1 = 0.1
q2 = 0.9

y_data = datamy['predictions'][0]
y_mean = y_data['mean']
y_q1 = y_data['quantiles'][str(q1)]
y_q2 = y_data['quantiles'][str(q2)]

## Since we now have everything, lets plot this to compare the predictions with real values

In [None]:
x = range(0,prediction_length) 
plt.gcf().clear()
fig = plt.figure(figsize=(20,10))
meanlbl, = plt.plot(x,y_mean,'k--',label='mean')
q1lbl, = plt.plot(x,y_q1,'g',label=q1)
q2lbl, = plt.plot(x,y_q2,'b',label=q2)
truth_data = eachstn[0]['O_3']['2001-{}-26'.format(9):'2001-{}-28'.format(9)].tolist()
gtlbl, = plt.plot(x,truth_data,'m',label='truth')
plt.legend(handles=[meanlbl,q1lbl,q2lbl,gtlbl])
plt.show()

We obviously need more training or need to tune our hyperparameters better. That bring us to 
### Automated Model Tuning

Refer to this __[page](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar-tuning.html)__ for a detailed explanation of how to tune DeepAR models, and this __[page](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html)__ for a detailed explanation of Automatic Model Tuning in Amazon SageMaker.
Some other tools that you can use for automatic model tuning in Python : __[Spearmint](https://github.com/JasperSnoek/spearmint)__, __[MOE](https://github.com/Yelp/MOE)__, __[HyperOpt](https://github.com/hyperopt/hyperopt)__ and __[SMAC](https://github.com/automl/SMAC3)__.
We will use AMT to tune our model like so,
 - We create an estimator, just like we did when we wanted to create the model (see above)
 - Then we setup the hyperparameter ranges that AMT will will explore to get the optimal model
 - Setup the `tuner`
 - Then run tuner.fit()
 
 To analyze the results of the hyperparameter optimzation task, use __[this notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/hyperparameter_tuning/analyze_results/HPO_Analyze_TuningJob_Results.ipynb)__.

In [None]:
from sagemaker import estimator

training_key = 'trainingData.json'
testing_key = 'testData.json'
train_prefix   = '{}/{}'.format(prefix, 'train-HPO')
test_prefix    = '{}/{}'.format(prefix, 'test-HPO')
output_prefix = '{}/{}'.format(prefix, 'output-HPO')

## Lets create the sagemaker session
sg_sessV2 = Session()
training_path  = sg_sess.upload_data(training_key, bucket=bucket, key_prefix=train_prefix)
testing_path = sg_sess.upload_data(testing_key, bucket=bucket, key_prefix=test_prefix)

estimatorHPO = estimator.Estimator(
    sagemaker_session=sg_sessV2,
    image_name=img,
    role=role,
    train_instance_count=2,
    train_instance_type='ml.c4.8xlarge',
    base_job_name='Madrid-Air-Quality-With-HPO',
    output_path='s3://{}/{}'.format(bucket,output_prefix)
)
prediction_length = 72 # Three days ahead?!
## Here we don't have the hyperparameters that we intend to tune
hyperparameters = {
    "time_freq": 'H', # hourly series
    "context_length": prediction_length,
    "prediction_length": prediction_length, # number of data points to predict
    "num_cells": "40", #default
    "num_layers": "2",
    "likelihood": "gaussian",
    "epochs": "50",
    "mini_batch_size": "128",
    "learning_rate": "0.01",
    "dropout_rate": "0.05"#,
#    "early_stopping_patience": "10" # stop if loss hasn't improved in 10 epochs
}

estimatorHPO.set_hyperparameters(**hyperparameters)

### Set up the hyperparameter ranges
The HPO algorithm used by Amazon SageMaker will explore the ranges specified. With continuous ranges, it can explore 
a lot more values than with integer ranges.

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.01, 0.2),'num_layers': IntegerParameter(2,5)}

### Metric definitions
If you are using a builtin algorithm, like we are here, you should __not__ provide the metric definition.
At other times, like if you are using your own say, tensorflow script __OR__ your own framework, you __will__ definitely have to provide metric definitions.

In [None]:
objective_metric_name = 'train:final_loss'
objective_type = 'Minimize'

In [None]:
tuner = HyperparameterTuner(estimatorHPO,objective_metric_name,hyperparameter_ranges,max_jobs=1,max_parallel_jobs=1,objective_type=objective_type)

The parameter `include_cls_metadata=False` to the `fit()` method is required as Amazon SageMaker HPO for DeepAR will not work without it. To get more detailed information on why this argument to `fit()` is required, check __[here](https://github.com/aws/sagemaker-python-sdk/blob/master/README.rst)__.
To check the status of your Hyperparameter tuning job, you can use this notebook (or the code in it): In your notebook instance click the `SageMaker Examples` tab, go to the section on `Hyperparameter Tuning`, `Use` the notebook `HPO_Analyze_TuningJob_Results.ipynb`. In this notebook, replace the name of the Hyperparameter tuning job with the one you get from the `fit()` method below.

In [None]:
tuner.fit({'train': training_path, 'test': testing_path},include_cls_metadata=False)

### Remember to delete your endpoint.

In [None]:
cl = boto3.client('sagemaker')
response = cl.delete_endpoint(EndpointName=endpoint_name)