# Amazon SageMaker Notebook for ProcGen Starter Kit with homogeneous scaling of multiple GPU instances 

In [None]:
import os
import time
import yaml

import sagemaker
from sagemaker.rl import RLEstimator, RLToolkit, RLFramework
import boto3

from IPython.display import HTML, Markdown
from source.common.docker_utils import build_and_push_docker_image
from source.common.markdown_helper import generate_help_for_s3_endpoint_permissions, create_s3_endpoint_manually

In [None]:
# Load the notebook settings from config/sagemaker_config.yaml.
config_path = os.path.join("config", "sagemaker_config.yaml")
with open(config_path) as config_file:
    sagemaker_config = yaml.safe_load(config_file)

## Initialize Amazon SageMaker

In [None]:
# Create a SageMaker session and read the target bucket name from the config.
sm_session = sagemaker.session.Session()
s3_bucket = sagemaker_config["S3_BUCKET"]

# Root S3 URI (with trailing slash) under which training output is written.
s3_output_path = f's3://{s3_bucket}/'
print(f"S3 bucket path: {s3_output_path}")

In [None]:
# IAM role the training jobs will run under.
role = sagemaker.get_execution_role()
print(role)

# Prefix used when building the base name of On-Demand training jobs.
job_name_prefix = 'sm-ray-gpu-dist-procgen'

#### Note that `local_mode = True` does not work with heterogeneous scaling

In [None]:
# Training instance type, read from the "GPU_TRAINING_INSTANCE" entry of the loaded config.
instance_type = sagemaker_config["GPU_TRAINING_INSTANCE"]

# Configure the framework you want to use

Set `framework` to `"tf"` or `"torch"` for TensorFlow or PyTorch respectively.

You will also have to edit your entry point i.e., `train-sagemaker-distributed-gpu.py` with the configuration parameter `"use_pytorch"` to match the framework that you have selected.

In [None]:
# Framework selector ("tf" or "torch"); passed to the Docker build as the FRAMEWORK build arg.
framework = "tf"

# Train your homogeneous scaling job here

### Edit the training code

The training code is written in the file `train-sagemaker-distributed-gpu.py`, which is located in the `source/` directory.

*Note that ray will automatically set `"ray_num_cpus"` and `"ray_num_gpus"` in `_get_ray_config`*

In [None]:
# IPython shell escape: print the training script with syntax highlighting.
!pygmentize source/train-sagemaker-distributed-gpu.py

### Train the RL model using the Python SDK Script mode

When using SageMaker for distributed training, you can select a GPU or CPU instance. The RLEstimator is used for training RL jobs.

1. Specify the source directory where the environment, presets and training code are uploaded.
2. Specify the entry point as the training code.
3. Specify the image (CPU or GPU) to be used for the training environment.
4. Define the training parameters such as the instance count, job name, and S3 path for output.
5. Define the metrics definitions that you are interested in capturing in your logs. These can also be visualized in CloudWatch and SageMaker Notebooks.

#### GPU docker image

In [None]:
# Build the GPU training image and push it to ECR.
gpu_repository_short_name = "sagemaker-procgen-ray-{}".format("gpu")

# Build arguments passed to the Docker build.
docker_build_args = dict(
    CPU_OR_GPU="gpu",
    AWS_REGION=boto3.Session().region_name,
    FRAMEWORK=framework,
)

image_name = build_and_push_docker_image(
    gpu_repository_short_name,
    build_args=docker_build_args,
)
print(f"Using GPU ECR image {image_name}")

In [None]:
# Regex fragment capturing a signed decimal number with an optional exponent,
# e.g. "12", "-3.5", ".25" or "1.2e-3".
float_regex = '([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?)'

# Metrics scraped from the training logs. Each metric is expected to appear in
# the log as "<name>: <number>". Names are listed once each; the original list
# defined 'training_iteration' twice.
metric_names = [
    # Training progress counters
    'training_iteration',
    'episodes_total',
    'num_steps_trained',
    'timesteps_total',
    # Episode reward statistics
    'episode_reward_max',
    'episode_reward_mean',
    'episode_reward_min',
]

# One {'Name', 'Regex'} entry per metric, in the format expected by the
# estimator's `metric_definitions` argument.
metric_definitions = [
    {'Name': name, 'Regex': '{}: {}'.format(name, float_regex)}
    for name in metric_names
]

### Ray homogeneous scaling - Specify `train_instance_count` > 1

Homogeneous scaling allows us to use multiple instances of the same type.

Spot instances are unused EC2 instances that can be used at up to a 90% discount compared to On-Demand prices (more information about spot instances can be found [here](https://aws.amazon.com/ec2/spot/?cards.sort-by=item.additionalFields.startDateTime&cards.sort-order=asc) and [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html)).

To use spot instances, set `train_use_spot_instances = True`. To use On-Demand instances, set `train_use_spot_instances = False`.

In [None]:
# Number of identical instances per training job; > 1 gives homogeneous scaling.
train_instance_count = 2
# True -> managed spot training; False -> On-Demand instances.
train_use_spot_instances = False

# Select which procgen environments to run in `envs_to_run`.
# Full list of environments, for reference:
#   "coinrun", "bigfish", "bossfight", "caveflyer",
#   "chaser", "climber", "dodgeball",
#   "fruitbot", "heist", "jumper", "leaper", "maze",
#   "miner", "ninja", "plunder", "starpilot"
envs_to_run = ["coinrun"]

# Launch one asynchronous training job per selected environment.
for env in envs_to_run:
    # Hyperparameter shared by both the spot and the On-Demand configuration.
    hyperparameters = {
        "rl.training.config.env_config.env_name": env,
    }

    if train_use_spot_instances:
        print('*** Using spot instances ... ')
        # Spot jobs get an explicit, timestamped name so that the checkpoint
        # location can be derived from it before the job is created.
        job_name = 'sm-ray-dist-procgen-spot-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()) + "-" + env
        checkpoint_s3_uri = 's3://{}/sagemaker-procgen/checkpoints/{}'.format(s3_bucket, job_name)
        # SageMaker Python SDK v2 argument names (v1 used the `train_` prefix),
        # consistent with `image_uri` / `instance_type` / `instance_count` below.
        training_params = {
            "use_spot_instances": True,
            "max_run": 3600 * 5,
            "max_wait": 7200 * 5,
            "checkpoint_s3_uri": checkpoint_s3_uri,
        }
        # Necessary for syncing between spot instances
        hyperparameters["rl.training.upload_dir"] = checkpoint_s3_uri
    else:
        # No explicit name: fit() derives one from `base_job_name`.
        job_name = None
        training_params = {"base_job_name": job_name_prefix + "-" + env}
        # Uncomment to view tensorboard (s3_output_path already ends with "/"):
        # hyperparameters["rl.training.upload_dir"] = s3_output_path + "tensorboard_sync"

    # Defining the RLEstimator
    estimator = RLEstimator(
        entry_point="train-sagemaker-distributed-gpu.py",
        source_dir='source',
        dependencies=["source/utils", "source/common/", "neurips2020-procgen-starter-kit/"],
        image_uri=image_name,
        role=role,
        instance_type=instance_type,
        instance_count=train_instance_count,
        output_path=s3_output_path,
        metric_definitions=metric_definitions,
        hyperparameters=hyperparameters,
        **training_params
    )

    # wait=False returns immediately so jobs for several environments can run
    # concurrently. job_name=None is fit()'s default (auto-generated name).
    estimator.fit(job_name=job_name, wait=False)

    print(' ')
    print(estimator.latest_training_job.job_name)
    print('type=', instance_type, 'count=', train_instance_count )
    print(' ')