In [1]:
!pip install ray

Collecting ray
[?25l  Downloading https://files.pythonhosted.org/packages/11/14/15d0f0aec20a4674a996429160565a071688f27f49f789327ebed8188ffb/ray-1.2.0-cp37-cp37m-manylinux2014_x86_64.whl (47.5MB)
[K     |████████████████████████████████| 47.5MB 82kB/s 
[?25hCollecting py-spy>=0.2.0
[?25l  Downloading https://files.pythonhosted.org/packages/0c/b7/2056a6f06adb93f679f2a1e415dd33219b7c66ba69b8fd2ff1668b8064ed/py_spy-0.3.4-py2.py3-none-manylinux1_x86_64.whl (3.2MB)
[K     |████████████████████████████████| 3.2MB 42.8MB/s 
[?25hCollecting colorama
  Downloading https://files.pythonhosted.org/packages/44/98/5b86278fbbf250d239ae0ecb724f8572af1c91f4a11edf4d36a206189440/colorama-0.4.4-py2.py3-none-any.whl
Collecting aioredis
[?25l  Downloading https://files.pythonhosted.org/packages/b0/64/1b1612d0a104f21f80eb4c6e1b6075f2e6aba8e228f46f229cfd3fdac859/aioredis-1.3.1-py3-none-any.whl (65kB)
[K     |████████████████████████████████| 71kB 7.4MB/s 
[?25hCollecting opencensus
[?25l  Downloadin

In [4]:
import ray
import time
ray.init()

RuntimeError: ignored

## ray

remote function은 Ray의 프로세스에 의해 비동기적으로 실행됩니다. 따라서 아래의 코드는 ray에 의해서 비동기적으로 실행됩니다.

In [5]:
@ray.remote(num_cpus=0.5)
def f(x):
    time.sleep(1)
    return x

In [6]:
start = time.time()
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
    result_ids.append(f.remote(i))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids)  # [0, 1, 2, 3]

print("Ray 소요시간: {}".format(time.time() - start))

Ray 소요시간: 2.014763593673706


In [7]:
def f(x):
    time.sleep(1)
    return x

In [8]:
start = time.time()
result_ids = [] 
for i in range(4):
    result_ids.append(f(i))
print("소요시간 : {}".format(time.time() - start))

소요시간 : 4.004309415817261


## Task in Task

병렬로 x_id와 y_id를 연산하고 z_id를 구하는 예제

In [9]:
import numpy as np

@ray.remote
def create_matrix(size):
    return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
    return np.dot(x, y)

size = 10000

# Get the results with ray.
start = time.time()
x_id = create_matrix.remote([size, size])
y_id = create_matrix.remote([size, size])
z_id = multiply_matrices.remote(x_id, y_id)
z = ray.get(z_id)
print("z_id_with_ray : {}".format(time.time() - start))

# Get the results with np array.
start = time.time()
x_id = np.random.normal([size, size])
y_id = np.random.normal([size, size])
z_id = np.dot(x_id, y_id)
print("z_id_with_np_array : {}".format(time.time() - start))

z_id_with_ray : 70.33507108688354
z_id_with_np_array : 0.0007925033569335938


## Aggregation

연산을 어떻게 병렬로 배치하느냐에 따라서 보는 속도의 차이
![](https://miro.medium.com/max/1400/1*vHz3troEmr4uLns0V8VmdA.jpeg)


In [10]:
import time

@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

In [11]:
start = time.time()
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
print("Vanilla version : {}".format(time.time() - start))


start = time.time()
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
print("Advanced version : {}".format(time.time() - start))

Vanilla version : 7.02429461479187
Advanced version : 4.013916254043579


## Reference

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8