<a href="https://colab.research.google.com/github/denklewer/yandex-rl-tasks/blob/master/Ray_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Setup cell: run this first (e.g. in Google Colab).

import os

In [2]:
# Install Ray into the interpreter that runs this kernel. check_call raises
# CalledProcessError if pip fails, instead of returning a non-zero exit code
# that would otherwise be ignored.
# Pinned below 1.0 because this tutorial uses pre-1.0 APIs such as
# `num_return_vals` (renamed `num_returns` in Ray 1.0).
import subprocess
import sys

subprocess.check_call([sys.executable, "-m", "pip", "install", "ray<1.0"])

0

In [3]:
import ray

# num_gpus=8 tells Ray's scheduler how many GPUs it may hand out to tasks and
# actors; the GPU-actor examples further down each request one of them.
# ignore_reinit_error=True makes this cell safe to re-run in the same kernel
# instead of raising because Ray is already initialized.
ray.init(num_gpus=8, ignore_reinit_error=True)

2019-05-12 16:10:56,131	INFO node.py:469 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-12_16-10-56_1940/logs.
2019-05-12 16:10:56,243	INFO services.py:407 -- Waiting for redis server at 127.0.0.1:63370 to respond...
2019-05-12 16:10:56,384	INFO services.py:407 -- Waiting for redis server at 127.0.0.1:48214 to respond...
2019-05-12 16:10:56,391	INFO services.py:804 -- Starting Redis shard with 2.52 GB max memory.
2019-05-12 16:10:56,433	INFO node.py:483 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-12_16-10-56_1940/logs.
2019-05-12 16:10:56,438	INFO services.py:1427 -- Starting the Plasma object store with 3.78 GB memory using /dev/shm.


{'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2019-05-12_16-10-56_1940/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-05-12_16-10-56_1940/sockets/raylet',
 'redis_address': '172.28.0.2:63370',
 'webui_url': None}

In [4]:
x = "example"
# ray.put stores the object in the object store and returns an ObjectID
# handle. The exact ID value is not fixed and will generally differ between
# runs, so don't rely on a specific value.
ray.put(x)

ObjectID(ffffffff11d2dfcaf2d3e429e56d8b95f47d0ef7)

In [5]:
# Round trip: store a value, then fetch it back by its object ID.
x_id = ray.put("example")
ray.get(x_id)  # -> 'example'

'example'

In [6]:
# One ObjectID per integer; ray.get on a list fetches all values, in order.
result_ids = list(map(ray.put, range(10)))
ray.get(result_ids)  # -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [0]:
def add1(a, b):
    """Add two values locally (an ordinary Python function; no Ray involved)."""
    total = a + b
    return total

In [0]:
@ray.remote
def add2(a, b):
    """Remote counterpart of add1; call it as ``add2.remote(a, b)``."""
    total = a + b
    return total

In [9]:
x_id = add2.remote(1, 2)  # returns an ObjectID for the task's result
ray.get(x_id)  # fetch the actual value -> 3

3

In [10]:
import time


def f1():
    """Sleep for one second in the notebook process."""
    time.sleep(1)


@ray.remote
def f2():
    """Sleep for one second inside a Ray worker."""
    time.sleep(1)


# Sequential: ten local calls run one after another, so this takes ten seconds.
for _call in range(10):
    f1()

# Parallel: ten remote tasks, so this takes about one second (assuming the
# system has at least ten CPUs).
ray.get([f2.remote() for _ in range(10)])

[None, None, None, None, None, None, None, None, None, None]

In [11]:
# Remote functions accept plain values, ObjectIDs, or a mix of the two;
# every call below computes 1 + 2.
add2.remote(1, 2)  # two plain values
add2.remote(1, ray.put(2))  # one value, one ObjectID
add2.remote(ray.put(1), ray.put(2))  # two ObjectIDs

ObjectID(01000000839c2f5495c7538f46b31d89744d7a0c)

In [0]:
@ray.remote(num_return_vals=3)
def return_multiple():
    """Return three values; num_return_vals=3 yields one ObjectID per value."""
    first, second, third = 1, 2, 3
    return first, second, third


a_id, b_id, c_id = return_multiple.remote()

## Expressing dependencies between tasks

In [13]:
@ray.remote
def f(x):
    """Return x incremented by one."""
    incremented = x + 1
    return incremented


# Chain three tasks: each receives the previous task's ObjectID directly,
# with no ray.get between them.
x = f.remote(0)
y = f.remote(x)
z = f.remote(y)
ray.get(z)  # -> 3

3

In [0]:
import numpy as np


@ray.remote
def generate_data():
    """Return an array of 1000 samples drawn from a standard normal distribution."""
    return np.random.normal(size=1000)


@ray.remote
def aggregate_data(x, y):
    """Return the element-wise sum of two arrays."""
    return x + y


# Generate some random data. This launches 100 tasks that will be scheduled on
# various nodes. The resulting data will be distributed around the cluster.
data = [generate_data.remote() for _ in range(100)]

# Perform a tree reduce: repeatedly combine the two oldest IDs and append the
# combined ID to the end, so partial sums are themselves combined pairwise.
while len(data) > 1:
    data.append(aggregate_data.remote(data.pop(0), data.pop(0)))

# Fetch the result. `data` now holds exactly one ObjectID. Fetch that ID
# rather than the list: ray.get(data) would wrap the array in a 1-element list.
result = ray.get(data[0])
result[:5]  # show a preview instead of dumping all 1000 values

## Remote Functions Within Remote Functions

In [15]:
@ray.remote
def sub_experiment(i, j):
    """Run the j-th sub-experiment of the i-th experiment (here simply i + j)."""
    return i + j


@ray.remote
def run_experiment(i):
    """Launch 10 sub-experiments in parallel from inside a task and sum them."""
    # A remote task may itself submit remote tasks and wait on their results.
    sub_result_ids = [sub_experiment.remote(i, j) for j in range(10)]
    return sum(ray.get(sub_result_ids))


results = [run_experiment.remote(i) for i in range(5)]
ray.get(results)  # -> [45, 55, 65, 75, 85]



[45, 55, 65, 75, 85]

In [16]:
ray.get_gpu_ids()  # called from the driver (this notebook); the output shows []


[]

# The Ray API

*   **ray.init()** - Connect to an existing Ray cluster or start one and connect to it.
*   **ray.remote()** - Define a remote function or an actor class.
*   **ray.get()** - Get a remote object or a list of remote objects from the object store.
*   **ray.wait()** - Return a list of IDs that are ready and a list of IDs that are not.
*   **ray.put()** - Store an object in the object store.
*   **ray.get_gpu_ids()** - Get the IDs of the GPUs that are available to the worker.
*   **ray.get_resource_ids()** - Get the IDs of the resources that are available to the worker.
*   **ray.get_webui_url()** - Get the URL to access the web UI.
*   **ray.shutdown()** - Disconnect the worker, and terminate processes started by ray.init().
*   **ray.register_custom_serializer()** - Enable serialization and deserialization for a particular class.
*   **ray.profile()** - Profile a span of time so that it appears in the timeline visualization.
*   **ray.method()** - Annotate an actor method.


# Actors

## Defining and creating an actor

In [0]:
@ray.remote
class Counter(object):
    """Actor that holds a single integer counter, starting at 0."""

    def __init__(self):
        self.value = 0

    def increment(self):
        """Add one to the counter and return the new value."""
        self.value = self.value + 1
        return self.value

In [0]:
# Two independent actors, each with its own counter state.
a1, a2 = Counter.remote(), Counter.remote()

## Using an actor

In [19]:
# Each call returns an ObjectID; ray.get on either ID would return 1.
a1.increment.remote()
a2.increment.remote()

ObjectID(01000000645ffd24f9db37e98af8eb11de018325)

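Each call above returns an object ID immediately rather than the counter's value. The call to a1.increment.remote() generates a task that is scheduled on the first Counter actor; we can then call ray.get on the returned object ID to retrieve the actual value.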

Similarly, the call to a2.increment.remote() generates a task that is scheduled on the second Counter actor. Since these two tasks run on different actors, they can be executed in parallel (note that only actor methods are scheduled on actor workers; regular remote functions are not).

On the other hand, methods called on the same Counter actor are executed serially in the order that they are called. They can thus share state with one another, as shown below.

In [20]:
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# One increment per actor. The tasks target different actors, so they all
# happen in parallel.
first_round = [counter.increment.remote() for counter in counters]
results = ray.get(first_round)
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Five increments on the same actor. These tasks are executed serially
# and share state.
leader = counters[0]
results = ray.get([leader.increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]



[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]


## A More Interesting Actor Example

In [0]:
import gym


@ray.remote
class GymEnvironment(object):
    """Actor that owns one Gym environment inside its own Ray worker.

    Args:
        name: Gym environment ID passed to ``gym.make`` (e.g. "Pong-v0").
    """

    def __init__(self, name):
        self.env = gym.make(name)
        self.env.reset()

    def step(self, action):
        """Apply ``action`` and return whatever ``env.step`` returns."""
        return self.env.step(action)

    def reset(self):
        """Reset the environment and return the initial observation.

        ``env.reset()`` returns the starting observation; passing it back lets
        callers begin a new episode without an extra step.
        """
        return self.env.reset()

In [22]:
# Create the environment actor, then take action 0 in the simulator; the
# call returns an ObjectID for the step result.
pong = GymEnvironment.remote("Pong-v0")
pong.step.remote(0)

ObjectID(010000005e4a676537735cc0c1cad35f732c372f)

## Using GPUs on actors

In [0]:
import tensorflow as tf


def construct_network():
    """Build a single-layer softmax-regression classifier (TF1 graph mode).

    Returns:
        x: float32 placeholder for inputs, shape [None, 784].
        y_: float32 placeholder for one-hot labels, shape [None, 10].
        train_step: op that runs one gradient-descent step (learning rate 0.5).
        accuracy: scalar tensor, fraction of rows whose argmax matches the label.
    """
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 10])

    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    logits = tf.matmul(x, W) + b
    y = tf.nn.softmax(logits)

    # Compute the cross-entropy from the logits with the fused op. The manual
    # form -sum(y_ * log(softmax(logits))) evaluates log(0) = -inf (and then
    # NaN) once a softmax output underflows to 0; the fused op is stable.
    cross_entropy = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_, logits=logits))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    return x, y_, train_step, accuracy

We can then define an actor for this network as follows.

In [0]:
@ray.remote(num_gpus=1)
class NeuralNetOnGPU(object):
    """Actor that trains the construct_network() model on one Ray-assigned GPU.

    Args:
        mnist_data: dataset object providing ``train.next_batch(n)`` and
            ``test.images`` / ``test.labels`` (the notebook passes the result
            of ``input_data.read_data_sets``).
    """

    def __init__(self, mnist_data):
        self.mnist = mnist_data
        # Tell TensorFlow which GPUs Ray reserved for this actor. Note that
        # this must be done before the call to tf.Session.
        visible_gpus = ",".join(str(gpu_id) for gpu_id in ray.get_gpu_ids())
        os.environ["CUDA_VISIBLE_DEVICES"] = visible_gpus
        graph = tf.Graph()
        with graph.as_default(), tf.device("/gpu:0"):
            self.x, self.y_, self.train_step, self.accuracy = construct_network()
            # allow_soft_placement lets ops run on CPU if there aren't any GPUs.
            session_config = tf.ConfigProto(allow_soft_placement=True)
            self.sess = tf.Session(config=session_config)
            # Initialize the network's variables.
            self.sess.run(tf.global_variables_initializer())

    def train(self, num_steps):
        """Run ``num_steps`` training steps on mini-batches of 100 examples."""
        for _step in range(num_steps):
            images, labels = self.mnist.train.next_batch(100)
            feed = {self.x: images, self.y_: labels}
            self.sess.run(self.train_step, feed_dict=feed)

    def get_accuracy(self):
        """Return the model's accuracy on the MNIST test images and labels."""
        feed = {self.x: self.mnist.test.images, self.y_: self.mnist.test.labels}
        return self.sess.run(self.accuracy, feed_dict=feed)

To indicate that an actor requires one GPU, we pass in num_gpus=1 to ray.remote. Note that in order for this to work, Ray must have been started with some GPUs, e.g., via ray.init(num_gpus=2). Otherwise, when you try to instantiate the GPU version with NeuralNetOnGPU.remote(), an exception will be thrown saying that there aren’t enough GPUs in the system.

When the actor is created, it will have access to a list of the IDs of the GPUs that it is allowed to use via ray.get_gpu_ids(). This is a list of integers, like [], or [1], or [2, 5, 6]. Since we passed in ray.remote(num_gpus=1), this list will have length one.

In [26]:
import os
import ray
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Load the MNIST dataset with one-hot labels (downloaded into ./MNIST_data if
# it is not already there). No custom serializers are registered here: the
# dataset object is simply passed to the actor constructor, so Ray serializes
# it as an ordinary argument.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

# Create the actor.
nn = NeuralNetOnGPU.remote(mnist)

# Run a few steps of training and wait for them, so that any error raised
# during training surfaces here. Then print the test accuracy.
ray.get(nn.train.remote(100))
accuracy = ray.get(nn.get_accuracy.remote())
print("Accuracy is {}.".format(accuracy))



Instructions for updating:
Please use alternatives such as official/mnist/dataset.py from tensorflow/models.
Instructions for updating:
Please write your own downloading logic.
Instructions for updating:
Please use urllib or similar directly.
Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Instructions for updating:
Please use tf.data to implement this functionality.
Extracting MNIST_data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Instructions for updating:
Please use tf.data to implement this functionality.
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Instructions for updating:
Please use tf.one_hot on tensors.
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Instructions for updating:
Please use alternatives such as official/mnist/dataset.py fr

In [0]:
@ray.remote(num_gpus=1)
class GPUActor(object):
    """Actor that reserves one GPU and reports which GPU IDs it was given."""

    def __init__(self):
        # __init__ must return None (Python raises TypeError otherwise), so
        # record the assigned IDs instead of returning a message here.
        self.gpu_ids = ray.get_gpu_ids()

    def test(self):
        """Return a message listing the GPU IDs this actor is allowed to use."""
        return "This actor is allowed to use GPUs {}.".format(ray.get_gpu_ids())


In [0]:
@ray.remote(num_gpus=1)
class GPUActor(object):
    """Actor that restricts a TensorFlow session to the GPU(s) Ray assigned it."""

    def __init__(self):
        self.gpu_ids = ray.get_gpu_ids()
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, self.gpu_ids))
        # The call to tf.Session() will restrict TensorFlow to use the GPUs
        # specified in the CUDA_VISIBLE_DEVICES environment variable.
        self.sess = tf.Session()

    def test(self):
        """Return a message listing the GPU IDs this actor is allowed to use.

        This class redefines (and shadows) the earlier GPUActor, so it must
        provide test() itself for ``actor.test.remote()`` to work.
        """
        return "This actor is allowed to use GPUs {}.".format(self.gpu_ids)

In [0]:
actor = GPUActor.remote()  # the actor requests one GPU from Ray (num_gpus=1)

In [7]:
ray.get(actor.test.remote())  # ask the actor which GPU IDs it may use

'This actor is allowed to use GPUs [7].'