<a href="https://colab.research.google.com/github/goya5858/OSERO/blob/main/Test_Ray.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
!pip install -q -q -q -U 'ray[default]' 

# 関数を並列実行

In [1]:
import time

def worker_func(pid, delay=5):
    """Simulate a slow job by blocking, then report which job finished.

    Args:
        pid: Identifier of the job; only used to build the returned message.
        delay: Seconds to block before returning. Defaults to 5, the value
            that used to be hard-coded, so existing calls behave the same.

    Returns:
        The string ``"pid {pid} finished"``.
    """
    time.sleep(delay)
    return f"pid {pid} finished"

# Baseline without Ray: run the three jobs one after another and time the
# whole run, so the later parallel versions have something to compare against.
start = time.time()
results = []
for pid in range(3):
    results.append(worker_func(pid))
print(results)
elapsed = time.time() - start
print('Elapsed:', elapsed)

['pid 0 finished', 'pid 1 finished', 'pid 2 finished']
Elapsed: 15.014969825744629


# 分散同期処理
関数を並列で実行し、全ての処理が終わったのちに結果をまとめて出力する

In [5]:
import ray
import time

@ray.remote
def worker_func(pid):
    """Ray remote task: block for five seconds, then report completion.

    Launched with ``worker_func.remote(pid)``; the returned string is obtained
    from the resulting ObjectRef via ``ray.get``.
    """
    time.sleep(5)
    message = f"pid {pid} finished"
    return message

ray.init(ignore_reinit_error=True)
start = time.time()

# Launch the three tasks; each .remote() call hands back an ObjectRef.
results = []
for pid in range(3):
    results.append(worker_func.remote(pid))
# Optional experiment: insert time.sleep(15) here to check that the tasks keep
# running in the background even before ray.get() is called.
print(results)
print("途中：", time.time() - start)

finished_values = ray.get(results)
print(finished_values)
print('Elapsed:', time.time()-start)

# ray.get() is what makes this "synchronous": it waits for the given refs.
# Printing `results` (the list returned by the .remote() calls) only shows the
# stored ObjectRef objects, not the values the tasks return.
# Passing those refs to ray.get() yields the actual output of each remote call.
# The tasks themselves appear to make progress even before ray.get() is called.

2021-06-12 16:26:57,227	INFO services.py:1274 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


[ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000), ObjectRef(63964fa4841d4a2effffffffffffffffffffffff0100000001000000), ObjectRef(69a6825d641b4613ffffffffffffffffffffffff0100000001000000)]
途中： 0.05024313926696777
['pid 0 finished', 'pid 1 finished', 'pid 2 finished']
Elapsed: 10.103117942810059


# 非同期分散処理
関数を並列で実行し、処理が終わった順に結果を出力していく

In [6]:
import ray
import time
import random

@ray.remote
def worker_func(pid):
    """Ray remote task: block for a random 3-15 seconds, then report completion.

    The random delay makes the tasks finish in an unpredictable order, which is
    what the completion-order loop that consumes them is meant to show.
    """
    delay_seconds = random.randint(3, 15)
    time.sleep(delay_seconds)
    return f"pid {pid} finished"

ray.init(ignore_reinit_error=True)
start = time.time()
work_in_progresses = [worker_func.remote(i) for i in range(10)]

# Consume results in completion order. Looping while refs remain (instead of a
# hard-coded range(10)) keeps the loop correct if the number of tasks changes.
while work_in_progresses:
    # `finished` is a list of ready ObjectRefs (length 1 because num_returns=1);
    # the second value is the list of refs that are still pending.
    finished, work_in_progresses = ray.wait(work_in_progresses, num_returns=1)
    print(finished)

    # ray.get() on the list returns a list of values; on a single ref, the value.
    print(ray.get(finished))
    print(ray.get(finished[0]))
    print("Elapsed:", time.time()-start, "\n")

# ray.wait() enables "asynchronous" processing:
#   finished, work_in_progresses = ray.wait(work_in_progresses)
# returns the ObjectRefs of the work that has just completed together with the
# refs of the work still in progress.
# As in the synchronous case, ray.get(ObjectRef) then fetches the actual value.
# ray.wait() always returns the finished refs as a list, so index it
# (finished[0]) to get a single ObjectRef.

2021-06-12 16:27:13,545	INFO worker.py:737 -- Calling ray.init() again after it has already been called.


[ObjectRef(ee4e90da584ab0ebffffffffffffffffffffffff0100000001000000)]
['pid 0 finished']
pid 0 finished
Elapsed: 3.0156149864196777 

[ObjectRef(4ee449587774c1f0ffffffffffffffffffffffff0100000001000000)]
['pid 1 finished']
pid 1 finished
Elapsed: 4.016332149505615 

[ObjectRef(480a853c2c4c6f27ffffffffffffffffffffffff0100000001000000)]
['pid 3 finished']
pid 3 finished
Elapsed: 15.028555870056152 

[ObjectRef(32cccd03c567a254ffffffffffffffffffffffff0100000001000000)]
['pid 2 finished']
pid 2 finished
Elapsed: 16.02357506752014 

[ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000)]
['pid 4 finished']
pid 4 finished
Elapsed: 18.035106420516968 

[ObjectRef(1e9d04d3b7e4dfb2ffffffffffffffffffffffff0100000001000000)]
['pid 5 finished']
pid 5 finished
Elapsed: 30.038304567337036 

[ObjectRef(609d7f556b6757adffffffffffffffffffffffff0100000001000000)]
['pid 6 finished']
pid 6 finished
Elapsed: 33.05211091041565 

[ObjectRef(402ddcfdf56ca87affffffffffffffffffffffff01000000010000

# クラスを利用したクラス単位でのサブプロセス化

In [16]:
import ray
import random
import time

@ray.remote
class Worker:
    """Ray actor that keeps a running total between method calls."""

    def __init__(self, worker_id):
        # Running total stored inside the actor.
        self.n = 0
        # Identifier of this worker; add() also uses it as its delay in seconds.
        self.worker_id = worker_id

    def add(self, n):
        """Sleep for ``worker_id`` seconds, then add ``n`` to the stored total."""
        time.sleep(self.worker_id)
        self.n = self.n + n

    def get_value(self):
        """Return a string reporting this worker's id and its current total."""
        return f"Process: {self.worker_id}, value: {self.n}"




ray.init(ignore_reinit_error=True)
start = time.time()

# Create five actors with ids 0..4.
workers = []
for worker_id in range(5):
    workers.append(Worker.remote(worker_id))

# Ask every actor to add 5; each one first sleeps for its own worker_id seconds.
for actor in workers:
    actor.add.remote(5)

# Fetch and print each actor's value, one actor at a time.
for actor in workers:
    value_ref = actor.get_value.remote()
    print(ray.get(value_ref))
# Note: ray.get() is applied to the ObjectRefs returned by .remote(),
# not to the actor handles stored in `workers`.
print("Elapsed:", time.time()-start)

# Issue a second round: ask every actor to add 10 (again after its own delay).
for actor in workers:
    actor.add.remote(10)

for i, actor in enumerate(workers):
    print(f"start_num_{i}")
    print(ray.get(actor.get_value.remote()))
print("Elapsed:", time.time()-start)

# Using a class lets the computation keep internal state between calls.
# A class decorated with @ray.remote is called an "Actor"; calling a method's
# .remote() on an actor issues the order to start that work.
# .remote() only issues the order; the order in which orders are issued is
# determined by the surrounding for-loops.
# ray.get( worker.get_value.remote() ) hands back the return value once the
# corresponding work has finished.

2021-06-12 16:45:52,021	INFO worker.py:737 -- Calling ray.init() again after it has already been called.


Process: 0, value: 5
Process: 1, value: 5
Process: 2, value: 5
Process: 3, value: 5
Process: 4, value: 5
Elapsed: 6.406382322311401
start_num_0
Process: 0, value: 15
start_num_1
Process: 1, value: 15
start_num_2
Process: 2, value: 15
start_num_3
Process: 3, value: 15
start_num_4
Process: 4, value: 15
Elapsed: 10.412129878997803


# クラスでの分散同期処理

In [20]:
import ray
import random
import time

@ray.remote
class Worker:
    """Ray actor that keeps a running total between method calls."""

    def __init__(self, worker_id):
        # Running total stored inside the actor.
        self.n = 0
        # Identifier of this worker; add() also uses it as its delay in seconds.
        self.worker_id = worker_id

    def add(self, n):
        """Sleep for ``worker_id`` seconds, then add ``n`` to the stored total."""
        time.sleep(self.worker_id)
        self.n = self.n + n

    def get_value(self):
        """Return a string reporting this worker's id and its current total."""
        return f"Process: {self.worker_id}, value: {self.n}"




ray.init(ignore_reinit_error=True)
start = time.time()

# Create five actors with ids 0..4.
workers = []
for worker_id in range(5):
    workers.append(Worker.remote(worker_id))

# First round: ask every actor to add 5 (each sleeps worker_id seconds first).
for actor in workers:
    actor.add.remote(5)

# Second round: ask every actor to add 10 (same per-actor delay).
for actor in workers:
    actor.add.remote(10)

# Collect one ObjectRef per actor, then wait for all of them at once.
value_refs = []
for actor in workers:
    value_refs.append(actor.get_value.remote())
results = ray.get(value_refs)
print(results)
print("Elapsed:", time.time()-start)

# The list built from worker.get_value.remote() is a list of ObjectRefs.
# ray.get() cannot take the actors themselves as its argument; it operates on
# ObjectRefs.
# The single ray.get() call on the whole list is what synchronizes on the
# completion of all actors.

2021-06-12 16:54:53,369	INFO worker.py:737 -- Calling ray.init() again after it has already been called.


['Process: 0, value: 15', 'Process: 1, value: 15', 'Process: 2, value: 15', 'Process: 3, value: 15', 'Process: 4, value: 15']
Elapsed: 10.832329750061035


# クラスでの分散非同期

In [24]:
import ray
import random
import time

@ray.remote
class Worker:
    """Ray actor that keeps a running total and takes a random time per update."""

    def __init__(self, worker_id):
        # Running total stored inside the actor.
        self.n = 0
        # Identifier of this worker, reported by get_value().
        self.worker_id = worker_id

    def add(self, n):
        """Sleep for a random 1-10 seconds, then add ``n`` to the stored total."""
        delay_seconds = random.randint(1, 10)
        time.sleep(delay_seconds)
        self.n = self.n + n

    def get_value(self):
        """Return a string reporting this worker's id and its current total."""
        return f"Process: {self.worker_id}, value: {self.n}"




ray.init(ignore_reinit_error=True)
start = time.time()

# Create five actors with ids 0..4.
workers = []
for worker_id in range(5):
    workers.append(Worker.remote(worker_id))

# First round: ask every actor to add 5 (each add sleeps a random 1-10 s first).
for actor in workers:
    actor.add.remote(5)

# Second round: ask every actor to add 10 (again after a random delay).
for actor in workers:
    actor.add.remote(10)

work_in_progresses = []
for actor in workers:
    work_in_progresses.append(actor.get_value.remote())

# Print each actor's value as soon as it becomes available.
while work_in_progresses:
    finished, work_in_progresses = ray.wait(work_in_progresses, num_returns=1)
    print(ray.get(finished[0]))
    print("Elapsed:", time.time()-start, "\n")

# The list built from worker.get_value.remote() holds the ObjectRefs of the
# work that is still running.
# Each time one of them finishes, ray.wait() hands back that result's ref
# together with the refs of the remaining work.

2021-06-12 17:03:17,712	INFO worker.py:737 -- Calling ray.init() again after it has already been called.


Process: 1, value: 15
Elapsed: 7.995789289474487 

Process: 3, value: 15
Elapsed: 9.906349420547485 

Process: 0, value: 15
Elapsed: 9.934542655944824 

Process: 4, value: 15
Elapsed: 11.634456396102905 

Process: 2, value: 15
Elapsed: 20.9136381149292 

