# Introduction

This notebook outlines the steps involved in building and deploying a Battlesnake model using Ray RLlib and TensorFlow on Amazon SageMaker.

Library versions currently in use:  TensorFlow 2.1, Ray RLlib 0.8.2

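The model is first trained with a multi-agent reinforcement learning algorithm (the training cells below use the `train-mabs-dqn*.py` entry points, i.e. DQN rather than PPO), and then deployed to a managed _TensorFlow Serving_ SageMaker endpoint that can be used for inference.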

In [1]:
import sagemaker
from sagemaker.rl import RLEstimator, RLToolkit
import boto3

## Initialise SageMaker
We need to define several parameters before running the training job.

In [2]:
# Open a SageMaker session and look up the default bucket associated with it
sm_session = sagemaker.session.Session()
s3_bucket = sm_session.default_bucket()

# Training artefacts are written under the root of that bucket
s3_output_path = 's3://{bucket}/'.format(bucket=s3_bucket)
print("S3 bucket path:", s3_output_path)

S3 bucket path: s3://sagemaker-us-west-2-752200179490/


In [3]:
# Earlier job-name prefix, kept for reference only; each training cell below
# assigns its own `job_name_prefix` before creating its estimator.
# job_name_prefix = 'Battlesnake-job-rllib-hete'

# Execution role of this notebook; passed as `role=` to every estimator and
# to the serving model further down.
role = sagemaker.get_execution_role()
print(role)

arn:aws:iam::752200179490:role/service-role/AmazonSageMaker-ExecutionRole-20200224T083055


In [4]:
# Change local_mode to True if you want to do local training within this Notebook instance
# Otherwise, we'll spin-up a SageMaker training instance to handle the training

local_mode = False

if local_mode:
    instance_type = 'local'
else:
    instance_type = "SAGEMAKER_TRAINING_INSTANCE_TYPE"
    
# If training locally, do some Docker housekeeping..
if local_mode:
    !/bin/bash ./common/setup.sh

# Train your model here

In [5]:
# The container image URI is parameterised by the session's region and the device flavour
region = sm_session.boto_region_name
device = "cpu"
cpu_image_name = (
    '462105765813.dkr.ecr.{region}.amazonaws.com/'
    'sagemaker-rl-ray-container:ray-0.8.2-tf-{device}-py36'
).format(region=region, device=device)

### Single instance, multiple cores

In [None]:
%%time

# Configure and launch the single-instance training job.
# Tune the hyperparameters and train_instance_count as needed.

# Numeric pattern shared by all extra metrics: optional sign, digits with an
# optional decimal point, and an optional exponent.
float_pattern = '([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?)'
extra_metric_names = ['episode_len_mean', 'episodes_total', 'num_steps_trained', 'training_iteration']

# Default Ray metrics plus one "<name>: <number>" regex per extra metric
metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY) + [
    {'Name': metric_name, 'Regex': '{}: {}'.format(metric_name, float_pattern)}
    for metric_name in extra_metric_names
]

job_name_prefix = 'anna-1m5-singlenode'

# Additional hyperparameters are read by the entry-point script;
# see ray_launcher.py for the rl.training.* hyperparameters.
single_node_hyperparameters = {
    "num_iters": 5000,   # number of training iterations
    "num_agents": 5,     # number of snakes in the gym
}

estimator = RLEstimator(
    entry_point="train-mabs-dqn.py",
    source_dir='training/training_src',
    dependencies=["training/common/sagemaker_rl", "../BattlesnakeGym/"],
    image_name=cpu_image_name,
    role=role,
    train_instance_type=instance_type,
    train_instance_count=1,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    hyperparameters=single_node_hyperparameters,
)

# Start the job without blocking the notebook
estimator.fit(wait=False)

job_name = estimator.latest_training_job.job_name
print("Training job: {}".format(job_name))

### Homogeneous scaling

In [48]:
# Homogeneous scaling: several identical instances form one Ray cluster
job_name_prefix = 'anna-homogenous-3m5-async-largebatch'
train_instance_count = 3

homogeneous_hyperparameters = {
    # 3 m5.xl with 4 cores each; one core is left for the ray scheduler.
    # Remember to adapt this when the instance type changes.
    "rl.training.config.num_workers": 4 * train_instance_count - 1,
    "rl.training.config.num_gpus": 0,
    "num_iters": 5000,
    "num_agents": 5,
}

estimator = RLEstimator(
    entry_point="train-mabs-dqn-async.py",
    source_dir='rllib_src',
    dependencies=["rllib_common/sagemaker_rl", "battlesnake_gym/"],
    image_name=cpu_image_name,  # use gpu_image_name when GPU instances are available
    role=role,
    train_instance_type=instance_type,
    train_instance_count=train_instance_count,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    hyperparameters=homogeneous_hyperparameters,
)

# Launch asynchronously and report the generated job name
estimator.fit(wait=False)
job_name = estimator.latest_training_job.job_name
print("Primary Training job: {}".format(job_name))

Primary Training job: anna-homogenous-3m5-async-largebatch-2020-04-24-02-37-24-168


### Heterogeneous scaling

Set up the VPC. This allows the two SageMaker jobs to communicate with each other over the network.

In [40]:
# Discover the default VPC and its default security group and subnets.
# They are passed to the heterogeneous estimators below so both jobs run in VPC mode.
ec2 = boto3.client('ec2')

# Fail with an explicit message (instead of an IndexError) when there is no default VPC
default_vpc_ids = [vpc['VpcId'] for vpc in ec2.describe_vpcs()['Vpcs'] if vpc["IsDefault"]]
if not default_vpc_ids:
    raise RuntimeError("No default VPC was found in this region; create one or set "
                       "default_vpc, default_security_groups and default_subnets manually.")
default_vpc = default_vpc_ids[0]

# Security group named "default" that belongs to the default VPC
default_security_groups = [group["GroupId"] for group in ec2.describe_security_groups()['SecurityGroups']
                           if group["GroupName"] == "default" and group["VpcId"] == default_vpc]

# Subnets of the default VPC that are the default subnet of their availability zone
default_subnets = [subnet["SubnetId"] for subnet in ec2.describe_subnets()["Subnets"]
                   if subnet["VpcId"] == default_vpc and subnet['DefaultForAz']]

print("Using default VPC:", default_vpc)
print("Using default security group:", default_security_groups)
print("Using default subnets:", default_subnets)

Using default VPC: vpc-f427bc8c
Using default security group: ['sg-8c1dddd6']
Using default subnets: ['subnet-dd9d5b80', 'subnet-68342523', 'subnet-6c0cf814', 'subnet-405d236b']


A SageMaker job running in VPC mode cannot access S3 resources by default, so we need to create a VPC S3 endpoint to allow S3 access from the SageMaker container.

In [41]:
from IPython.display import HTML, Markdown
# NOTE: the `markdown_helper` module (generate_help_for_s3_endpoint_permissions,
# create_s3_endpoint_manually) is not imported in this notebook, so failures are
# reported with plain messages instead of calling those (undefined) helpers.

aws_region = boto3.Session().region_name

# Collect the route tables of the default VPC; the S3 gateway endpoint is attached to them
try:
    route_tables = [route_table["RouteTableId"] for route_table in ec2.describe_route_tables()['RouteTables']
                    if route_table['VpcId'] == default_vpc]
except Exception as e:
    if "UnauthorizedOperation" in str(e):
        print("The role {} is not authorized to describe route tables "
              "(ec2:DescribeRouteTables); grant the permission and re-run this cell.".format(role))
    else:
        print("Could not list the route tables of VPC {} in region {}; "
              "create the S3 VPC endpoint manually.".format(default_vpc, aws_region))
    # Re-raise with the original traceback so the underlying error stays visible
    raise

print("Trying to attach S3 endpoints to the following route tables:", route_tables)

assert len(route_tables) >= 1, "No route tables were found. Please create the VPC S3 endpoint "\
                              "manually for VPC {} in region {}.".format(default_vpc, aws_region)

# Create the gateway endpoint; an already existing endpoint is not an error
try:
    ec2.create_vpc_endpoint(DryRun=False,
                           VpcEndpointType="Gateway",
                           VpcId=default_vpc,
                           ServiceName="com.amazonaws.{}.s3".format(aws_region),
                           RouteTableIds=route_tables)
    print("S3 endpoint created successfully!")
except Exception as e:
    if "RouteAlreadyExists" not in str(e):
        # Authorization failures and any other error are propagated unchanged
        raise
    print("S3 endpoint already exists.")

Trying to attach S3 endpoints to the following route tables: ['rtb-60c6821b']
S3 endpoint already exists.


In [8]:
# GPU flavour of the same Ray/TensorFlow container, resolved for the session's region
gpu_image_name = (
    '462105765813.dkr.ecr.{region}.amazonaws.com/'
    'sagemaker-rl-ray-container:ray-0.8.2-tf-gpu-py36'
).format(region=region)

In [42]:
# Sizing of the heterogeneous cluster: one GPU primary plus CPU-only secondaries.
primary_cluster_instance_type = "ml.p2.xlarge"  # alternative tried earlier: "ml.m5.4xlarge"
primary_cluster_instance_count = 1

secondary_cluster_instance_type = "ml.m5.xlarge"
secondary_cluster_instance_count = 2

# CPU cores per instance. Update these whenever the instance types above change
# (the earlier "ml.m5.4xlarge" primary variant used 16 here).
primary_cpus_per_instance = 4
secondary_cpus_per_instance = 4

# Workers available across both clusters; leave one core for the ray scheduler
total_cpus = (primary_cpus_per_instance * primary_cluster_instance_count
              + secondary_cpus_per_instance * secondary_cluster_instance_count
              - 1)

total_gpus = 1


In [49]:
# Prefix shared by both jobs; it encodes the cluster shape
s3_prefix = "dist-ray-{}GPU-{}CPUs-{}-dqn-async-largebatch".format(
    total_gpus, total_cpus, primary_cluster_instance_type
)
s3_prefix

'dist-ray-1GPU-11CPUs-ml.p2.xlarge-dqn-async-largebatch'

In [50]:
sage_session = sagemaker.session.Session()
s3_output_path = 's3://{}/'.format(s3_bucket) # SDK appends the job name and output folder

# We explicitly need to specify these params so that the two jobs can synchronize using the metadata stored here
s3_bucket = sage_session.default_bucket()

# Make sure that the prefix is empty
!aws s3 rm --recursive s3://{s3_bucket}/{s3_prefix}

In [51]:
# Sanity check: display the instance type configured for the secondary cluster
secondary_cluster_instance_type

'ml.m5.xlarge'

In [52]:
    
# Primary cluster of the heterogeneous setup (GPU image, runs in VPC mode)
job_name_prefix = 'anna-ray-heter-dist-1p2x-2m5-dqn-async'
primary_cluster_instance_count = 1

primary_hyperparameters = {
    # The keys below are what the two jobs use to find and sync with each other
    "s3_prefix": s3_prefix,
    "s3_bucket": s3_bucket,
    "aws_region": boto3.Session().region_name,  # needed for the S3 connection
    "rl_cluster_type": "primary",
    "rl_num_instances_secondary": secondary_cluster_instance_count,
    # Resources of the combined cluster
    "rl.training.config.num_workers": total_cpus,
    "rl.training.config.num_gpus": total_gpus,
    "num_iters": 5000,
    "num_agents": 5,
}

primary_cluster_estimator = RLEstimator(
    entry_point="train-mabs-dqn-async.py",
    source_dir='rllib_src',
    dependencies=["rllib_common/sagemaker_rl", "battlesnake_gym/"],
    image_name=gpu_image_name,
    role=role,
    train_instance_type=primary_cluster_instance_type,
    train_instance_count=primary_cluster_instance_count,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    train_max_run=5 * 3600,  # maximum runtime in seconds
    hyperparameters=primary_hyperparameters,
    subnets=default_subnets,                     # required for VPC mode
    security_group_ids=default_security_groups,  # required for VPC mode
)

# Launch without blocking so the secondary job can be started right after
primary_cluster_estimator.fit(wait=False)
primary_job_name = primary_cluster_estimator.latest_training_job.job_name
print("Primary Training job: {}".format(primary_job_name))

Primary Training job: anna-ray-heter-dist-1p2x-2m5-dqn-async-2020-04-24-02-51-57-252


In [53]:
# Secondary cluster of the heterogeneous setup (CPU image, runs in VPC mode).
# It shares s3_prefix / s3_bucket with the primary job so the two can synchronize.
secondary_cluster_estimator = RLEstimator(entry_point="train-mabs-dqn-async.py",
                            source_dir='rllib_src',
                            dependencies=["rllib_common/sagemaker_rl", "battlesnake_gym/"],
                            image_name=cpu_image_name,
                            role=role,
                            train_instance_type=secondary_cluster_instance_type,
                            train_instance_count=secondary_cluster_instance_count,
                            output_path=s3_output_path,
                            base_job_name=job_name_prefix,
                            metric_definitions=metric_definitions,
                            train_max_run=int(3600) * 5 * 2, # Maximum runtime in seconds (twice the primary's limit)
                            hyperparameters={
                                "s3_prefix": s3_prefix, # Important for syncing
                                "s3_bucket": s3_bucket, # Important for syncing
                                "aws_region": boto3.Session().region_name, # Important for S3 connection
                                "rl_cluster_type": "secondary", # Important for syncing
                                "rl.training.config.num_workers": total_cpus,
                                "rl.training.config.num_gpus": total_gpus,
                                # Each key appears once; a repeated "num_iters" entry was removed
                                "num_iters": 5000,
                                "num_agents": 5,
                            },
                            subnets=default_subnets, # Required for VPC mode
                            security_group_ids=default_security_groups # Required for VPC mode
                        )

# Launch without blocking and report the generated job name
secondary_cluster_estimator.fit(wait=False)
secondary_job_name = secondary_cluster_estimator.latest_training_job.job_name
print("Secondary Training job: %s" % secondary_job_name)

Secondary Training job: anna-ray-heter-dist-1p2x-2m5-dqn-async-2020-04-24-02-51-57-832


In [None]:
# Where is the model stored in S3?
# NOTE: `estimator` is whichever RLEstimator was assigned last (the single-instance
# or the homogeneous cell); the heterogeneous cells use primary_cluster_estimator /
# secondary_cluster_estimator instead.
# NOTE(review): jobs are launched with fit(wait=False); presumably the training job
# must have finished before this location holds a model — confirm.
estimator.model_data

# Create an endpoint to host the policy
First, we delete the previous endpoint, endpoint configuration and model, if they exist.

In [None]:
# Remove the endpoint, endpoint configuration and model left over from a previous run.
# On a first run none of them exist yet, so "Could not find ..." errors are tolerated;
# every other error is re-raised unchanged.
sm_client = boto3.client(service_name='sagemaker')

cleanup_calls = [
    (sm_client.delete_endpoint, {"EndpointName": 'battlesnake-endpoint'}),
    (sm_client.delete_endpoint_config, {"EndpointConfigName": 'battlesnake-endpoint'}),
    (sm_client.delete_model, {"ModelName": "battlesnake-rllib"}),
]

for delete_call, call_kwargs in cleanup_calls:
    try:
        delete_call(**call_kwargs)
    except sm_client.exceptions.ClientError as err:
        if "Could not find" not in str(err):
            raise
        print("Nothing to delete for {}: {}".format(call_kwargs, err))

In [None]:
# Copy the endpoint to a central location
model_data = "s3://{}/battlesnake-aws/pretrainedmodels/model.tar.gz".format(s3_bucket)
!aws s3 cp {estimator.model_data} {model_data}

from sagemaker.tensorflow.serving import Model

model = Model(model_data=model_data,
              role=role,
              entry_point="inference.py",
              source_dir='rllib_inference/src',
              framework_version='2.1.0',
              name="battlesnake-rllib",
             )

if local_mode:
    inf_instance_type = 'local'
else:
    inf_instance_type = "SAGEMAKER_INFERENCE_INSTANCE_TYPE"

# Deploy an inference endpoint
predictor = model.deploy(initial_instance_count=1, instance_type=inf_instance_type,#instance_type="local", #
                         endpoint_name='battlesnake-endpoint')

# Test the endpoint

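This example uses a single observation for a 5-agent environment.
The last axis is 12 because the current MultiAgentEnv concatenates 2 frames:
5 agent maps + 1 food map = 6 maps in total, and 6 maps * 2 frames = 12.
Note that the sample payload in the next cell is built with a last axis of 6; adjust it if your model expects 12 channels.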

In [None]:
import numpy as np
from time import time

# All-zero observation, converted to nested lists so it can be sent in the request
data1 = np.zeros((1, 21, 21, 6), dtype=np.float32).tolist()

health_dict = {0: 50, 1: 50}

# Minimal game-state payload sent alongside the observation
json = dict(
    turn=4,
    board=dict(height=15, width=15, food=[], snakes=[]),
    you=dict(
        id="snake-id-string",
        name="Sneky Snek",
        health=90,
        body=[dict(x=1, y=3)],
    ),
)

# Time a single round trip to the endpoint
before = time()
action = predictor.predict({
    "state": data1,
    "prev_action": -1,
    "prev_reward": -1,
    "seq_lens": -1,
    "all_health": health_dict,
    "json": json,
})
elapsed = time() - before

action_to_take = action["outputs"]["heuristisc_action"]
print("Action to take", action_to_take)
print("Inference took {:.2f} ms".format(elapsed * 1000))