In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebook_template.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>                                                                                               
</table>

## Overview

In this notebook, we demonstrate creating an ephemeral Ray cluster to run a workload using the Ray [RLlib](https://docs.ray.io/en/latest/rllib/index.html) reinforcement learning library on top of Vertex AI Training.  

### Creating an ephemeral Ray cluster using Vertex AI Training

The following notebook demonstrates a basic proof-of-concept example for creating a Ray cluster on demand for reinforcement learning using the framework-agnostic distributed training capabilities of Vertex AI.

At a high level, we create a single custom container for use by both the 'head' node and the 'worker' nodes in the Ray cluster. At the start of training, each node checks for its location in the resource pool by inspecting the Vertex AI 'task' attribute of the [`CLUSTER_SPEC`](https://cloud.google.com/vertex-ai/docs/training/distributed-training#cluster-variables) environment variable - the first node in `workerpool0` becomes the 'head' node and nodes in `workerpool1` become workers which contribute compute resources to the cluster.
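
As a minimal sketch of the role check described above, the snippet below parses a hand-written `CLUSTER_SPEC` value. The host names and the `resolve_role` helper are hypothetical and exist only for illustration; the `cluster` and `task` keys and the `workerpool0`/`workerpool1` names follow the training script later in this notebook.

```python
import json

# Hypothetical CLUSTER_SPEC payload. On Vertex AI the service sets the real
# value in the environment of every replica.
SAMPLE_CLUSTER_SPEC = json.dumps({
    "cluster": {
        "workerpool0": ["head-host:2222"],
        "workerpool1": ["worker-host-0:2222", "worker-host-1:2222"],
    },
    "task": {"type": "workerpool1", "index": 0},
})


def resolve_role(cluster_spec_json, port=2222):
    """Return (is_head, head_address) for the node described by the spec."""
    spec = json.loads(cluster_spec_json)
    # The first entry of workerpool0 is the head node; drop its port suffix.
    head_host = spec["cluster"]["workerpool0"][0].split(":")[0]
    # The 'task' entry tells this replica which pool it belongs to.
    is_head = spec["task"]["type"] == "workerpool0"
    return is_head, f"{head_host}:{port}"


is_head, head_address = resolve_role(SAMPLE_CLUSTER_SPEC)
print(is_head, head_address)
```

Every replica receives the same `cluster` section but its own `task` section, which is why a single container image can serve both roles.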

Ray doesn't have a simple way of shutting down the whole cluster and cleanly releasing all the resources. Upon completion of the training script, the head node therefore creates a `stop_cluster.txt` file in the Cloud Storage bucket and then waits for one minute before issuing `ray stop`, which stops the Ray processes on the head node. The wait gives the workers, which poll for the existence of `stop_cluster.txt`, time to shut themselves down cleanly. Some connection errors may be reported in Cloud Logging as the worker nodes shut down and can no longer connect to the head node.
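
The marker-file handshake can be sketched locally as follows. This is an illustration under stated assumptions, not the notebook's training script: a temporary directory stands in for the bucket, the helper names `signal_stop` and `wait_for_stop` are hypothetical, and the `timeout_seconds` parameter is an addition so that the demonstration terminates.

```python
import os
import tempfile
import time


def signal_stop(stop_path):
    """Head node side: create an empty marker file."""
    open(stop_path, "w").close()


def wait_for_stop(stop_path, poll_seconds=30.0, timeout_seconds=None):
    """Worker side: poll until the marker exists. Return True if it was seen."""
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while not os.path.exists(stop_path):
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)
    return True


# Local demonstration: a temporary directory stands in for the shared bucket.
with tempfile.TemporaryDirectory() as shared_dir:
    stop_path = os.path.join(shared_dir, "stop_cluster.txt")
    seen_before = wait_for_stop(stop_path, poll_seconds=0.01, timeout_seconds=0.05)
    signal_stop(stop_path)
    seen_after = wait_for_stop(stop_path, poll_seconds=0.01, timeout_seconds=0.05)
    print(seen_before, seen_after)
```

The head node's wait should be longer than the workers' polling interval; otherwise a worker may still be sleeping when the head node stops.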


### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI Training
* Vertex AI Experiments (Managed TensorBoard)
* Artifact Registry
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [Artifact Registry pricing](https://cloud.google.com/artifact-registry/pricing), [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

**If you are using Colab or Google Cloud Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip3 install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

### Install additional packages

Install additional package dependencies not installed in your notebook environment, such as the Vertex AI SDK.

In [1]:
import os

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# Google Cloud Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_GOOGLE_CLOUD_NOTEBOOK:
    USER_FLAG = "--user"

In [None]:
!pip3 install {USER_FLAG} --upgrade google-cloud-aiplatform

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [1]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  gcp-ml-sandbox


Otherwise, set your project ID here.

In [2]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

### Authenticate your Google Cloud account

**If you are using Google Cloud Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via OAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI"
into the filter box, and select
   **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click **Create**. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [3]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append it onto the name of resources you create in this tutorial.

In [5]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
TIMESTAMP

'20221206201057'

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**


Ray has [built-in support for TensorBoard](https://docs.ray.io/en/latest/tune/api_docs/logging.html). We will create a Cloud Storage bucket to store the logging output of our reinforcement learning job so that Vertex AI managed TensorBoard can access it.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. We suggest that you [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions).
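
The bucket name can be sanity-checked before it is passed to `gsutil mb`. The sketch below covers only a subset of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, and underscores; a letter or digit at both ends; no `goog` prefix) and is deliberately stricter than the service in that it rejects names containing dots. The helper name `looks_like_valid_bucket_uri` is hypothetical.

```python
import re

# Assumption: a simplified pattern that rejects dots, which Cloud Storage
# permits but treats specially.
_BUCKET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")


def looks_like_valid_bucket_uri(bucket_uri):
    """Return True if the URI is gs://<name> and <name> passes the subset check."""
    prefix = "gs://"
    if not bucket_uri.startswith(prefix):
        return False
    name = bucket_uri[len(prefix):]
    return bool(_BUCKET_NAME_RE.match(name)) and not name.startswith("goog")


for candidate in ["gs://gcp-ml-sandbox-aip-20221206201057", "gs://[your-bucket-name]"]:
    print(candidate, looks_like_valid_bucket_uri(candidate))
```

A passing check does not guarantee that the name is available, since uniqueness is only known once the bucket is created.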

In [6]:
BUCKET_URI = "gs://[your-bucket-name]"  # @param {type:"string"}
REGION = "[your-region]"  # @param {type:"string"}

In [7]:
if BUCKET_URI == "" or BUCKET_URI is None or BUCKET_URI == "gs://[your-bucket-name]":
    BUCKET_URI = "gs://" + PROJECT_ID + "-aip-" + TIMESTAMP

if REGION == "[your-region]":
    REGION = "us-central1"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [8]:
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Creating gs://gcp-ml-sandbox-aip-20221206201057/...


Finally, validate access to your Cloud Storage bucket by examining its contents:

In [9]:
! gsutil ls -al $BUCKET_URI

## Python and Env Variables for building containers

Next, create a series of variables that will help us build our custom container and specify the resources we need to execute our custom job.

In [10]:
import os

REPO_NAME='ray-ml'

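# Local working directories. Adjust to reflect your package layout.
# NODE_DIR holds the trainer package (task.py and __init__.py).
NODE_DIR="/home/jupyter/ray/trainer"
SETUP_DIR="/home/jupyter/ray"
# TRAINER_DIR is appended to $HOME when the working directory is created.
TRAINER_DIR="/ray/trainer"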

NODE_IMAGE_NAME='ray_cluster_node'
NODE_IMAGE_TAG='latest'
NODE_IMAGE_URI=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{NODE_IMAGE_NAME}:{NODE_IMAGE_TAG}"

API_ENDPOINT=f"{REGION}-aiplatform.googleapis.com"

    
os.environ['PROJECT_ID']=PROJECT_ID
os.environ['NODE_DIR']=NODE_DIR
os.environ['NODE_IMAGE_URI']=NODE_IMAGE_URI

## Setup Working directories

Create a working directory for writing our training job and Dockerfile definition.

In [11]:
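trainer_path = os.getenv("HOME") + TRAINER_DIR
try:
    os.makedirs(trainer_path, exist_ok=True)
    print(f"Directory {trainer_path} created successfully")
except OSError as error:
    # With exist_ok=True an existing directory is not an error, so this
    # branch only runs for real failures such as missing permissions.
    print(f"Could not create directory {trainer_path}: {error}")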

Directory /home/jupyter/ray/trainer created successfully


In [12]:
%cd $NODE_DIR

/home/jupyter/ray/trainer


In [13]:
%%writefile {SETUP_DIR}/setup.py
from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = [
    "ray[rllib]",
    "gym",
    #"torch",
    "tblib"
]

VERSION = '0.1'

setup(
    name='trainer',
    version=VERSION,
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='My training application.'
)

Overwriting /home/jupyter/ray/setup.py


In [14]:
with open('__init__.py', 'w') as fp:
    pass

## Training script

The following training script implements an example of the [IMPALA RL algorithm](https://docs.ray.io/en/latest/rllib/rllib-algorithms.html#impala) from the Ray RLlib library.  

The environment configuration places the environment's tensors on GPU (`cuda`) devices. The resources configured in the script should match the resources requested for the Vertex AI job that runs it.
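
One way to check that the script and the job agree is to total the resources that the RLlib config requests and compare them with what the cluster offers. The sketch below is an approximation under stated assumptions: it counts only the driver and rollout workers, the helper names `required_resources` and `fits` are hypothetical, and the cluster sizes are made-up values. The config values are copied from the training script in this notebook.

```python
def required_resources(config):
    """Approximate the CPU and GPU totals that an RLlib config asks for."""
    num_workers = config.get("num_workers", 0)
    cpus = (config.get("num_cpus_for_driver", 1)
            + num_workers * config.get("num_cpus_per_worker", 1))
    gpus = (config.get("num_gpus", 0)
            + num_workers * config.get("num_gpus_per_worker", 0))
    return {"CPU": cpus, "GPU": gpus}


def fits(required, available):
    """Return True if every required amount is covered by the cluster."""
    return all(amount <= available.get(name, 0) for name, amount in required.items())


training_config = {
    "num_workers": 1,
    "num_cpus_for_driver": 2,
    "num_cpus_per_worker": 4,
    "num_gpus": 1,
    "num_gpus_per_worker": 0.5,
}
needed = required_resources(training_config)

# Hypothetical cluster totals, for example as reported by ray.cluster_resources().
hypothetical_cluster = {"CPU": 16, "GPU": 2}
print(needed, fits(needed, hypothetical_cluster))
```

With these values the config asks for 6 CPUs and 1.5 GPUs, so a cluster with a single GPU would leave the trial waiting for resources.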

In [15]:
%%writefile {NODE_DIR}/task.py
from collections import Counter
import json
import math
import os
import socket
import time

import gym
import numpy as np
from gym import spaces, logger

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env

torch, nn = try_import_torch()

import random
import string

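# Generate a random 8-character password from letters and digits.
# Note: every node runs this script and draws its own value, so the head and
# worker values differ unless the password is shared some other way.
characters = string.ascii_letters + string.digits
ray_pwd = ''.join(random.choice(characters) for i in range(8))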

# Stop cluster file for cleanly shutting down workers
model_dir = os.getenv("AIP_MODEL_DIR", 'trainer')
print(f"AIP_MODEL_DIR: {model_dir}")

start_file_name = 'cluster_started.txt'
stop_file_name = 'stop_cluster.txt'

#bucket = model_dir.split("/")[2]
bucket = "vertexai-mlops-workshop"
#parent = ("/").join(model_dir.split("/")[3:])
parent = "ray-rllib-custom-job"
# Join with "/" so the marker files land inside the directory created below
stop_name = f"{parent}/{stop_file_name}"
start_name = f"{parent}/{start_file_name}"

stop_file_name = f"/gcs/{bucket}/{stop_name}"
start_file_name = f"/gcs/{bucket}/{start_name}"

print(f"Stop file name: {stop_file_name}")
print(f"Cluster Started file name: {start_file_name}")


#Cluster info
cluster_spec = json.loads(os.environ.get("CLUSTER_SPEC"))
print(f"ClusterSpec:\n{cluster_spec}")

head_node = cluster_spec['cluster']['workerpool0'][0].split(":")[0]
head_node_address = f'{head_node}:2222'

# Grab the task variable to see which node pool we are in.
# If workerpool0, we are the head node; otherwise, we are a worker node that delegates resources to the cluster

head_node = (cluster_spec['task']['type'] == 'workerpool0')

if head_node: 
    print(f"Running as Head node: {head_node}")
    print(f"Head node_address: {head_node_address}")
    os.system(f"ray start --head --port=2222 --redis-password={ray_pwd}")
    os.system(f"mkdir -p /gcs/{bucket}/{parent}")
    os.system(f"touch {start_file_name}")

else: # worker - connect and sleep
    os.system(f"ray start --address={head_node_address} --redis-password={ray_pwd}")

ray.init(address="auto",_redis_password=ray_pwd)


#WORK
class CartPole(gym.Env):
    """
    Description:
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity.
    Source:
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson
    Observation:
        Type: Box(4)
        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -Inf                    Inf
        2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
        3       Pole Angular Velocity     -Inf                    Inf
    Actions:
        Type: Discrete(2)
        Num   Action
        0     Push cart to the left
        1     Push cart to the right
        Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it
    Reward:
        Reward is 1 for every step taken, including the termination step
    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]
    Episode Termination:
        Pole Angle is more than 12 degrees.
        Cart Position is more than 2.4 (center of the cart reaches the edge of
        the display).
        Episode length is greater than 200.
        Solved Requirements:
        Considered solved when the average return is greater than or equal to
        195.0 over 100 consecutive trials.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }

    def __init__(self, config: EnvContext):
        #print(config)
        #print("CUDA Here?: ", torch.cuda.is_available())
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = 'euler'

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4
        self.env_count = config["env_count"]
        self.device=config["device"]
        
        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array([self.x_threshold * 2,
                         np.finfo(np.float32).max,
                         self.theta_threshold_radians * 2,
                         np.finfo(np.float32).max],
                        dtype=np.float32)

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None
        self.done = torch.full([self.env_count], True, dtype=torch.bool, device=self.device)
        self.state = torch.zeros([self.env_count, 4], dtype=torch.float32, device=self.device)

    def seed(self, seed=None):
        return [seed]

    def step(self, action):

        #breakpoint()
        # All env must already have been reset.
        self.done[:] = False
        x, x_dot, theta, theta_dot = self.state[:, 0], self.state[:, 1], self.state[:, 2], self.state[:, 3]
        #breakpoint()
        force = self.force_mag * ((action * 2.) - 1.)
        
        costheta = torch.cos(theta)
        sintheta = torch.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = ((self.gravity * sintheta - costheta * temp) 
                    / (self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == 'euler':
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state[:, 0], self.state[:, 1], self.state[:, 2], self.state[:, 3] = x, x_dot, theta, theta_dot

        self.done = (
            (x < -self.x_threshold)
            | (x > self.x_threshold)
            | (theta < -self.theta_threshold_radians)
            | (theta > self.theta_threshold_radians)
        )
        reward = ~self.done
        
        #self.state = self.reset()
        self.reset()
        #print("In GPU: ", self.state.is_cuda)
        to_return = self.state.to("cpu").numpy()[0]
        return to_return, reward.float().item(), self.done, {}

    def reset(self):
        #breakpoint()
        self.state = torch.where(self.done.unsqueeze(1), (torch.rand(self.env_count, 4, device=self.device) -0.5) / 10., self.state) 
        #self.state = (torch.rand((self.env_count, 4)) -0.5) / 10.
        #print("Before converting to CPU, self.state=", self.state)
        #print(self.state)
        to_return = self.state.to("cpu").numpy()[0]
        #print("After converting to CPU, to_return= ", to_return)
        return to_return

    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width/world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth/2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

            self._pole_geom = pole

        if self.state is None:
            return None

        # Edit the pole polygon vertex
        pole = self._pole_geom
        l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None

if head_node:
    
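    # Use .get() so a cluster without GPUs reports 0 instead of raising KeyError
    resources = ray.cluster_resources()
    print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} GPU resources in total
    '''.format(len(ray.nodes()), resources.get('CPU', 0), resources.get('GPU', 0)))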
    
    #tensorboard_dir = os.environ.get("AIP_TENSORBOARD_LOG_DIR")
    tensorboard_dir = os.getenv("AIP_TENSORBOARD_LOG_DIR", 'tb_dir')
    #tb_bucket = tensorboard_dir.split("/")[2]
    #tb_path = ("/").join(tensorboard_dir.split("/")[3:])
    tb_path = f"{parent}/{tensorboard_dir}"

    #ray_log_path = f"/gcs/{tb_bucket}/{tb_path}"
    ray_log_path = f"/gcs/{bucket}/{tb_path}"

    print(f"Ray Output Path: {ray_log_path}")
    
    if not os.path.exists(ray_log_path):
        os.makedirs(ray_log_path)

    #RL WORK
    register_env("CartPole", lambda config: CartPole(config))
    
    #ENV config
    rollout_fragment_length = 128
    config = dict(
            {
            "num_workers": 1,
            "num_aggregation_workers": 0,
            "num_cpus_for_driver": 2,
            "num_gpus": 1,
            "num_gpus_per_worker": 0.5,
            "num_cpus_per_worker": 4,
            "num_envs_per_worker": 1,
            "num_multi_gpu_tower_stacks": 1,
            "minibatch_buffer_size": 1,
            "num_sgd_iter" : 1,
            "vf_loss_coeff": 0.5,
            "train_batch_size": rollout_fragment_length*4,
            },
            **{
                "env": CartPole,
                "env_config": {
                    "env_count": 1,
                    "device": "cuda",
                },
                "model": {
                    "fcnet_hiddens": [256, 256],
                    "use_lstm": True,
                    "lstm_cell_size": 256,
                    "lstm_use_prev_action": False,
                    "lstm_use_prev_reward": False,
                },
                "framework": "torch",
                # Run with tracing enabled for tfe/tf2?
                "eager_tracing": False,
            })
    
    stop = {
        "training_iteration": 100
    }

    results = tune.run("IMPALA", config=config, stop=stop, verbose=2, local_dir=ray_log_path, reuse_actors=True)


    #Shutdown the cluster
    print ('Shutting down cluster')
    open(stop_file_name, 'w').close()
    time.sleep(60) # Give workers a chance to detect the stop_cluster.txt file and cleanly shutdown
    os.system("ray stop")
    os.remove(stop_file_name)
    os.remove(start_file_name)

    
else:
    print("Worker node - delegating resources to cluster")
    stop_cluster = os.path.exists(stop_file_name)
    
    while not stop_cluster:
        time.sleep(30)
        print('Secondary worker - main thread - heartbeat')
        stop_cluster = os.path.exists(stop_file_name)
    print ('Shutting down worker')
    ray.shutdown()
    os.system("ray stop")

Overwriting /home/jupyter/ray/trainer/task.py


In [17]:
%%writefile {NODE_DIR}/task.py
from collections import Counter
import json
import math
import os
import socket
import time

import gym
import numpy as np
from gym import spaces, logger

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env

torch, nn = try_import_torch()

import random
import string


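# Generate a random 8-character password from letters and digits.
# Note: every node runs this script and draws its own value, so the head and
# worker values differ unless the password is shared some other way.
characters = string.ascii_letters + string.digits
ray_pwd = ''.join(random.choice(characters) for i in range(8))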

#Adding pip path
os.environ["PATH"] += os.pathsep + '/root/.local/bin'


# Stop cluster file for cleanly shutting down workers
# AIP_MODEL_DIR is expected to look like gs://<bucket>/<path>/
model_dir = os.getenv("AIP_MODEL_DIR", 'trainer')
print(f"AIP_MODEL_DIR: {model_dir}")

start_file_name = 'cluster_started.txt'
stop_file_name = 'stop_cluster.txt'

bucket = model_dir.split("/")[2]
#bucket = "vertexai-mlops-workshop"
#parent = ("/").join(model_dir.split("/")[3:])
parent = "ray-rllib-custom-job"
# Join with "/" so the marker files land inside the directory created below
stop_name = f"{parent}/{stop_file_name}"
start_name = f"{parent}/{start_file_name}"

# Address the marker files through the /gcs mount of the bucket
stop_file_name = f"/gcs/{bucket}/{stop_name}"
start_file_name = f"/gcs/{bucket}/{start_name}"

print(f"Stop file name: {stop_file_name}")
print(f"Cluster Started file name: {start_file_name}")

#Cluster info
cluster_spec = json.loads(os.environ.get("CLUSTER_SPEC"))
print(f"ClusterSpec:\n{cluster_spec}")

head_node = cluster_spec['cluster']['workerpool0'][0].split(":")[0]
head_node_address = f'{head_node}:2222'

# Grab the task variable to see which node pool we are in.
# If workerpool0, we are the head node; otherwise, we are a worker node that delegates resources to the cluster

head_node = (cluster_spec['task']['type'] == 'workerpool0')

if head_node: 
    print(f"Running as Head node: {head_node}")
    print(f"Head node_address: {head_node_address}")
    os.system(f"ray start --head --port=2222 --redis-password={ray_pwd} --dashboard-host=10.128.0.9")
    os.system(f"mkdir -p /gcs/{bucket}/{parent}")
    os.system(f"touch {start_file_name}")

else: # worker - connect and sleep
    os.system(f"ray start --address={head_node_address} --redis-password={ray_pwd} --dashboard-host=10.128.0.9")

ray.init(address="auto",_redis_password=ray_pwd)


#WORK
class CartPole(gym.Env):
    """
    Description:
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity.
    Source:
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson
    Observation:
        Type: Box(4)
        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -Inf                    Inf
        2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
        3       Pole Angular Velocity     -Inf                    Inf
    Actions:
        Type: Discrete(2)
        Num   Action
        0     Push cart to the left
        1     Push cart to the right
        Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it
    Reward:
        Reward is 1 for every step taken, including the termination step
    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]
    Episode Termination:
        Pole Angle is more than 12 degrees.
        Cart Position is more than 2.4 (center of the cart reaches the edge of
        the display).
        Episode length is greater than 200.
        Solved Requirements:
        Considered solved when the average return is greater than or equal to
        195.0 over 100 consecutive trials.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }

    def __init__(self, config: EnvContext):
        # Physical constants of the cart-pole system
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = 'euler'

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4
        self.env_count = config["env_count"]
        self.device=config["device"]
        
        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array([self.x_threshold * 2,
                         np.finfo(np.float32).max,
                         self.theta_threshold_radians * 2,
                         np.finfo(np.float32).max],
                        dtype=np.float32)

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        # Every environment starts flagged as done so that the first reset() initializes all of them.
        self.done = torch.full([self.env_count], True, dtype=torch.bool, device=self.device)
        self.state = torch.zeros([self.env_count, 4], dtype=torch.float32, device=self.device)

    def seed(self, seed=None):
        return [seed]

    def step(self, action):
        # All environments must already have been reset.
        self.done[:] = False
        x, x_dot, theta, theta_dot = self.state[:, 0], self.state[:, 1], self.state[:, 2], self.state[:, 3]
        # Map the discrete action {0, 1} to a force of -force_mag or +force_mag.
        force = self.force_mag * ((action * 2.) - 1.)
        
        costheta = torch.cos(theta)
        sintheta = torch.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = ((self.gravity * sintheta - costheta * temp) 
                    / (self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == 'euler':
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state[:, 0], self.state[:, 1], self.state[:, 2], self.state[:, 3] = x, x_dot, theta, theta_dot

        self.done = (
            (x < -self.x_threshold)
            | (x > self.x_threshold)
            | (theta < -self.theta_threshold_radians)
            | (theta > self.theta_threshold_radians)
        )
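        reward = ~self.done

        # Re-initialize any environment that just terminated; reset() only
        # touches rows where self.done is True.
        self.reset()
        to_return = self.state.to("cpu").numpy()[0]
        # .item() assumes env_count == 1, which is what env_config sets below.
        return to_return, reward.float().item(), self.done.item(), {}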

    def reset(self):
        # Draw new start states in [-0.05, 0.05) only for environments flagged as done.
        fresh = (torch.rand(self.env_count, 4, device=self.device) - 0.5) / 10.
        self.state = torch.where(self.done.unsqueeze(1), fresh, self.state)
        # Return the first environment's state as a NumPy observation.
        return self.state.to("cpu").numpy()[0]

    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width/world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth/2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

            self._pole_geom = pole

        if self.state is None:
            return None

        # Edit the pole polygon vertex
        pole = self._pole_geom
        l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        x = self.state[0].tolist()  # render only the first environment
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None

if head_node:
    
    print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} GPU resources in total
    '''.format(len(ray.nodes()), ray.cluster_resources()['CPU'], ray.cluster_resources().get('GPU', 0)))
    
    # Fall back to a local directory name when AIP_TENSORBOARD_LOG_DIR is not set.
    tensorboard_dir = os.getenv("AIP_TENSORBOARD_LOG_DIR", 'tb_dir')
    tb_path = f"{parent}/{tensorboard_dir}"

    ray_log_path = f"/gcs/{bucket}/{tb_path}"

    print(f"Ray Output Path: {ray_log_path}")
    
    os.makedirs(ray_log_path, exist_ok=True)

    #RL WORK
    register_env("CartPole", lambda config: CartPole(config))
    
    #ENV config
    config = dict(
            {
            "num_workers": 3,
            "num_gpus": 0.5,
            "num_gpus_per_worker": 1,
            "num_envs_per_worker": 1,
            "num_multi_gpu_tower_stacks": 1,
            "minibatch_buffer_size": 1,
            "num_sgd_iter" : 1,
            "vf_loss_coeff": 0.5,
            "train_batch_size": 1000,
            },
            **{
                "env": CartPole,
                "disable_env_checking": True,
                "env_config": {
                    "env_count": 1,
                    "device": "cuda",
                },
                "model": {
                    "fcnet_hiddens": [256, 256],
                    "use_lstm": True,
                    "lstm_cell_size": 256,
                    "lstm_use_prev_action": False,
                    "lstm_use_prev_reward": False,
                },
                "framework": "torch",
                # Run with tracing enabled for tfe/tf2?
                "eager_tracing": False,
            })
    
    stop = {
        "training_iteration": 100
    }

    results = tune.run("IMPALA", config=config, stop=stop, verbose=2, local_dir=ray_log_path, reuse_actors=True)


    #Shutdown the cluster
    print ('Shutting down cluster')
    open(stop_file_name, 'w').close()
    time.sleep(60) # Give workers a chance to detect the stop_cluster.txt file and cleanly shutdown
    os.system("ray stop")
    os.remove(stop_file_name)
    os.remove(start_file_name)

    
else:
    print("Worker node - delegating resources to cluster")
    stop_cluster = os.path.exists(stop_file_name)
    
    while not stop_cluster:
        time.sleep(30)
        print('Secondary worker - main thread - heartbeat')
        stop_cluster = os.path.exists(stop_file_name)
    print ('Shutting down worker')
    ray.shutdown()
    os.system("ray stop")

Overwriting /home/jupyter/ray/trainer/task.py
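
The shutdown logic at the end of `task.py` relies on a sentinel file on shared storage: the head node creates it after training, and each worker polls for it before stopping Ray. The following is a minimal local sketch of that handshake, assuming a temporary directory in place of the `/gcs/...` mount and a thread in place of a worker replica. The names `worker_loop` and `run_demo` are hypothetical and do not appear in `task.py`.

```python
import os
import tempfile
import threading
import time


def worker_loop(stop_file: str, poll_seconds: float, heartbeats: list) -> None:
    """Poll for the stop file, recording one heartbeat per poll."""
    while not os.path.exists(stop_file):
        heartbeats.append(time.monotonic())
        time.sleep(poll_seconds)


def run_demo(poll_seconds: float = 0.05) -> int:
    """Run one worker thread, signal it through a sentinel file, and return the heartbeat count."""
    with tempfile.TemporaryDirectory() as shared_dir:
        stop_file = os.path.join(shared_dir, "stop_cluster.txt")
        heartbeats: list = []
        worker = threading.Thread(
            target=worker_loop, args=(stop_file, poll_seconds, heartbeats)
        )
        worker.start()

        time.sleep(poll_seconds * 3)            # stand-in for the training run
        open(stop_file, "w").close()            # head: signal shutdown
        worker.join(timeout=poll_seconds * 20)  # head: grace period for the worker
        assert not worker.is_alive(), "worker did not observe the stop file"
        os.remove(stop_file)                    # head: clean up the sentinel
    return len(heartbeats)


if __name__ == "__main__":
    print(f"worker heartbeats before shutdown: {run_demo()}")
```

The grace period has to be longer than the polling interval, otherwise the head can delete the sentinel before a worker sees it. In `task.py` the head waits 60 seconds while workers poll every 30 seconds.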


### Run training using CustomTrainingJob with pre-built container

In [30]:
JOB_NAME=f"ray-rllib-training-{TIMESTAMP}"
MACHINE_TYPE="n1-standard-4"  # T4 GPUs attach to N1 machine types; A2 machine types come with A100 GPUs
REPLICA_COUNT=1
ACCELERATOR_TYPE="NVIDIA_TESLA_T4"
ACCELERATOR_COUNT=1
WORKER_POOL_MACHINE_TYPE="n1-standard-4"
WORKER_POOL_REPLICA_COUNT=4
WORKER_ACCELERATOR_TYPE="NVIDIA_TESLA_T4"
#Setup 1 accelerator per worker
WORKER_ACCELERATOR_COUNT=1
#EXECUTOR_IMAGE_URI="gcr.io/deeplearning-platform-release/base-cu113"
EXECUTOR_IMAGE_URI="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-10:latest"
WORKING_DIRECTORY=SETUP_DIR
SCRIPT_PATH="/home/jupyter/ray/trainer/task.py"
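
Before submitting the job, it can help to check that the cluster provides at least as many GPUs as the RLlib config in `task.py` requests. The sketch below assumes that RLlib reserves `num_gpus` for the learner plus `num_gpus_per_worker` for each rollout worker, and that every replica gets the same accelerator count, as in the `job.run` call in this section. The helper names are hypothetical.

```python
def requested_gpus(num_workers: int, num_gpus: float, num_gpus_per_worker: float) -> float:
    """GPUs the RLlib config asks for: learner share plus one share per rollout worker."""
    return num_gpus + num_workers * num_gpus_per_worker


def provisioned_gpus(replica_count: int, accelerator_count: int) -> int:
    """GPUs the job provides when every replica has the same accelerator count."""
    return replica_count * accelerator_count


# Values taken from the config in task.py and from the variables in the cell above.
requested = requested_gpus(num_workers=3, num_gpus=0.5, num_gpus_per_worker=1)
provisioned = provisioned_gpus(replica_count=1 + 4, accelerator_count=1)

if requested > provisioned:
    raise ValueError(f"config requests {requested} GPUs but only {provisioned} are provisioned")
print(f"requested={requested}, provisioned={provisioned}")
```

If the request exceeds what the cluster offers, the Tune trial would typically stay pending rather than fail, so a check like this can save a wasted job submission.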

In [39]:
print(BUCKET_URI)

gs://gcp-ml-sandbox-aip-20220420163359


In [57]:
import os
import sys

from google.cloud import aiplatform
from google.cloud import aiplatform_v1

aiplatform.init(project=PROJECT_ID, location=REGION)

job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path=SCRIPT_PATH,
    requirements=["ray[default]", "ray[rllib]", "gym", "tblib"],
    container_uri=EXECUTOR_IMAGE_URI,
    staging_bucket=BUCKET_URI,
)

# The head and worker replicas share a single machine specification.
job.run(
    replica_count=REPLICA_COUNT + WORKER_POOL_REPLICA_COUNT,
    accelerator_type=WORKER_ACCELERATOR_TYPE,
    machine_type=MACHINE_TYPE,
    accelerator_count=WORKER_ACCELERATOR_COUNT,
    network="projects/357746845324/global/networks/gcp-ml-sandbox-network",
    enable_web_access=True,
    sync=False,
)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://gcp-ml-sandbox-aip-20220420163359/aiplatform-2022-04-20-21:20:58.906-aiplatform_custom_trainer_script-0.1.tar.gz.
INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://gcp-ml-sandbox-aip-20220420163359/aiplatform-custom-training-2022-04-20-21:20:58.987 
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/5649513394617712640?project=357746845324
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/357746845324/locations/us-central1/trainingPipelines/5649513394617712640 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/1365622578755928064?project=357746845324
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/357746845324/locatio

### Run training using the local script and have Vertex autopackage the code
For details on creating a custom job, see https://cloud.google.com/vertex-ai/docs/training/create-custom-job#create

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users' resources, create a timestamp for each session and append it to the names of the resources you create in this tutorial.

In [133]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [134]:
JOB_NAME=f"ray-rllib-training-{TIMESTAMP}"
MACHINE_TYPE="n1-standard-4"
REPLICA_COUNT=1
ACCELERATOR_TYPE="NVIDIA_TESLA_T4"
ACCELERATOR_COUNT=1
WORKER_POOL_MACHINE_TYPE="n1-standard-4"
WORKER_POOL_REPLICA_COUNT=1
WORKER_ACCELERATOR_TYPE="NVIDIA_TESLA_T4"
# Attach 4 accelerators to each worker replica
WORKER_ACCELERATOR_COUNT=4
#EXECUTOR_IMAGE_URI="gcr.io/deeplearning-platform-release/base-cu113"
EXECUTOR_IMAGE_URI="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-10:latest"
WORKING_DIRECTORY=SETUP_DIR
SCRIPT_PATH="trainer/task.py"

In [None]:
! gcloud ai custom-jobs create \
  --region=$REGION \
  --display-name=$JOB_NAME \
  --worker-pool-spec=machine-type=$MACHINE_TYPE,replica-count=$REPLICA_COUNT,accelerator-type=$ACCELERATOR_TYPE,accelerator-count=$ACCELERATOR_COUNT,executor-image-uri=$EXECUTOR_IMAGE_URI,local-package-path=$WORKING_DIRECTORY,script=$SCRIPT_PATH \
  --worker-pool-spec=machine-type=$WORKER_POOL_MACHINE_TYPE,replica-count=$WORKER_POOL_REPLICA_COUNT,accelerator-type=$WORKER_ACCELERATOR_TYPE,accelerator-count=$WORKER_ACCELERATOR_COUNT
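
Each `--worker-pool-spec` flag in the command above is a comma-separated list of `key=value` pairs, and the accelerator keys are optional. As a rough sketch, the flag can be assembled from keyword arguments so that unset fields are left out. The helper name `worker_pool_spec` is hypothetical, and the example values stand in for the notebook variables.

```python
def worker_pool_spec(**fields) -> str:
    """Render keyword arguments as one --worker-pool-spec flag, skipping fields set to None."""
    pairs = [
        f"{key.replace('_', '-')}={value}"
        for key, value in fields.items()
        if value is not None
    ]
    return "--worker-pool-spec=" + ",".join(pairs)


# Head pool without accelerators; accelerator fields are omitted because they are None.
head_flag = worker_pool_spec(
    machine_type="n1-standard-4",
    replica_count=1,
    accelerator_type=None,
    accelerator_count=None,
    script="trainer/task.py",
)

# Worker pool with four T4 accelerators per replica.
worker_flag = worker_pool_spec(
    machine_type="n1-standard-4",
    replica_count=1,
    accelerator_type="NVIDIA_TESLA_T4",
    accelerator_count=4,
)

print(head_flag)
print(worker_flag)
```

Building the flags this way keeps the accelerated and non-accelerated variants of the command in one place instead of two nearly identical cells.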

In [135]:
! gcloud ai custom-jobs create \
  --region=$REGION \
  --display-name=$JOB_NAME \
  --worker-pool-spec=machine-type=$MACHINE_TYPE,replica-count=$REPLICA_COUNT,executor-image-uri=$EXECUTOR_IMAGE_URI,local-package-path=$WORKING_DIRECTORY,script=$SCRIPT_PATH \
  --worker-pool-spec=machine-type=$WORKER_POOL_MACHINE_TYPE,replica-count=$WORKER_POOL_REPLICA_COUNT,accelerator-type=$WORKER_ACCELERATOR_TYPE,accelerator-count=$WORKER_ACCELERATOR_COUNT

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
  self.stdin = io.open(p2cwrite, 'wb', bufsize)
  self.stdout = io.open(c2pread, 'rb', bufsize)
Sending build context to Docker daemon  20.53kB
Step 1/10 : FROM us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-10:latest
 ---> 1004232908f3
Step 2/10 : RUN mkdir -m 777 -p /usr/app /home
 ---> Running in 5b80db444fdd
Removing intermediate container 5b80db444fdd
 ---> e7e95ad93f81
Step 3/10 : WORKDIR /usr/app
 ---> Running in 3cde287716a3
Removing intermediate container 3cde287716a3
 ---> 083c9ecad321
Step 4/10 : ENV HOME=/home
 ---> Running in 5914fe28a2fc
Removing intermediate container 5914fe28a2fc
 ---> f1cff28c0e47
Step 5/10 : ENV PYTHONDONTWRITEBYTECODE=1
 ---> Running in 67ed5d166d22
Removing intermediate container 67ed5d166d22
 ---> b962e39eae4e
Step 6/10 : RUN rm -rf /var/sitecustomize
 ---> Running in c563d9bc3986
Removing intermediate container c563d9bc3986
 ---> d16b6a9234c0
Step 7/10 : COPY ["./setup.py", "./set

### Copy the command from the output above to view the job status

In [137]:
! gcloud ai custom-jobs describe projects/780788467724/locations/us-central1/customJobs/6567315332740939776

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
createTime: '2022-04-19T01:54:53.431584Z'
displayName: ray-rllib-training-20220419015342
endTime: '2022-04-19T02:00:30Z'
error:
  code: 3
  message: 'The replica workerpool0-0 exited with a non-zero status of 1. Termination
    reason: Error. To find out more about why your job exited please check the logs:
    https://console.cloud.google.com/logs/viewer?project=780788467724&resource=ml_job%2Fjob_id%2F6567315332740939776&advancedFilter=resource.type%3D%22ml_job%22%0Aresource.labels.job_id%3D%226567315332740939776%22'
jobSpec:
  workerPoolSpecs:
  - containerSpec:
      imageUri: gcr.io/anthos-workshop-cloudinsight/cloudai-autogenerated/ray-rllib-training-20220419015342:20220419.01.53.56.257804
    diskSpec:
      bootDiskSizeGb: 100
      bootDiskType: pd-ssd
    machineSpec:
      machineType: n1-standard-4
    replicaCount: '1'
  - containerSpec:
      imageUri: gcr.io/anthos-workshop-cloudinsight/cloudai-autogenerated/