In [None]:
!pip install ray

Collecting ray
[?25l  Downloading https://files.pythonhosted.org/packages/fa/e5/0ff593f053ff4fa2a582961272ef893c21d268d3c2c52ff1e7effd891e48/ray-1.1.0-cp36-cp36m-manylinux2014_x86_64.whl (48.5MB)
[K     |████████████████████████████████| 48.5MB 94kB/s 
Collecting colorful
[?25l  Downloading https://files.pythonhosted.org/packages/b0/8e/e386e248266952d24d73ed734c2f5513f34d9557032618c8910e605dfaf6/colorful-0.5.4-py2.py3-none-any.whl (201kB)
[K     |████████████████████████████████| 204kB 46.8MB/s 
Collecting opencensus
[?25l  Downloading https://files.pythonhosted.org/packages/50/68/4f407bc0980158001c802222fab17e946728aef13f42e5d80d39dfc9ca67/opencensus-0.7.11-py2.py3-none-any.whl (127kB)
[K     |████████████████████████████████| 133kB 30.8MB/s 
[?25hCollecting py-spy>=0.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/8e/a7/ab45c9ee3c4654edda3efbd6b8e2fa4962226718a7e3e3be6e3926bf3617/py_spy-0.3.3-py2.py3-none-manylinux1_x86_64.whl (2.9MB)
[K     |█████████████████

In [None]:
import ray
import time
# Start a local Ray runtime for this kernel. ignore_reinit_error=True keeps the
# cell safe to re-run: calling ray.init() a second time in the same process
# would otherwise raise instead of reusing the running instance.
ray.init(ignore_reinit_error=True)

2021-01-12 01:54:06,691	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'metrics_export_port': 61951,
 'node_id': '279b22a480d432e3457cd2891c98be5f58663cc8',
 'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2021-01-12_01-54-06_044642_57/sockets/plasma_store',
 'raylet_ip_address': '172.28.0.2',
 'raylet_socket_name': '/tmp/ray/session_2021-01-12_01-54-06_044642_57/sockets/raylet',
 'redis_address': '172.28.0.2:6379',
 'session_dir': '/tmp/ray/session_2021-01-12_01-54-06_044642_57',
 'webui_url': '127.0.0.1:8265'}

## ray

`@ray.remote`로 선언한 remote function은 Ray의 워커 프로세스에서 비동기적으로 실행됩니다. 따라서 아래 코드의 `f.remote(...)` 호출은 Ray에 의해 비동기적으로 실행됩니다.

In [None]:
@ray.remote(num_cpus=0.5)
def f(x):
    """Pause for one second, then hand back ``x`` unchanged.

    Stands in for a slow unit of work. The decorator registers it as a Ray
    remote function that requests half a CPU per invocation.
    """
    delay_seconds = 1
    time.sleep(delay_seconds)
    return x

In [None]:
start = time.time()

# Submit 4 tasks up front; each .remote() call hands back a handle right away
# instead of the computed value.
result_ids = [f.remote(i) for i in range(4)]

# Block until every task has finished and fetch the values. Each task sleeps
# for 1 second, so the ideal wall time is about 1 second when all four can run
# at the same time.
results = ray.get(result_ids)  # [0, 1, 2, 3]

elapsed = time.time() - start
print("Ray 소요시간: {}".format(elapsed))

Ray 소요시간: 2.018064260482788


In [None]:
def f(x):
    """Sequential stand-in for slow work: sleep one second and return ``x``.

    Defined without the Ray decorator, so a call runs inline in the caller.
    Note: this rebinds the name ``f``, replacing the Ray remote version
    defined earlier in the notebook.
    """
    delay_seconds = 1
    time.sleep(delay_seconds)
    return x

In [None]:
start = time.time()
# Call f four times back to back; each call blocks until it returns, so the
# one-second sleeps add up.
result_ids = [f(i) for i in range(4)]
elapsed = time.time() - start
print("소요시간 : {}".format(elapsed))

소요시간 : 4.003791570663452


## Task in Task

x_id와 y_id를 병렬로 생성한 뒤, 두 결과를 입력으로 받아 z_id를 구하는 예제

In [None]:
import numpy as np

@ray.remote
def create_matrix(size):
    """Return an array of the given shape filled with standard-normal samples.

    Args:
        size: Output shape, forwarded to ``np.random.normal`` as ``size``.

    Returns:
        The sampled NumPy array.
    """
    samples = np.random.normal(loc=0.0, scale=1.0, size=size)
    return samples

@ray.remote
def multiply_matrices(x, y):
    """Return ``np.dot(x, y)`` for the two inputs."""
    product = np.dot(x, y)
    return product

# Side length of the square matrices. Kept moderate on purpose: the recorded
# run with 10000 (each float64 matrix is about 800 MB) reported a
# WorkerCrashedError from the multiply task -- presumably memory pressure.
# Raise this only if the machine has the memory for it.
size = 2000

# Get the results with ray.
start = time.time()
# The two create_matrix tasks do not depend on each other, so Ray is free to
# run them at the same time.
x_id = create_matrix.remote([size, size])
y_id = create_matrix.remote([size, size])
# The handles are passed straight in; the multiply task receives the finished
# matrices as its arguments.
z_id = multiply_matrices.remote(x_id, y_id)
z = ray.get(z_id)
print("z_id_with_ray : {}".format(time.time() - start))

# Get the results with np array.
start = time.time()
# size= must be given by keyword: the first positional parameter of
# np.random.normal is loc (the mean), so normal([size, size]) would return
# only two samples instead of a size x size matrix and the timing comparison
# would be meaningless.
x_np = np.random.normal(size=[size, size])
y_np = np.random.normal(size=[size, size])
z_np = np.dot(x_np, y_np)
print("z_id_with_np_array : {}".format(time.time() - start))

2021-01-12 02:21:25,181	ERROR worker.py:980 -- Possible unhandled error from worker: [36mray::multiply_matrices()[39m (pid=808, ip=172.28.0.2)
  File "python/ray/_raylet.pyx", line 425, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 447, in ray._raylet.execute_task
ray.exceptions.WorkerCrashedError: The worker died unexpectedly while executing this task.


z_id_with_ray : 129.8542284965515
z_id_with_np_array : 0.011426448822021484


## Aggregation

연산을 어떻게 병렬로 배치하느냐에 따라 생기는 속도의 차이
![](https://miro.medium.com/max/1400/1*vHz3troEmr4uLns0V8VmdA.jpeg)


In [None]:
import time

@ray.remote
def add(x, y):
    """Sleep for one second, then return ``x + y``."""
    time.sleep(1)
    total = x + y
    return total

In [None]:
start = time.time()
# Aggregate the values slowly: every add takes the previous add's handle as
# an input, so the 7 tasks form a single chain. This approach takes O(n) where
# n is the number of values being aggregated. In this case, 7 seconds.
chain = add.remote(1, 2)
for value in range(3, 9):
    chain = add.remote(chain, value)
result = ray.get(chain)
print("Vanilla version : {}".format(time.time() - start))


start = time.time()
# Aggregate the values in a tree-structured pattern: pair the values up, then
# pair up the partial sums level by level (4 + 2 + 1 tasks). This approach
# takes O(log(n)). In this case, 3 seconds when the 4 leaf tasks can all run
# at once.
level = [add.remote(a, b) for a, b in ((1, 2), (3, 4), (5, 6), (7, 8))]
while len(level) > 1:
    level = [add.remote(level[k], level[k + 1]) for k in range(0, len(level), 2)]
result = ray.get(level[0])
print("Advanced version : {}".format(time.time() - start))

Vanilla version : 7.023898363113403
Advanced version : 4.0150158405303955


## Reference

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8