
Reference: [Modern Parallel and Distributed Python: A Quick Tutorial on Ray](https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8)

In [23]:
#!pip install ray[default]

In [24]:
#!pip install pydantic 
#!pip install starlette
#!pip install uvicorn

In [30]:
import multiprocessing as mp
from tqdm.notebook import tqdm
import ray
import time

# Parallelism with Tasks

## Independent Task

To turn a Python function `f` into a “remote function” (a function that can be executed remotely and asynchronously), we declare the function with the `@ray.remote` decorator. Then function invocations via `f.remote()` will immediately return futures (a future is a reference to the eventual output), and the actual function execution will take place in the background (we refer to this execution as a **task**).
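
Ray's futures behave much like the standard library's `concurrent.futures`. The sketch below is a stdlib analogue, not Ray. `ThreadPoolExecutor.submit` plays the role of `f.remote()`: it returns a `Future` immediately while the work runs in the background. `Future.result()` plays the role of `ray.get`. The `slow_square` function is a hypothetical stand-in for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Hypothetical stand-in for real work.
    time.sleep(0.2)
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.perf_counter()
    # Like f.remote(x): each call returns a future right away,
    # and the work starts in the background immediately.
    futures = [executor.submit(slow_square, x) for x in range(4)]
    submit_seconds = time.perf_counter() - start

    # Like ray.get(futures): blocks until every result is ready.
    results = [fut.result() for fut in futures]
    total_seconds = time.perf_counter() - start

print(results)  # [0, 1, 4, 9]
print(f"submitted in {submit_seconds:.3f}s, finished in {total_seconds:.3f}s")
```

Submission takes almost no time. The four 0.2 s sleeps overlap, so the total time is close to 0.2 s rather than 0.8 s.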

First split the list into chunks, then process the chunks in parallel.

In [31]:
org_list = [3,4,5,6,5,4,3,2,1,3,4,4,3,2,4,6,2,1,32,4,55,3,2,1,4,6,78,8,5,6,0,9,8,6,3,5,6,7,7,8,8,4,2]

def split(a, n):
  '''
  a: the large list to split into chunks
  n: the number of sublists to produce
  '''
  k, m = divmod(len(a), n)
  split_data = [a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)]

  # Tag each sublist with its position so each result can be matched to its chunk when the results are reassembled
  split_data_order_number = [[i, v] for i, v in enumerate(split_data)]

  return split_data_order_number
        
sub_list = split(org_list,6)

sub_list

[[0, [3, 4, 5, 6, 5, 4, 3, 2]],
 [1, [1, 3, 4, 4, 3, 2, 4]],
 [2, [6, 2, 1, 32, 4, 55, 3]],
 [3, [2, 1, 4, 6, 78, 8, 5]],
 [4, [6, 0, 9, 8, 6, 3, 5]],
 [5, [6, 7, 7, 8, 8, 4, 2]]]
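
Before parallelizing, it helps to have a sequential baseline to check the Ray output against. The minimal sketch below restates the notebook's `split` logic so the block runs on its own. It checks that the chunks concatenate back to the original list, then computes the expected `(index, sum)` pairs.

```python
def split(a, n):
    """Split list `a` into `n` contiguous chunks whose lengths differ by at most one,
    tagging each chunk with its position."""
    k, m = divmod(len(a), n)
    chunks = [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
    return [[i, chunk] for i, chunk in enumerate(chunks)]


org_list = [3, 4, 5, 6, 5, 4, 3, 2, 1, 3, 4, 4, 3, 2, 4, 6, 2, 1, 32, 4, 55, 3,
            2, 1, 4, 6, 78, 8, 5, 6, 0, 9, 8, 6, 3, 5, 6, 7, 7, 8, 8, 4, 2]
sub_list = split(org_list, 6)

# No element is lost or duplicated: the chunks rebuild the original list.
rebuilt = [x for _, chunk in sub_list for x in chunk]
assert rebuilt == org_list

# Sequential reference result, in the same (index, sum) form the Ray task returns.
expected = [(i, sum(chunk)) for i, chunk in sub_list]
print(expected)  # [(0, 32), (1, 21), (2, 103), (3, 104), (4, 37), (5, 42)]
```

With 43 elements and 6 chunks, `divmod` gives `k = 7, m = 1`, so the first chunk gets one extra element.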

In [36]:
%%time

# Start Ray.
ray.init(num_cpus=8, ignore_reinit_error=True)  # number of CPUs Ray may use, which caps how many tasks run at once

@ray.remote
def f(x):
  #time.sleep(1)
  index = x[0]
  value = x[1]
  init_value = 0
  for item in value:
    init_value += item

  return index, init_value

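# Start one task per chunk (6 tasks); they run in parallel.
result_ids = []

for chunk in sub_list:
  chunk_ref = ray.put(chunk)    # store the chunk once in Ray's object store; ray.put returns an ObjectRef that tasks receive instead of the raw data
  print(chunk_ref)
  result_ids.append(f.remote(chunk_ref))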

2021-05-07 23:28:15,897	INFO services.py:1269 -- View the Ray dashboard at http://127.0.0.1:8265


ObjectRef(ffffffffffffffffffffffffffffffffffffffff010000001f000000)
ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000020000000)
ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000021000000)
ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000022000000)
ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000023000000)
ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000024000000)
CPU times: user 60.9 ms, sys: 55.4 ms, total: 116 ms
Wall time: 3.59 s


In [37]:
# result_ids holds one ObjectRef (future) per submitted task
result_ids

[ObjectRef(d3ab1aaf09c00ee8ffffffffffffffffffffffff0100000001000000),
 ObjectRef(b9679322487eb9c3ffffffffffffffffffffffff0100000001000000),
 ObjectRef(fc8204747604c8ccffffffffffffffffffffffff0100000001000000),
 ObjectRef(9d78bd90898368caffffffffffffffffffffffff0100000001000000),
 ObjectRef(36e14f5d7a18682effffffffffffffffffffffff0100000001000000),
 ObjectRef(48b7ee15cf1ae472ffffffffffffffffffffffff0100000001000000)]

In [38]:
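# Wait for the tasks to complete and retrieve the results.
# The tasks started running as soon as f.remote() was called; ray.get only
# blocks until they finish and returns their values in the order of result_ids.
# If time.sleep(1) in f is uncommented, the 6 tasks sleep concurrently (num_cpus=8),
# so this takes roughly 1 second rather than 6.
results = ray.get(result_ids)  # [(index, chunk_sum), ...]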

results

[(0, 32), (1, 21), (2, 103), (3, 104), (4, 37), (5, 42)]

In [35]:
ray.shutdown()

Compared with Python's built-in `multiprocessing` module, the code is much simpler, and it can also run faster.

## Multiple Functions

```python
import ray

ray.init()

# Define functions you want to execute in parallel using 
# the ray.remote decorator.
@ray.remote
def func1():
    # does something
    pass

@ray.remote
def func2():
    # does something
    pass

# Execute func1 and func2 in parallel.
ray.get([func1.remote(), func2.remote()])

```

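If `func1()` and `func2()` return results, keep the returned ObjectRefs and unpack the values from `ray.get`:

```python
# func1 and func2 must be decorated with @ray.remote; .remote() returns an ObjectRef immediately
ret_id1 = func1.remote()
ret_id2 = func2.remote()
ret1, ret2 = ray.get([ret_id1, ret_id2])
```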

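If several functions operate on the same data, store the data once with `ray.put()` and pass the returned reference to each function. This avoids serializing the data again for every call.
```python
# largeData: any large object (e.g. a NumPy array) shared by both functions
largeData_id = ray.put(largeData)

# func1 and func2 must be @ray.remote functions that take one argument;
# inside each task the argument arrives as the stored value, not the ObjectRef
ray.get([func1.remote(largeData_id), func2.remote(largeData_id)])
```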

## Task Dependencies

Tasks can also depend on other tasks. Below, the `multiply_matrices` task uses the outputs of the two `create_matrix` tasks, so it will not begin executing until the first two tasks have finished. The outputs of the first two tasks are automatically passed as arguments to the third task, and the futures are replaced with their corresponding values. In this manner, tasks can be composed together with arbitrary DAG dependencies.



Here, several functions depend on one another:

In [None]:
import numpy as np

@ray.remote
def create_matrix(size):
    return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
    return np.dot(x, y)

x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)

# Get the results.
z = ray.get(z_id)
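
Ray's scheduler follows a simple rule: a task whose arguments include futures waits for them, then receives their values. That rule can be modeled with the standard library. This is a toy sketch, not Ray. The helper `submit` is hypothetical, and unlike Ray it ties up a worker thread while waiting instead of deferring the task until its inputs are ready. Small deterministic lists stand in for the random NumPy matrices.

```python
from concurrent.futures import Future, ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def _resolve(arg):
    # Replace a future argument by its value, waiting for it if necessary.
    return arg.result() if isinstance(arg, Future) else arg


def submit(fn, *args):
    """Hypothetical helper: run fn in the pool once all future arguments are ready.
    Dependencies are always submitted before the tasks that use them, so the
    FIFO pool keeps this toy from deadlocking."""
    return executor.submit(lambda: fn(*[_resolve(a) for a in args]))


def create_matrix(n):
    # Deterministic stand-in for random data: 2 on the diagonal, 0 elsewhere.
    return [[2 if i == j else 0 for j in range(n)] for i in range(n)]


def multiply_matrices(x, y):
    return [[sum(x[i][k] * y[k][j] for k in range(len(y))) for j in range(len(y[0]))]
            for i in range(len(x))]


x_id = submit(create_matrix, 3)
y_id = submit(create_matrix, 3)
z_id = submit(multiply_matrices, x_id, y_id)  # waits for x_id and y_id
z = z_id.result()
print(z)  # [[4, 0, 0], [0, 4, 0], [0, 0, 4]]
executor.shutdown()
```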

## Aggregating Values Efficiently

Task dependencies can be used in much more sophisticated ways. For example, suppose we wish to aggregate 8 values together. This example uses integer addition, but in many applications, aggregating large vectors across multiple machines can be a bottleneck. In this case, changing a single line of code can change the aggregation’s running time from linear to logarithmic in the number of values being aggregated.

![](https://miro.medium.com/max/1750/1*vHz3troEmr4uLns0V8VmdA.jpeg)

As described above, to feed the output of one task as an input into a subsequent task, simply pass the future returned by the first task as an argument into the second task. This task dependency will automatically be taken into account by Ray’s scheduler. The second task will not execute until the first task has finished, and the output of the first task will automatically be shipped to the machine on which the second task is executing.

In [None]:
import time

@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y

# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)

# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

The above code is very explicit, but note that both approaches can be implemented in a more concise fashion using while loops.

In [None]:
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])


# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
    values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])
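
To see why the second loop is faster, track each intermediate value's depth: the length of the longest chain of `add` calls that produced it. With enough workers and one second per `add`, the running time is roughly that depth in seconds. The pure-Python sketch below involves no Ray; `leaf` and `add_node` are hypothetical helpers.

```python
def leaf(v):
    # An input value as (value, depth); depth 0 because no add() produced it.
    return (v, 0)


def add_node(x, y):
    # Mirrors add(x, y): sums the values; its chain is one longer than the deeper input.
    return (x[0] + y[0], max(x[1], y[1]) + 1)


def slow_reduce(nums):
    values = [leaf(v) for v in nums]
    while len(values) > 1:
        # The new partial sum goes to the FRONT, so the next add must wait for it.
        values = [add_node(values[0], values[1])] + values[2:]
    return values[0]


def fast_reduce(nums):
    values = [leaf(v) for v in nums]
    while len(values) > 1:
        # The new partial sum goes to the BACK, so independent pairs are added first.
        values = values[2:] + [add_node(values[0], values[1])]
    return values[0]


print(slow_reduce(range(1, 9)))   # (36, 7)
print(fast_reduce(range(1, 9)))   # (36, 3)
print(slow_reduce(range(1, 17)))  # (136, 15)
print(fast_reduce(range(1, 17)))  # (136, 4)
```

The same reasoning applies to the Ray version. Appending each new future to the back of the list lets the additions at the front run without waiting on it.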

# From Classes to Actors

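If you want to structure the program around a class that keeps state between calls, use Ray actors by decorating the class with `@ray.remote`.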

In [None]:
ray.shutdown()

In [None]:
import ray
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))

2021-05-07 03:10:02,370	INFO services.py:1269 -- View the Ray dashboard at http://127.0.0.1:8265


[1, 1, 1, 1]


In [None]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.x = 0
    
    def inc(self):
        self.x += 1
    
    def get_value(self):
        return self.x

# Create an actor process.
c = Counter.remote()

# Check the actor's counter value.
print(ray.get(c.get_value.remote()))  # 0

# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote()))  # 2

0
2
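
Conceptually, an actor is a worker that owns some state and handles method calls one at a time, in the order they arrive. That is why two `c.inc.remote()` calls followed by `get_value` reliably return 2. The sketch below models this with the standard library, not Ray. A single-thread `ThreadPoolExecutor` serves as the actor's mailbox, and a thread stands in for the separate worker process Ray creates. `ToyActorHandle` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


class Counter:
    # The same Counter as above, as a plain class without @ray.remote.
    def __init__(self):
        self.x = 0

    def inc(self):
        self.x += 1

    def get_value(self):
        return self.x


class ToyActorHandle:
    """Hypothetical stand-in for an actor handle: one thread runs every call in FIFO order."""

    def __init__(self, cls, *args):
        self._instance = cls(*args)
        self._mailbox = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        # Like c.method.remote(*args): returns a Future immediately.
        return self._mailbox.submit(getattr(self._instance, method_name), *args)

    def shutdown(self):
        self._mailbox.shutdown(wait=True)


c = ToyActorHandle(Counter)
before = c.call("get_value").result()
c.call("inc")
c.call("inc")
after = c.call("get_value").result()  # queued behind both increments
print(before, after)  # 0 2
c.shutdown()
```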


## Actor Handles

In [None]:
import time


@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []
    
    def add_message(self, message):
        self.messages.append(message)
    
    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(10):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']



New messages: []
New messages: ['Message 0 from worker 0.', 'Message 0 from worker 1.', 'Message 0 from worker 2.']
New messages: ['Message 1 from worker 0.', 'Message 1 from worker 1.', 'Message 1 from worker 2.']
New messages: ['Message 2 from worker 0.', 'Message 2 from worker 1.', 'Message 2 from worker 2.']
New messages: ['Message 3 from worker 0.', 'Message 3 from worker 1.', 'Message 3 from worker 2.']
New messages: ['Message 4 from worker 0.', 'Message 4 from worker 1.', 'Message 4 from worker 2.']
New messages: ['Message 5 from worker 0.', 'Message 5 from worker 1.', 'Message 5 from worker 2.']
New messages: ['Message 6 from worker 0.', 'Message 6 from worker 1.', 'Message 6 from worker 2.']
New messages: ['Message 7 from worker 0.', 'Message 7 from worker 1.', 'Message 7 from worker 2.']
New messages: ['Message 8 from worker 0.', 'Message 8 from worker 1.', 'Message 8 from worker 2.']


Actors are extremely powerful. They allow you to take a Python class and instantiate it as a microservice which can be queried from other actors and tasks and even other applications.
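
The key property in the Actor Handles example is that the handle can be passed to other tasks, which then send method calls to the same actor. The sketch below shows the same pattern with the standard library, not Ray. Three producer threads share one toy handle backed by a single-thread executor, so `add_message` and `get_and_clear_messages` never interleave. `ToyActorHandle` and `producer` are hypothetical names, and each producer sends 5 messages instead of 100.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MessageActor:
    def __init__(self):
        self.messages = []

    def add_message(self, message):
        self.messages.append(message)

    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


class ToyActorHandle:
    """Hypothetical handle: every call runs on one dedicated thread, in arrival order."""

    def __init__(self, cls):
        self._instance = cls()
        self._mailbox = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        return self._mailbox.submit(getattr(self._instance, method_name), *args)

    def shutdown(self):
        self._mailbox.shutdown(wait=True)


def producer(handle, j, n_messages=5):
    # Like worker(message_actor, j): pushes messages through the shared handle.
    for i in range(n_messages):
        handle.call("add_message", f"Message {i} from worker {j}.")


handle = ToyActorHandle(MessageActor)
threads = [threading.Thread(target=producer, args=(handle, j)) for j in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All 15 add_message calls were queued before this call, so it sees every message.
received = handle.call("get_and_clear_messages").result()
leftover = handle.call("get_and_clear_messages").result()
print(len(received), leftover)  # 15 []
handle.shutdown()
```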

# ML Model Deployment

In [None]:
from ray import serve
import pickle
import requests
from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

# Train model
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])

# Define the Ray Serve model.
class BoostingModel:
    def __init__(self):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    def __call__(self, flask_request):
        payload = flask_request.json["vector"]
        print("Worker: received flask request with data", payload)

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model
# client = serve.start()
# client.create_backend("iris:v1", BoostingModel)
# client.create_endpoint("iris_classifier", backend="iris:v1", route="/iris")

# Query it!
#sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
#response = requests.get("http://localhost:8000/iris", json=sample_request_input)
#print(response.text)
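
Before deploying, the request-handling logic can be checked locally by calling the class directly with a stand-in request. This sketch rests on a few assumptions. `FakeRequest` and `StubModel` are hypothetical. The stub replaces the trained `GradientBoostingClassifier`, so the block runs without scikit-learn. The handler mirrors `BoostingModel.__call__` only in assuming the request exposes its JSON payload as `.json`, and the model and label list are injected so they can be swapped out.

```python
class StubModel:
    """Hypothetical stand-in for the trained classifier: always predicts class 0."""

    def predict(self, rows):
        return [0 for _ in rows]


class FakeRequest:
    """Hypothetical request object exposing the JSON payload as `.json`."""

    def __init__(self, payload):
        self.json = payload


class BoostingModel:
    # Same handler logic as above, with the model and labels injected for testing.
    def __init__(self, model, label_list):
        self.model = model
        self.label_list = label_list

    def __call__(self, flask_request):
        payload = flask_request.json["vector"]
        prediction = self.model.predict([payload])[0]
        return {"result": self.label_list[prediction]}


handler = BoostingModel(StubModel(), ["setosa", "versicolor", "virginica"])
response = handler(FakeRequest({"vector": [1.2, 1.0, 1.1, 0.9]}))
print(response)  # {'result': 'setosa'}
```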