# Ray Tutorial
참조

[Python Ray 사용법 - Python 병렬처리, 분산처리](https://zzsza.github.io/mlops/2021/01/03/python-ray/)

[Tips for first-time users](https://docs.ray.io/en/master/auto_examples/tips-for-first-time.html)

### 라이브러리 import 및 버전 확인

In [2]:
import ray
import datetime
import time

print(ray.__version__)

1.2.0


### 비교를 위한 일반 파이썬 함수 정의
-참고: datetime.datetime은 날짜와 시간을 나타내며 (연도, 월, 일, 시간, 분, 초, 마이크로 초, tzinfo)를 의미

In [4]:
def print_current_datetime():
    time.sleep(0.3)
    current_datetime = datetime.datetime.now()
    print(current_datetime)
    return current_datetime

### 일반 파이썬 함수 호출

In [5]:
print_current_datetime()

2021-03-13 07:43:38.573515


datetime.datetime(2021, 3, 13, 7, 43, 38, 573515)

### Ray 실행

In [6]:
ray.init()

2021-03-13 07:44:06,975	INFO services.py:1174 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.124.100',
 'raylet_ip_address': '192.168.124.100',
 'redis_address': '192.168.124.100:6379',
 'object_store_address': 'tcp://127.0.0.1:54915',
 'raylet_socket_name': 'tcp://127.0.0.1:52920',
 'webui_url': '127.0.0.1:8265',
 'session_dir': 'C:\\Users\\Public\\Documents\\ESTsoft\\CreatorTemp\\ray\\session_2021-03-13_07-44-05_646152_1072',
 'metrics_export_port': 64122,
 'node_id': 'af2cbefb99df5446ef91442cfafbe8c76f00918dee3c42e7882a7835'}

실행 시 위와 같은 값들이 출력됨
- http://127.0.0.1:8265 (localhost:8265)로 접근하면 대시보드 확인 가능
- 대시보드 내용은 밑에서 다룸

### Ray Task(Remote Function) 정의
- 기존 함수에서 `@ray.remote`만 추가

In [7]:
# Ray task
@ray.remote
def print_current_datetime():
    time.sleep(0.3)
    current_datetime = datetime.datetime.now()
    print(current_datetime)
    return current_datetime

### 일반 파이썬 함수 호출처럼 실행해 보면 TypeError 발생
- Remote 함수는 직접적으로 호출할 수 없고, .remote()를 추가하라고 함

In [8]:
print_current_datetime()

TypeError: Remote functions cannot be called directly. Instead of running '__main__.print_current_datetime()', try '__main__.print_current_datetime.remote()'.

### `print_current_datetime.remote()`를 사용해 Task 호출
- ObjectRef가 반환됨

In [9]:
print_current_datetime.remote()

ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)

[2m[36m(pid=3164)[0m 2021-03-13 07:48:29.946684


- 다시 또 실행하면 다른 id 반환

In [10]:
print_current_datetime.remote()

ObjectRef(63964fa4841d4a2effffffffffffffffffffffff0100000001000000)

[2m[36m(pid=3164)[0m 2021-03-13 07:48:52.290017


### `ray.get()` 을 사용해 값 받기

In [11]:
ray.get(print_current_datetime.remote())

datetime.datetime(2021, 3, 13, 7, 50, 34, 601129)

[2m[36m(pid=3164)[0m 2021-03-13 07:50:34.601129


### 동시에 4번 실행하기
- List Comprehension을 사용해 futures를 생성 후, 출력하면 4개의 ObjectRef를 확인할 수 있음
- ray.get을 통해 값 받기

In [13]:
futures = [print_current_datetime.remote() for i in range(4)]

print(futures)

ray.get(futures)

[ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000), ObjectRef(1e9d04d3b7e4dfb2ffffffffffffffffffffffff0100000001000000), ObjectRef(609d7f556b6757adffffffffffffffffffffffff0100000001000000), ObjectRef(c6953afc4a9f69e9ffffffffffffffffffffffff0100000001000000)]


[datetime.datetime(2021, 3, 13, 7, 51, 57, 145250),
 datetime.datetime(2021, 3, 13, 7, 51, 57, 145250),
 datetime.datetime(2021, 3, 13, 7, 51, 57, 145250),
 datetime.datetime(2021, 3, 13, 7, 51, 57, 145250)]

[2m[36m(pid=3164)[0m 2021-03-13 07:51:57.145250
[2m[36m(pid=6524)[0m 2021-03-13 07:51:57.145250
[2m[36m(pid=11608)[0m 2021-03-13 07:51:57.145250
[2m[36m(pid=6460)[0m 2021-03-13 07:51:57.145250


### Ray Dashboard 
대시보드는 윈도우에서 아직 지원하지 않는다고 한다.ㅠㅠ

### 처음 사용하는 사람들을 위한 Tip
공식문서 [https://docs.ray.io/en/master/auto_examples/tips-for-first-time.html](Tips for first-time users) 의 핵심 내용. 해당 문서는 꼭 읽어보기

#### 1) `ray.get()`은 실행 횟수 파악하기(가능하면 늦게 호출)
+ 다음과 같이 remote를 하며 호출하는 경우는 느림
    + 예시(List Comprehension하며 ray.get을 반복 호출)

In [1]:
import time

def do_some_work(x):
    time.sleep(1)
    return x

start = time.time()
results = [do_some_work(x) for x in range(4)]
print('duration = ', time.time()-start)
print('results = ', results)

duration =  4.028665542602539
results =  [0, 1, 2, 3]


위는 ray없이 실행시킨 경우로 실행시간이 약 4초가 나왔다.

In [None]:
import ray
ray.init()

In [4]:
@ray.remote
def do_some_work(x):
    time.sleep(1)
    return x

start = time.time()
results = [ray.get(do_some_work.remote(x)) for x in range(4)]
print('duration = ', time.time() - start)
print('results = ', results)

duration =  4.057691335678101
results =  [0, 1, 2, 3]


ray를 사용했으나 동일하게 4초가 나오는 것을 알 수 있다. 

이를 해결하려면 List Comprehension 결과를 ray.get으로 받아 1번만 호출하면 된다.

In [7]:
start = time.time()
result = ray.get([do_some_work.remote(x) for x in range(4)])
print('duration = ', time.time() - start)
print('results = ', results)

duration =  1.0089175701141357
results =  [0, 1, 2, 3]


위처럼 1초로 줄은 모습을 알 수 있다.

#### 2. CPU가 적거나 클러스터가 적은 상황에서는 성능 개선이 크지 않을 수 있음

In [3]:
def tiny_work(x):
    time.sleep(0.0001)
    return x

start = time.time()
results = [tiny_work(x) for x in range(1000)]
print('duration = ', time.time() - start)

duration =  15.98154592514038


In [1]:
import time
import ray

ray.init(num_cpus = 4)

2021-03-13 09:54:40,599	INFO services.py:1174 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.124.100',
 'raylet_ip_address': '192.168.124.100',
 'redis_address': '192.168.124.100:6379',
 'object_store_address': 'tcp://127.0.0.1:64813',
 'raylet_socket_name': 'tcp://127.0.0.1:58676',
 'webui_url': '127.0.0.1:8265',
 'session_dir': 'C:\\Users\\Public\\Documents\\ESTsoft\\CreatorTemp\\ray\\session_2021-03-13_09-54-39_988147_10036',
 'metrics_export_port': 54838,
 'node_id': '179e4096bbbd4f3c79330346a2694401a4b869d19273124cbdc4a6a4'}

In [2]:
@ray.remote
def tiny_work(x):
    time.sleep(0.0001)
    return x

start = time.time()
results_ids = [tiny_work.remote(x) for x in range(1000)]
results = ray.get(results_ids)
print('duration = ', time.time() - start)

duration =  4.029665946960449


예제에서는 ray를 사용했을 때 duration이 2배정도 증가했는데 이 노트북에서는 약 4배정도 감소한 것으로 보인다. 왤까..? 

여튼 프로그램의 속도를 증가시키려면 태스크를 좀더 크게 만들면 된다.

In [3]:
def tiny_work(x):
    time.sleep(0.0001) # replace this is with work you need to do
    return x

@ray.remote
def mega_work(start, end):
    return [tiny_work(x) for x in range(start, end)]

start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print('duration =', time.time() - start)

duration = 398.89446926116943


빨라진건가..? 잘 모르겠다. 아무튼 문서에 따르면 ray 프로그램을 개발할 때 작업에 최소 몇 밀리초가 걸리는 것이 좋다고 한다.

#### 3. 같은 오브젝트를 원격 태스크에 반복적으로 보내지 않기
- 큰 오브젝트를 원격 함수에 전달할때, ray는 `ray.put()`을 호출해 해당 오브젝트를 로컬 오브젝트 저장소에 저장한다. 이렇게 하면 모든 로컬 태스크가 오브젝트 저장소를 공유하므로 원격 태스크가 로컬로 실행될 때 원격 태스크 호출 성능이 크게 향상된다.

- 그러나 작업 호출 시 `ray.put()`을 자동으로 호출시 성능문제가 발생하는 경우가 있다. 예를 들어, 아래 프로그램 같이 동일한 큰 개체를 반복적으로 전달하는 경우를 살펴보자.


In [5]:
import numpy as np

@ray.remote
def no_work(a):
    return

start = time.time()
a = np.zeros((5000, 5000))
result_ids = [no_work.remote(a) for x in range(10)]
results = ray.get(result_ids)
print('duration = ', time.time() - start)

duration =  0.23221206665039062


- 위는 ray를 사용하지 않고 태스크를 10번 반복했을 때 0.23초가 걸렸음을 알 수 있다. 

- 문서에서는 10개 원격 태스크만 호출하는 프로그램에 비해 실행시간이 매우 길다고 설명한다.(문서에서는 1초가 걸림) 시간이 긴 이유는 ray가 `no_work(a)`를 호출할 때 마다 `ray.put(a)`를 호출하여 어레이를 오브젝트 저장소에 복사하기 때문이다. 어레이 a에는 250만개의 항목이 있으므로 복사하는데 약간의 시간이 걸린다.

- `no_work(a)`를 호출할 때 마다 어레이를 복사하지 않으려면 아래와 같이 `ray.put(a)`를 명시적으로 호출한 다음 `no_work()`에 ID를 전달하면 된다.

In [6]:
@ray.remote
def no_work(a):
    return

start = time.time()
a_id = ray.put(np.zeros((5000, 5000)))
result_ids = [no_work.remote(a_id) for x in range(10)]
results = ray.get(result_ids)
print('duration = ', time.time() - start)

duration =  0.0520474910736084


약 4.5배 정도 빨라졌다!!

#### 4. 파이프라인 데이터 처리
- 여러 작업 결과에 `ray.get()`을 사용할 경우 각 작업이 완료될 때까지 기다려야 한다. 
    + 작업 시간이 크게 다른 경우 문제가 발생할 수 있다.

- 이 문제를 설명하기 위해, 각 작업이 0초에서 4초 사이에 균일하게 분포된 시간을 가지면서 4개의 `do_some_work()` 태스크를 병렬로 실행하는 다음 예를 살펴본다. 
- 다음으로, 이러한 작업의 결과가 `process_results()`에 의해 처리된다고 가정하면 결과당 1초가 소요
-  예상 실행 시간은 `do_some_work()` 작업을 실행하는 데 걸리는 시간 + `process_results()`를 실행하는 데 걸리는 시간(4초)

In [11]:
import time
import random
import ray

#ray.init()

In [12]:
@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4))
    return 4

def process_results(results):
    sum = 0
    for x in results:
        time.sleep(1)
        sum += x
    return sum

start = time.time()
data_list = ray.get([do_some_work.remote(x) for x in range(4)])
sum = process_results(data_list)
print('duration = ', time.time() - start, '\nresult = ', sum)

duration =  5.743727922439575 
result =  16


- 실행시간은 약 6초가 걸렸다.

- 다른 태스크가 훨씬 일찍 끝났을 때 마지막 태스크가 완료될 때까지 기다리면 프로그램 실행 시간이 불필요하게 늘어난다.
- Ray는 객체 ID 목록에서 `ray.wait()`를 호출하여 데이터를 사용할 수 있는 즉시 처리한다.
    - 이 함수는 다른 매개 변수를 지정하지 않고 인수 목록의 개체가 준비되는 즉시 반환
    - 이 호출에는 (1) 준비된 개체의 ID와 (2
    ) 아직 준비되지 않은 개체의 ID를 포함하는 목록의 두 가지 반환이 있다.
- 수정된 프로그램은 아래와 같다. `process_results()`를 한 번에 하나의 결과를 처리하는 `process_incremental()`으로 대체한다.

In [13]:
@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4))
    return x

def process_incremental(sum, result):
    time.sleep(1)
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print('duration = ', time.time() - start, '\nresult = ', sum)

duration =  5.190225839614868 
result =  6


- 실행시간이 5.74초에서 5.19초로 약간 줄었다. 

- 이해를 돕기 위해 그림 1은 `ray.get()`를 사용하여 모든 결과를 사용할 수 있을 때까지 기다릴 때와 `ray.wait()`를 사용하여 결과를 사용할 수 있을 때 처리하는 두 가지 경우 모두 실행 일정을 보여준다.

![ray](./pipeline1.png)