# Tutorial

### Overview

In [32]:
import ray
import time

ray.init()  # start a local Ray instance; ray.put/ray.get below need it

### put and get

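Convert between Python objects and Ray object IDs: `ray.put` stores a value and returns its object ID, and `ray.get` takes an ID (or a list of IDs) and returns the value(s).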

In [15]:
x = "123"
object_id = ray.put(x)
ray.get(object_id)

'123'

In [16]:
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
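To make the put/get contract concrete, here is a toy, single-process stand-in for the object store. `ToyObjectStore` is hypothetical and says nothing about how Ray stores objects internally (Ray uses a shared-memory object store, Plasma in this version); it only mimics the interface: `put` returns a fresh ID, `get` accepts one ID or a list of IDs, and a stored value is an immutable snapshot.

```python
import itertools
import pickle


class ToyObjectStore:
    """Hypothetical stand-in for Ray's object store (interface only)."""

    def __init__(self):
        self._objects = {}                 # object ID -> serialized value
        self._next_id = itertools.count()  # source of fresh IDs

    def put(self, value):
        object_id = next(self._next_id)
        # Keep a serialized snapshot, so mutating `value` later has no effect.
        self._objects[object_id] = pickle.dumps(value)
        return object_id

    def get(self, object_ids):
        # Like ray.get: accept a single ID or a list of IDs.
        if isinstance(object_ids, list):
            return [self.get(object_id) for object_id in object_ids]
        return pickle.loads(self._objects[object_ids])


store = ToyObjectStore()
x_id = store.put("123")
ids = [store.put(i) for i in range(10)]
print(store.get(x_id))  # 123
print(store.get(ids))   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

data = [1, 2]
data_id = store.put(data)
data.append(3)             # mutating the original after put...
print(store.get(data_id))  # [1, 2]  ...does not change the stored object
```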

### Asynchronous Computation in Ray

#### Remote functions

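The key idea is the difference between submitting a task and executing it.

Submitting a task to the local scheduler returns an object ID immediately, but the function has not necessarily run yet. The function is executed once all of its inputs are available. This is not lazy evaluation: execution does not wait until someone asks for the result.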

In [23]:
import time

# 普通函数
def f1():
    time.sleep(1)

# remote function
@ray.remote
def f2():
    time.sleep(1)

start = time.time()
# The following takes 4 seconds.
print([f1() for _ in range(4)])
print(time.time() - start)

start = time.time()
# The following takes one second (assuming the system has at least 4 CPUs).
print(ray.get([f2.remote() for _ in range(4)]))
print(time.time() - start)

[None, None, None, None]
4.005740404129028
[None, None, None, None]
1.011054277420044
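The submit/execute split is not unique to Ray. As a rough single-machine analogy (standard-library threads, not Ray), `concurrent.futures` shows the same two properties: `submit` hands back a handle at once, and the work starts without anyone requesting the result, so four 0.5 s tasks finish in about 0.5 s instead of 2 s. The `slow_task` function and the 0.5 s duration are illustrative assumptions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(done_flags, i):
    """Illustrative task: sleep, then record that it actually ran."""
    time.sleep(0.5)
    done_flags[i].set()
    return i


flags = [threading.Event() for _ in range(4)]
with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.time()
    # Submitting returns Futures immediately, much like f2.remote() returns object IDs.
    futures = [pool.submit(slow_task, flags, i) for i in range(4)]
    submit_time = time.time() - start
    # Not lazy: the tasks run even though no result has been requested yet.
    ran_without_result = all(flag.wait(timeout=5) for flag in flags)
    # result() plays the role of ray.get: block until each value exists.
    results = [f.result() for f in futures]
    total_time = time.time() - start

print(results)  # [0, 1, 2, 3]
print(f"submit: {submit_time:.3f} s, total: {total_time:.2f} s")
```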


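Arguments can be passed either as plain values or as object IDs.

A remote call returns an object ID, not the actual value; use `ray.get` to retrieve the value.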

In [29]:
@ray.remote
def add2(a, b):
    return a + b

a = add2.remote(1, 2)
b = add2.remote(1, ray.put(2))
c = add2.remote(ray.put(1), ray.put(2))

a, ray.get(a), b, ray.get(b), c, ray.get(c)

(ObjectID(01000000e248859342018871404e42efd905d73e),
 3,
 ObjectID(01000000cfb2eb2f5132c6af4028813b23fead38),
 3,
 ObjectID(01000000a6387379a472809cf8731d415412446e),
 3)
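One way to picture what happens to the arguments: before the function body runs, every argument that is an object ID is replaced by the value it refers to, and the caller gets back an ID for the result. The sketch below mimics that with a hypothetical `ToyRef` type and a plain dict; unlike Ray, it runs the function synchronously in the calling process.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyRef:
    """Hypothetical stand-in for a Ray object ID."""
    key: int


STORE = {}  # ToyRef.key -> value


def toy_put(value):
    """Store a value and return a reference to it (cf. ray.put)."""
    ref = ToyRef(len(STORE))
    STORE[ref.key] = value
    return ref


def toy_get(ref):
    """Look up the value behind a reference (cf. ray.get)."""
    return STORE[ref.key]


def toy_call(fn, *args):
    """Run fn like a remote call: resolve ToyRef arguments, return a ToyRef."""
    # References are swapped for their values, so fn only ever sees plain values.
    values = [toy_get(a) if isinstance(a, ToyRef) else a for a in args]
    # The caller receives a reference to the result, not the result itself.
    return toy_put(fn(*values))


def add2(a, b):
    return a + b


a = toy_call(add2, 1, 2)
b = toy_call(add2, 1, toy_put(2))
c = toy_call(add2, toy_put(1), toy_put(2))
print(a)                                # ToyRef(key=0)
print([toy_get(r) for r in (a, b, c)])  # [3, 3, 3]
```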

A remote function can also return multiple object IDs.

In [31]:
@ray.remote(num_return_vals=3)  # newer Ray versions call this option num_returns
def return_multiple():
    return 1, 2, 3

a_id, b_id, c_id = return_multiple.remote()
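Conceptually, one task yields several handles, each of which can be fetched or passed to other tasks independently. The standard-library sketch below imitates that by fanning one `Future` out into several; `split_future` is a hypothetical helper for illustration, not part of Ray.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def split_future(fut, n):
    """Hypothetical helper: turn one Future of an n-tuple into n Futures."""
    parts = [Future() for _ in range(n)]

    def fan_out(done):
        # Propagate a failure to every part.
        if done.exception() is not None:
            for p in parts:
                p.set_exception(done.exception())
            return
        values = done.result()
        if len(values) != n:
            err = ValueError(f"expected {n} values, got {len(values)}")
            for p in parts:
                p.set_exception(err)
            return
        # Each element gets its own handle.
        for p, v in zip(parts, values):
            p.set_result(v)

    fut.add_done_callback(fan_out)
    return parts


def return_multiple():
    return 1, 2, 3


with ThreadPoolExecutor() as pool:
    a_id, b_id, c_id = split_future(pool.submit(return_multiple), 3)
    print(a_id.result(), b_id.result(), c_id.result())  # 1 2 3
```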

#### Dependencies between tasks

In [34]:
@ray.remote
def f(x):
    return x + 1

x = f.remote(0)
y = f.remote(x)  # runs after x
z = f.remote(y)  # runs after y
ray.get(z) # 3

3
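The scheduling rule (a task starts only once all of its inputs exist) can be sketched with callbacks on `concurrent.futures` Futures. `submit_when_ready` is a hypothetical helper that only illustrates the rule and is unrelated to Ray's own implementation; the chain at the bottom mirrors `x`, `y`, `z` above.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def submit_when_ready(pool, fn, *args):
    """Hypothetical helper: return a Future for fn(*args) right away, but only
    hand the task to `pool` once every Future among `args` has finished."""
    out = Future()
    deps = [a for a in args if isinstance(a, Future)]
    remaining = len(deps)
    lock = threading.Lock()

    def forward(done):
        # Copy the finished task's outcome into the Future we already returned.
        if done.exception() is not None:
            out.set_exception(done.exception())
        else:
            out.set_result(done.result())

    def launch():
        # All inputs are ready: swap Futures for their values, then run fn.
        try:
            values = [a.result() if isinstance(a, Future) else a for a in args]
        except Exception as exc:  # an upstream task failed
            out.set_exception(exc)
            return
        pool.submit(fn, *values).add_done_callback(forward)

    def on_input_done(_):
        nonlocal remaining
        with lock:
            remaining -= 1
            ready = remaining == 0
        if ready:
            launch()

    if not deps:
        launch()
    for dep in deps:
        dep.add_done_callback(on_input_done)
    return out


def f(x):
    return x + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    x = submit_when_ready(pool, f, 0)
    y = submit_when_ready(pool, f, x)  # handed to the pool only after x finishes
    z = submit_when_ready(pool, f, y)  # handed to the pool only after y finishes
    print(z.result())  # 3
```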

#### Nested remote functions

Worker processes can call remote functions too, so a remote function can itself launch other remote functions.

In [51]:
@ray.remote
def sub_experiment(i, j):
    # Run the jth sub-experiment for the ith experiment.
    return i + j

@ray.remote
def run_experiment(i):
    sub_results = []
    # Launch tasks to perform 10 sub-experiments in parallel.
    for j in range(10):
        sub_results.append(sub_experiment.remote(i, j))
    # Return the sum of the results of the sub-experiments.
    return sum(ray.get(sub_results))

results = [run_experiment.remote(i) for i in range(5)]
ray.get(results) # [45, 55, 65, 75, 85]

[45, 55, 65, 75, 85]
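The printed list can be checked by hand. Experiment `i` sums the ten sub-results `i + j` for `j` from 0 to 9:

$$
\text{run\_experiment}(i) = \sum_{j=0}^{9} (i + j) = 10i + \sum_{j=0}^{9} j = 10i + 45,
$$

so `i = 0, 1, 2, 3, 4` gives 45, 55, 65, 75, 85.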

# Actors

#### Defining, creating, and using actors

Instances of the `Counter` class are actors.

In [52]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

In [56]:
a1 = Counter.remote()
a2 = Counter.remote()

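Creating an actor does the following:

1. A node in the cluster is chosen, and a worker process is created on that node to execute the methods called on the actor.
2. A `Counter` object is created on that worker and its constructor runs.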

In [65]:
v1 = a1.increment.remote()  # ray.get returns 1 on the first run; each re-run of this cell adds 1
v2 = a2.increment.remote()  # likewise 1 on the first run
ray.get(v1),ray.get(v2)

(6, 6)

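When `a1.increment.remote()` is called, the following happens:

1. A task is created.

2. The task is sent directly to the local scheduler responsible for the actor (the one where the actor was created), bypassing the global scheduler.

3. An object ID is returned.

Because `a1` and `a2` are different actors, their method calls can run in parallel.

In contrast, method calls on the same actor (for example `a1`) are executed serially.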

In [67]:
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]
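The "serial within one actor, parallel across actors" behavior can be imitated on one machine by giving each object its own single-threaded executor. `ToyActorHandle` and its `call` method are hypothetical (a real Ray actor lives in its own worker process and is invoked as `a1.increment.remote()`); the sketch only mirrors the ordering guarantees described above.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyActorHandle:
    """Hypothetical stand-in for a Ray actor handle. Each instance owns one
    worker thread, so its method calls run one at a time, in submission order."""

    def __init__(self, cls, *args, **kwargs):
        self._executor = ThreadPoolExecutor(max_workers=1)
        # Build the wrapped object on the actor's own thread.
        self._obj = self._executor.submit(cls, *args, **kwargs).result()

    def call(self, method, *args):
        # Returns a Future right away, analogous to the object ID from .remote().
        return self._executor.submit(getattr(self._obj, method), *args)

    def shutdown(self):
        self._executor.shutdown(wait=True)


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


counters = [ToyActorHandle(Counter) for _ in range(3)]

# One call per actor: different actors may run these concurrently.
first = [fut.result() for fut in [c.call("increment") for c in counters]]

# Five calls on the same actor: executed serially, sharing its state.
serial = [fut.result() for fut in [counters[0].call("increment") for _ in range(5)]]

print(first)   # [1, 1, 1]
print(serial)  # [2, 3, 4, 5, 6]

for c in counters:
    c.shutdown()
```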


In [72]:
import gym

@ray.remote
class GymEnvironment(object):
    def __init__(self, name):
        self.env = gym.make(name)
        self.env.reset()

    def step(self, action):
        return self.env.step(action)

    def reset(self):
        self.env.reset()
        
pong = GymEnvironment.remote("Pong-v0")
a = pong.step.remote(0)  # Take action 0 in the simulator.

`gym` provides simulated environments with a common interface for testing and training reinforcement learning agents.

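Running the next part requires `tensorflow-gpu`. If `pip` cannot find a package for it directly, download a wheel from [tensorflow pip](https://www.tensorflow.org/install/pip) and install it with `pip install -U [link to wheel]`.

That still ran into problems here, so `yaourt` (on Arch Linux) was used instead: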

```
pacman -S nvidia cuda cudnn
yaourt tensorflow
```
Choose the GPU-enabled package.

In [None]:
import os
import ray
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

ray.init(num_gpus=8)

def construct_network():
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 10])

    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)

    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    return x, y_, train_step, accuracy

@ray.remote(num_gpus=1)
class NeuralNetOnGPU(object):
    def __init__(self, mnist_data):
        self.mnist = mnist_data
        # Set an environment variable to tell TensorFlow which GPUs to use. Note
        # that this must be done before the call to tf.Session.
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()])
        with tf.Graph().as_default():
            with tf.device("/gpu:0"):
                self.x, self.y_, self.train_step, self.accuracy = construct_network()
                # Allow this to run on CPUs if there aren't any GPUs.
                config = tf.ConfigProto(allow_soft_placement=True)
                self.sess = tf.Session(config=config)
                # Initialize the network.
                init = tf.global_variables_initializer()
                self.sess.run(init)

    def train(self, num_steps):
        for _ in range(num_steps):
            batch_xs, batch_ys = self.mnist.train.next_batch(100)
            self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys})

    def get_accuracy(self):
        return self.sess.run(self.accuracy, feed_dict={self.x: self.mnist.test.images,
                                                       self.y_: self.mnist.test.labels})


# Load the MNIST dataset.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

# Create the actor.
nn = NeuralNetOnGPU.remote(mnist)

# Run a few steps of training and print the accuracy.
nn.train.remote(100)  # actor methods run serially, so get_accuracy below runs after training
accuracy = ray.get(nn.get_accuracy.remote())
print("Accuracy is {}.".format(accuracy))

Process STDOUT and STDERR is being redirected to /tmp/ray/session_2018-10-26_20-20-23_14865/logs.
Waiting for redis server at 127.0.0.1:41953 to respond...
Waiting for redis server at 127.0.0.1:42267 to respond...
Starting the Plasma object store with 1.61 GB memory.

View the web UI at http://localhost:8889/notebooks/ray_ui.ipynb?token=5edb455591b177a63c98bb738cf39120c13a7764458664a5



From <ipython-input-1-112d87dae449>:51: read_data_sets (from tensorflow.contrib.learn.python.learn.datasets.mnist) is deprecated and will be removed in a future version.
Instructions for updating:
Please use alternatives such as official/mnist/dataset.py from tensorflow/models.


From /usr/lib/python3.7/site-packages/tensorflow/contrib/learn/python/learn/datasets/mnist.py:260: maybe_download (from tensorflow.contrib.learn.python.learn.datasets.base) is deprecated and will be removed in a future version.
Instructions for updating:
Please write your own downloading logic.


From /usr/lib/python3.7/site-packages/tensorflow/contrib/learn/python/learn/datasets/base.py:252: _internal_retry.<locals>.wrap.<locals>.wrapped_fn (from tensorflow.contrib.learn.python.learn.datasets.base) is deprecated and will be removed in a future version.
Instructions for updating:
Please use urllib or similar directly.


# Web UI

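Install the following: `pip install jupyter ipywidgets bokeh`

The web UI link is then printed when `ray.init()` runs.

After following the link, you need to run `Restart & Run All` in the notebook it opens.

The web UI is meant for debugging; for details see [Web UI](https://ray.readthedocs.io/en/latest/webui.html).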

In [1]:
import ray

In [2]:
ray.init()

Process STDOUT and STDERR is being redirected to /tmp/ray/session_2018-10-26_19-56-07_12586/logs.
Waiting for redis server at 127.0.0.1:10232 to respond...
Waiting for redis server at 127.0.0.1:39145 to respond...
Starting the Plasma object store with 1.61 GB memory.

View the web UI at http://localhost:8889/notebooks/ray_ui.ipynb?token=1ffbf3ad7ec306b7b7f8a03c63278a8865128c9a4be8ea46



{'node_ip_address': '114.214.177.98',
 'redis_address': '114.214.177.98:10232',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/ray/session_2018-10-26_19-56-07_12586/sockets/plasma_store', manager_name=None, manager_port=None)],
 'local_scheduler_socket_names': [],
 'raylet_socket_names': ['/tmp/ray/session_2018-10-26_19-56-07_12586/sockets/raylet'],
 'webui_url': 'http://localhost:8889/notebooks/ray_ui.ipynb?token=1ffbf3ad7ec306b7b7f8a03c63278a8865128c9a4be8ea46'}