In [8]:
import io
import math
import os
import random
import subprocess
import tarfile
import time

import boto3
import jsons
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role

# Fix the seed of both global RNGs (stdlib `random` and NumPy's legacy
# generator) so that re-running the notebook reproduces the same draws.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [9]:
# --- S3 layout ---------------------------------------------------------------
bucket = "sagemaker-eu-west-1-112789546712"  # bucket used for both input data and job output
data = "output"  # key prefix the parquet dataset is read from
model = "model"  # key prefix the training job writes its artifacts under

# --- SageMaker session and execution role --------------------------------------
sagemaker_session = sagemaker.Session()
role = get_execution_role()

# "<bucket>/<prefix>" strings without a scheme; callers prepend "s3://" themselves.
s3_data_path = f"{bucket}/{data}"
s3_output_path = f"{bucket}/{model}/"

# --- Settings read by the post-training cells ------------------------------------
# The cells that inspect, evaluate and deploy the job reference these names, but
# no other visible cell defines them, so a fresh "Restart & Run All" would stop
# with NameError. Defining them here keeps the notebook runnable top to bottom.
s3_bucket = bucket                # bucket the job artifacts are read back from
local_mode = False                # the training cell requests a managed ml.* instance
instance_type = "ml.c5.2xlarge"   # same instance type the training cell requests
job_name_prefix = "rl-coach"      # NOTE(review): placeholder base name for the evaluation job; rename as needed

In [10]:
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    """Fetch one parquet object from S3 and parse it with pandas.

    Parameters
    ----------
    key : S3 object key of the parquet file.
    bucket : name of the bucket that holds the object.
    s3_client : optional S3 client to reuse; when falsy a new one is created
        with ``boto3.client('s3')``.
    **args : forwarded unchanged to ``pd.read_parquet``.

    Returns
    -------
    The result of ``pd.read_parquet`` applied to the downloaded bytes.
    """
    client = s3_client or boto3.client('s3')
    response = client.get_object(Bucket=bucket, Key=key)
    # The whole body is buffered in memory so pandas gets a seekable file object.
    payload = io.BytesIO(response['Body'].read())
    return pd.read_parquet(payload, **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, **args):
    """Load every ``.parquet`` object under an S3 prefix into one DataFrame.

    Parameters
    ----------
    filepath : key prefix ("folder") to scan; a trailing '/' is appended when
        missing so that sibling prefixes sharing the same stem are not matched.
    bucket : name of the bucket to scan.
    **args : forwarded unchanged to ``pd.read_parquet`` for every file.

    Returns
    -------
    A single DataFrame with the rows of all files concatenated and the index
    renumbered (``ignore_index=True``).

    Raises
    ------
    ValueError
        If no key under the prefix ends with ``.parquet``. Without this check
        ``pd.concat`` would raise a ValueError that does not mention the
        bucket or prefix.
    """
    prefix = filepath if filepath.endswith('/') else filepath + '/'

    s3 = boto3.resource('s3')
    # Keys that do not end in '.parquet' (anything else under the prefix) are skipped.
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=prefix)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        raise ValueError(f"No parquet found in {bucket} {prefix}")

    # One client is shared by all downloads instead of creating one per file.
    s3_client = boto3.client('s3')
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

In [11]:
# Load every parquet part under the `data` prefix into a single DataFrame
# using the S3 helper functions defined above.
df = pd_read_s3_multiple_parquets(data, bucket)

# Keep only the first 12 columns (the relevant ones). `.copy()` detaches the
# result from the raw frame: adding a column to a bare `iloc` column slice makes
# pandas emit SettingWithCopyWarning.
df = df.iloc[:, :12].copy()

# Hour-of-day feature for the time-series format.
# NOTE(review): assumes a `timestamp` column is among the 12 kept columns.
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour

# Random row-wise split: each row lands in the training set with probability
# 0.8, so the split is approximately (not exactly) 80/20. Reproducible through
# the global NumPy seed set at the top of the notebook.
msk = np.random.rand(len(df)) < 0.8

training_df = df[msk]
test_df = df[~msk]

In [12]:
from sagemaker.rl import RLEstimator, RLToolkit, RLFramework

# Hyperparameters are handed to the entry point as `--name value` command-line
# flags (visible in the failed command recorded in this cell's job output).
# NOTE(review): these keys look like time-series forecasting settings rather
# than RL/Coach preset options — confirm that src/train-coach.py accepts them;
# the recorded run ended with ExecuteUserScriptError.
train_hyperparameters = {
    "time_freq": 'H',
    "context_length": '24',
    "prediction_length": '24',
    "num_cells": "40",
    "num_layers": "3",
    "likelihood": "gaussian",
    "epochs": "20",
    "mini_batch_size": "32",
    "learning_rate": "0.001",
    "dropout_rate": "0.05",
    "early_stopping_patience": "10",
}

# Coach 0.11.1 on TensorFlow, one ml.c5.2xlarge instance; artifacts are written
# under s3://<bucket>/<model>/.
estimator = RLEstimator(
    entry_point='src/train-coach.py',
    role=role,
    sagemaker_session=sagemaker_session,
    toolkit=RLToolkit.COACH,
    toolkit_version='0.11.1',
    framework=RLFramework.TENSORFLOW,
    instance_type="ml.c5.2xlarge",
    instance_count=1,
    output_path=f"s3://{s3_output_path}",
    hyperparameters=train_hyperparameters,
)

In [14]:
# Start the training job without any input data channels.
# NOTE(review): `training_df` / `test_df` built earlier are never uploaded or
# passed to the job; the channel mapping below is disabled, and `train_path` /
# `test_path` are not defined in any visible cell.
#data_channels = {"train": train_path, "test": test_path}
estimator.fit()

2021-07-29 15:18:21 Starting - Starting the training job...
2021-07-29 15:18:45 Starting - Launching requested ML instancesProfilerReport-1627571901: InProgress
...
2021-07-29 15:19:17 Starting - Preparing the instances for training.........
2021-07-29 15:20:51 Downloading - Downloading input data
2021-07-29 15:20:51 Training - Training image download completed. Training in progress.
2021-07-29 15:20:51 Uploading - Uploading generated training model
2021-07-29 15:20:51 Failed - Training job failed
[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34m2021-07-29 15:20:45,926 sagemaker-containers INFO     Imported framework sagemaker_tensorflow_container.training[0m
[34m2021-07-29 15:20:45,931 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-07-29 15:20:46,050 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-07-29 15:20:46,

UnexpectedStatusException: Error for Training job sagemaker-rl-tensorflow-2021-07-29-15-18-21-291: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
Command "/usr/bin/python train-coach.py --context_length 24 --dropout_rate 0.05 --early_stopping_patience 10 --epochs 20 --learning_rate 0.001 --likelihood gaussian --mini_batch_size 32 --num_cells 40 --num_layers 3 --prediction_length 24 --time_freq H"

In [None]:
# Work out where the training job stored its artifacts on S3 and prepare a
# local scratch folder for downloading them.

# Public accessor for the job name (the deployment cell uses the same one)
# instead of the private `_current_job_name` attribute.
job_name = estimator.latest_training_job.name
print("Job name: {}".format(job_name))

# The estimator's output_path is s3://<bucket>/<model>/, so everything the job
# writes lives under the key prefix <model>/<job_name>/.
# NOTE(review): assumes `s3_bucket` names the same bucket as the estimator's
# output_path — confirm.
job_prefix = "{}/{}".format(model, job_name)
s3_url = "s3://{}/{}".format(s3_bucket, job_prefix)

# Local-mode jobs place output.tar.gz directly under the job prefix; hosted
# jobs nest it one level deeper, under output/.
if local_mode:
    output_tar_key = "{}/output.tar.gz".format(job_prefix)
else:
    output_tar_key = "{}/output/output.tar.gz".format(job_prefix)

intermediate_folder_key = "{}/output/intermediate/".format(job_prefix)
output_url = "s3://{}/{}".format(s3_bucket, output_tar_key)
intermediate_url = "s3://{}/{}".format(s3_bucket, intermediate_folder_key)

print("S3 job path: {}".format(s3_url))
print("Output.tar.gz location: {}".format(output_url))
print("Intermediate folder path: {}".format(intermediate_url))

tmp_dir = "/tmp/{}".format(job_name)
# exist_ok=True keeps the cell re-runnable; unlike a shelled-out `mkdir`, a real
# failure (e.g. permissions) raises instead of being ignored.
os.makedirs(tmp_dir, exist_ok=True)
print("Create local folder {}".format(tmp_dir))

In [None]:
%matplotlib inline
import pandas as pd

csv_file_name = "worker_0.simple_rl_graph.main_level.main_level.agent_0.csv"
key = os.path.join(intermediate_folder_key, csv_file_name)
wait_for_s3_object(s3_bucket, key, tmp_dir, training_job_name=job_name)

csv_file = "{}/{}".format(tmp_dir, csv_file_name)
df = pd.read_csv(csv_file)
df = df.dropna(subset=["Training Reward"])
x_axis = "Episode #"
y_axis = "Training Reward"

plt = df.plot(x=x_axis, y=y_axis, figsize=(12, 5), legend=True, style="b-")
plt.set_ylabel(y_axis)
plt.set_xlabel(x_axis);

In [None]:
# Fetch the job's output archive and unpack it into the local scratch folder.
# NOTE(review): `wait_for_s3_object` is not defined or imported in any visible
# cell — confirm where it comes from.
wait_for_s3_object(s3_bucket, output_tar_key, tmp_dir, training_job_name=job_name)

output_tar_path = "{}/output.tar.gz".format(tmp_dir)
if not os.path.isfile(output_tar_path):
    raise FileNotFoundError("File output.tar.gz not found")

# Extract with the standard library rather than shelling out to `tar`: a corrupt
# or unreadable archive raises here instead of being silently ignored, and the
# cell no longer depends on a `tar` binary being on PATH.
with tarfile.open(output_tar_path, "r:gz") as archive:
    archive.extractall(tmp_dir)

# The checkpoint folder sits under data/ for local-mode jobs and at the archive
# root for hosted jobs.
if local_mode:
    checkpoint_dir = "{}/data/checkpoint".format(tmp_dir)
else:
    checkpoint_dir = "{}/checkpoint".format(tmp_dir)

print("Checkpoint directory {}".format(checkpoint_dir))

In [None]:
# Decide where the evaluation job should read the checkpoint from: the local
# folder in local mode, otherwise a copy uploaded to S3.
if local_mode:
    checkpoint_path = "file://{}".format(checkpoint_dir)
    print("Local checkpoint file path: {}".format(checkpoint_path))
else:
    checkpoint_path = "s3://{}/{}/checkpoint/".format(s3_bucket, job_name)
    if not os.listdir(checkpoint_dir):
        raise FileNotFoundError("Checkpoint files not found under the path")
    # Argument list (no shell) so paths are passed verbatim, and check=True so a
    # failed upload raises CalledProcessError instead of going unnoticed.
    subprocess.run(
        ["aws", "s3", "cp", "--recursive", checkpoint_dir, checkpoint_path],
        check=True,
    )
    print("S3 checkpoint file path: {}".format(checkpoint_path))

In [None]:
# Run the evaluation step: a second RL job that receives the trained checkpoint
# through the "checkpoint" input channel.
#
# Toolkit version and framework match the training estimator (Coach 0.11.1 on
# TensorFlow); the checkpoint passed in below was produced by that job, so the
# evaluation container should run the same framework.
# NOTE(review): the preset name below refers to a cartpole preset — confirm it
# is the preset evaluate-coach.py should load for this project.
estimator_eval = RLEstimator(
    role=role,
    source_dir="src/",
    dependencies=["common/sagemaker_rl"],
    toolkit=RLToolkit.COACH,
    toolkit_version="0.11.1",
    framework=RLFramework.TENSORFLOW,
    entry_point="evaluate-coach.py",
    instance_count=1,
    instance_type=instance_type,
    base_job_name=job_name_prefix + "-evaluation",
    hyperparameters={"RLCOACH_PRESET": "preset-cartpole-clippedppo", "evaluate_steps": 2000},
)

estimator_eval.fit({"checkpoint": checkpoint_path})

In [None]:
# Deploy for real time prediction
job_name = estimator.latest_training_job.name

# Create an endpoint from the training job's model artifacts.
endpoint_name = sagemaker_session.endpoint_from_job(
    job_name=job_name,
    initial_instance_count=1,
    instance_type='ml.m5.2xlarge',
    role=role
)

# SageMaker SDK v2 (which this notebook uses — see the `instance_count` /
# `instance_type` estimator arguments) renamed RealTimePredictor to Predictor
# and dropped its `content_type` argument; the request content type now comes
# from the serializer. IdentitySerializer sends the payload unmodified, so
# requests are still passed through as-is with an application/json content type.
predictor = sagemaker.predictor.Predictor(
    endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=sagemaker.serializers.IdentitySerializer(content_type="application/json"),
)

In [None]:
# Delete the endpoint created by the deployment cell once it is no longer needed.
sagemaker_session.delete_endpoint(endpoint_name)