**Configure execution roles**

In [1]:
import sagemaker

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
sagemaker_execution_role = sagemaker.get_execution_role() #Replace with ARN if not in an AWS SageMaker notebook

# paste the StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = 'arn:aws:iam::118600533013:role/StepFunctionsWorkflowExecutionRole'

In [2]:
%%sh
pip -q install --upgrade stepfunctions

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/mxnet_latest_p37/bin/python -m pip install --upgrade pip' command.


**Import the required modules**

In [3]:
import sys
from dateutil.parser import parse
import logging
import uuid
import stepfunctions
import json
import datetime
import os
import requests
import datetime
import time
import boto3
import s3fs
import pandas as pd
import numpy as np
from datetime import timedelta

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.transformer import Transformer
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

sagemaker_session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name

s3_bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed
s3_prefix = "bitcoin-notebook"  # prefix used for all data stored within the bucket

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)

Next, we configure the container image to be used for the region that we are running in.

In [4]:
image_uri = get_image_uri(boto3.Session().region_name, "forecasting-deepar")

**Prepare the dataset**

As a first step, we need to download data set.

In [5]:
today = datetime.date.today()
today_365 = today - datetime.timedelta(days = 365)
unix_today = time.mktime(today.timetuple())
unix_today_365 = time.mktime(today_365.timetuple())


r = requests.get(
        "https://api.coingecko.com/api/v3/coins/bitcoin/market_chart/range?vs_currency=usd&from={}&to={}".format(
            unix_today_365,unix_today
        )
    )
r_2 = requests.get(
        "https://api.coingecko.com/api/v3/coins/ethereum/market_chart/range?vs_currency=usd&from={}&to={}".format(
            unix_today_365,unix_today
        )
    )

data = pd.DataFrame(r.json()['prices'],columns = ['Date', 'Price_bitcoin']) 
data['Date'] = pd.to_datetime(data['Date'].astype(int), unit = 'ms')

eth = np.array(r_2.json()['prices'])[:,1]
data['Price_ethereum'] = eth

Then, we load and parse the dataset and convert it to a collection of Pandas time series, which makes common time series operations such as indexing by time periods or resampling much easier.

### Train and Test splits

Often times one is interested in evaluating the model or tuning its hyperparameters by looking at error metrics on a hold-out test set. Here we split the available data into train and test sets for evaluating the trained model. For standard machine learning tasks such as classification and regression, one typically obtains this split by randomly separating examples into train and test sets. However, in forecasting it is important to do this train/test split based on time rather than by time series.

In this example, we will reserve the last section of each of the time series for evalutation purpose and use only the first part as training data. 

In [6]:
# we use 1 Day frequency for the time series
freq = "1D"

# we predict for 7 days
prediction_length = 7

# we also use 7 days as context length, this is the number of state updates accomplished before making predictions
context_length = 7

We specify here the portion of the data that is used for training: the model sees data from 2014-01-01 to 2014-09-01 for training.

In [7]:
start_dataset = pd.Timestamp("2021-02-02 00:00:00", freq=freq)
end_training = pd.Timestamp("2022-02-02 00:00:00", freq=freq)

The DeepAR JSON input format represents each time series as a JSON object. In the simplest case each time series just consists of a start time stamp (``start``) and a list of values (``target``). For more complex cases, DeepAR also supports the fields ``dynamic_feat`` for time-series features and ``cat`` for categorical features, which we will use  later.

In [8]:
dt_train_range = (start_dataset,
                  end_training - datetime.timedelta(days=7) )

# Use entire data for testing
# We can compare predicted values vs actual (i.e. last 12 days is withheld for testing and model hasn't seen that data)
dt_test_range = (start_dataset, 
                 end_training) 

In [9]:
targets = ['Price_bitcoin','Price_ethereum']
data = data.set_index('Date')
data.index.freq = 'd'

time_series_test = []
time_series_training = []

for target in targets:
    time_series_test.append(data[dt_test_range[0]:dt_test_range[1]][target])
    time_series_training.append(data[dt_train_range[0]:dt_train_range[1]][target])
    
# time_series_test = [data[dt_test_range[0]:dt_test_range[1]][target]]
# time_series_training = [data[dt_train_range[0]:dt_train_range[1]][target]]

In [10]:
training_data = [
    {
        "start": str(start_dataset),
        "target": ts[
            start_dataset : end_training - timedelta(days=1)
        ].tolist(),  # We use -1, because pandas indexing includes the upper bound
    }
    for ts in time_series_training
]

In [11]:
test_data = [
    {
        "start": str(start_dataset),
        "target": ts[
            start_dataset : end_training - timedelta(days=1)
        ].tolist(),  # We use -1, because pandas indexing includes the upper bound
    }
    for ts in time_series_test
]

In [12]:
def write_dicts_to_file(path, data):
    with open(path, "wb") as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode("utf-8"))

In [13]:
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

CPU times: user 2.87 ms, sys: 0 ns, total: 2.87 ms
Wall time: 2.72 ms


In [14]:
s3 = boto3.resource("s3")


def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith("s3://")
    split = s3_path.split("/")
    bucket = split[2]
    path = "/".join(split[3:])
    buk = s3.Bucket(bucket)

    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print(
                "File s3://{}/{} already exists.\nSet override to upload anyway.\n".format(
                    s3_bucket, s3_path
                )
            )
            return
        else:
            print("Overwriting existing file")
    with open(local_file, "rb") as data:
        print("Uploading file to {}".format(s3_path))
        buk.put_object(Key=path, Body=data)

In [15]:
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json")
copy_to_s3("test.json", s3_data_path + "/test/test.json")

File s3://sagemaker-us-east-1-118600533013/s3://sagemaker-us-east-1-118600533013/bitcoin-notebook/data/train/train.json already exists.
Set override to upload anyway.

File s3://sagemaker-us-east-1-118600533013/s3://sagemaker-us-east-1-118600533013/bitcoin-notebook/data/test/test.json already exists.
Set override to upload anyway.

CPU times: user 24.1 ms, sys: 0 ns, total: 24.1 ms
Wall time: 81.5 ms


Now that we have the data files locally, let us copy them to S3 where DeepAR can access them. Depending on your connection, this may take a couple of minutes.

We are all set with our dataset processing, we can now call DeepAR to train a model and generate predictions.

### Train a model

Here we define the estimator that will launch the training job.

In [16]:
estimator = sagemaker.estimator.Estimator(
    image_uri =  image_uri,
    sagemaker_session=sagemaker_session,
    role=sagemaker_execution_role,
    train_instance_count=1,
    train_instance_type="ml.c4.2xlarge",
    base_job_name="deepar-bitcoin",
    output_path=s3_output_path,
)

Next we need to set the hyperparameters for the training job. For example frequency of the time series used, number of data points the model will look at in the past, number of predicted data points. The other hyperparameters concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options (number of epochs, batch size, learning rate...). We use default parameters for every optional parameter in this case (you can always use [Sagemaker Automated Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/) to tune them).

In [17]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length),
}

In [18]:
estimator.set_hyperparameters(**hyperparameters)

**Define the input schema for a workflow execution**


The ExecutionInput API defines the options to dynamically pass information to a workflow at runtime.

The following cell defines the fields that must be passed to your workflow when starting an execution.

While the workflow is usually static after it is defined, you may want to pass values dynamically that are used by steps in your workflow. To help with this, the SDK provides a way to create placeholders when you define your workflow. These placeholders can be dynamically assigned values when you execute your workflow.

ExecutionInput values are accessible to each step of your workflow. You have the ability to define a schema for this placeholder collection, as shown in the cell below. When you execute your workflow the SDK will verify if the dynamic input conforms to the schema you defined.

In [19]:
# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail. Pass these
# dynamically for each execution using placeholders.
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str
})

**Create the training step**

In the following cell we create the training step and pass the estimator we defined above. See TrainingStep in the AWS Step Functions Data Science SDK documentation.

In [20]:
training_step = steps.TrainingStep(
    'Train Step', 
    estimator=estimator,
    data = {
        "train": "{}/train/".format(s3_data_path),
        "test": "{}/test/".format(s3_data_path)
    },
    job_name=execution_input['JobName']
)

**Create the model step**

In the following cell we define a model step that will create a model in SageMaker using the artifacts created during the TrainingStep. See ModelStep in the AWS Step Functions Data Science SDK documentation.

The model creation step typically follows the training step. The Step Functions SDK provides the get_expected_model method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [21]:
model_step = steps.ModelStep(
    'Save model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

**Create the transform step**

In the following cell we create the transform step. See TransformStep in the AWS Step Functions Data Science SDK documentation.

In [22]:
from sagemaker.predictor import json_serializer

transform_step = steps.TransformStep(
    'Transform Input Dataset',
    transformer=estimator.transformer(
        instance_count=1,
        instance_type='ml.m5.large',
        strategy='MultiRecord',
        output_path= '{}/batch-prediction'.format(s3_data_path),
        assemble_with='Line'
    ),
    job_name=execution_input['JobName'],     
    model_name=execution_input['ModelName'], 
    data='{}/test'.format(s3_data_path),
    split_type='Line'
)

**Chain together steps for your workflow**

Create your workflow definition by chaining the steps together. See Chain in the AWS Step Functions Data Science SDK documentation.

In [23]:
workflow_definition = steps.Chain([
    training_step,
    model_step,
    transform_step
])

Create your workflow using the workflow definition above, and render the graph with render_graph.

In [24]:
from time import strftime, gmtime
timestamp = strftime('%d-%H-%M-%S', gmtime())

workflow = Workflow(
    name='{}-{}'.format('MyTrainTransformDeploy_v1', timestamp),
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [25]:
workflow.render_graph()

Create the workflow in AWS Step Functions with create.

In [26]:
workflow.create()

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:us-east-1:118600533013:stateMachine:MyTrainTransformDeploy_v1-06-08-14-50'

In [27]:
execution = workflow.execute(
    inputs={
        'JobName': 'regression-{}'.format(uuid.uuid1().hex), # Each Sagemaker Job requires a unique name
        'ModelName': 'regression-{}'.format(uuid.uuid1().hex) # Each Model requires a unique name,
    }
)

[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m


Render workflow progress with the render_progress.

This generates a snapshot of the current state of your workflow as it executes. This is a static image. Run the cell again to check progress.

In [32]:
execution.render_progress()

Use list_events to list all events in the workflow execution.

In [33]:
workflow.list_executions(html=True)

Name,Status,Started,End Time
6de8c3d1-4361-47eb-b812-c1a184760975,SUCCEEDED,"Feb 06, 2022 08:14:54.227 AM","Feb 06, 2022 08:25:14.888 AM"


In [34]:
workflow.list_executions(html=True)

Name,Status,Started,End Time
6de8c3d1-4361-47eb-b812-c1a184760975,SUCCEEDED,"Feb 06, 2022 08:14:54.227 AM","Feb 06, 2022 08:25:14.888 AM"


In [37]:
template = workflow.get_cloudformation_template()

with open('workflow.json', 'w') as f:
    f.write(template)



In [39]:
!cat workflow.json

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation template for AWS Step Functions - State Machine
Resources:
  StateMachineComponent:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: MyTrainTransformDeploy_v1-06-08-14-50
      DefinitionString: |-
        {
          "StartAt": "Train Step",
          "States": {
            "Train Step": {
              "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
              "Parameters": {
                "AlgorithmSpecification": {
                  "TrainingImage": "522234722520.dkr.ecr.us-east-1.amazonaws.com/forecasting-deepar:1",
                  "TrainingInputMode": "File"
                },
                "OutputDataConfig": {
                  "S3OutputPath": "s3://sagemaker-us-east-1-118600533013/bitcoin-notebook/output"
                },
                "StoppingCondition": {
                  "MaxRuntimeInSeconds": 86400
                },


Now we can deploy the model and create and endpoint that can be queried using our custom DeepARPredictor class.

In [29]:
# predictor = estimator.deploy(
#     initial_instance_count=1, instance_type="ml.m5.large", predictor_cls=DeepARPredictor
# )

-------!

In [30]:
#time_series_test[0][:-30]

Date
2021-02-05    36816.508082
2021-02-06    38007.832229
2021-02-07    39279.412869
2021-02-08    38833.340265
2021-02-09    46307.574122
                  ...     
2021-12-30    46506.994648
2021-12-31    47191.868390
2022-01-01    46319.651088
2022-01-02    47816.077676
2022-01-03    47387.212168
Freq: D, Name: Price_bitcoin, Length: 333, dtype: float64

In [31]:
#predictor.predict(ts=time_series_test[0][:-30], quantiles=[0.10, 0.5, 0.90]).head()

Unnamed: 0,0.1,0.5,0.9
2022-01-04,44409.601562,47335.433594,49665.105469
2022-01-05,44897.566406,47590.519531,50451.5625
2022-01-06,45012.765625,47385.144531,50605.820312
2022-01-07,44393.175781,47944.53125,51394.910156
2022-01-08,43859.03125,47146.96875,51238.199219
