- https://zhuanlan.zhihu.com/p/26833089345
- `ray start --head --num-cpus=4`
    - `--num-cpus`: 该节点向 Ray 声明的逻辑 CPU 数量（调度任务时使用的资源配额，并不等同于固定的进程数）

In [8]:
import os
import ray
import torch
import warnings
warnings.filterwarnings('ignore')
import time

In [6]:
# Start a local Ray runtime limited to 4 logical CPUs.
# ignore_reinit_error=True keeps this cell safe to re-run in a live kernel:
# without it, calling ray.init() a second time in the same process raises.
ray.init(num_cpus=4, ignore_reinit_error=True)

2025-03-16 18:54:51,022	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.9.0
Ray version:,2.42.0
Dashboard:,http://127.0.0.1:8265


[33m(raylet)[0m The autoscaler failed with the following error:
Terminated with signal 15
  File "/home/whaow/anaconda3/envs/verl/lib/python3.9/site-packages/ray/autoscaler/_private/monitor.py", line 719, in <module>
    monitor.run()
  File "/home/whaow/anaconda3/envs/verl/lib/python3.9/site-packages/ray/autoscaler/_private/monitor.py", line 604, in run
    self._run()
  File "/home/whaow/anaconda3/envs/verl/lib/python3.9/site-packages/ray/autoscaler/_private/monitor.py", line 458, in _run
    time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)



In [10]:
@ray.remote
def example_task(x):
    """Simulate a slow unit of work: wait two seconds, then return ``x`` times itself."""
    time.sleep(2)  # artificial delay standing in for real computation
    squared = x * x
    return squared

# Each .remote() call only submits a task and returns an ObjectRef right away;
# ray.get then blocks until all ten tasks have finished and returns their results.
results = ray.get([example_task.remote(i) for i in range(10)])
print(results)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [11]:
# ray.shutdown()

### basics

- Tasks: remote (stateless) functions; Actors: remote (stateful) classes
- 调用 Task/Actor 的 `remote` 方法后，Ray 会按照调度策略把它分配到某个节点的进程池上运行或初始化。对 driver 而言，分发出去的任务是异步执行的，因此还需要通过 `ray.get` 获取异步运行的结果。
- 如下例：
    - 一个 Actor 中可以 remote 创建和调用另一个 Actor。

In [23]:
@ray.remote
class ChildActor:
    """Minimal actor that another actor can create and delegate work to."""

    def do_work(self):
        """Return a fixed status string identifying the child as the worker."""
        message = "Work done by child"
        return message

@ray.remote
class ParentActor:
    """Actor that holds a ``ChildActor`` handle and forwards work to it."""

    def __init__(self):
        # Define the attribute up front so get_work() can tell "no child yet"
        # apart from a real handle instead of failing with AttributeError.
        self.child_actor = None

    def create_child(self):
        """Create a new ChildActor from inside this actor and keep its handle."""
        self.child_actor = ChildActor.remote()

    def get_work(self):
        """Return the child's ``do_work`` result, creating the child on first use."""
        if self.child_actor is None:
            self.create_child()
        # Block inside this actor until the child replies, then hand the value back.
        return ray.get(self.child_actor.do_work.remote())

In [25]:
ChildActor, ParentActor

(<__main__.ActorClass(ChildActor) at 0x7fc5e5c133a0>,
 <__main__.ActorClass(ParentActor) at 0x7fc5e5c10160>)

In [26]:
parent_actor = ParentActor.remote()

In [27]:
parent_actor

Actor(ParentActor, 4e2fbc70b725c06782600a4601000000)

In [28]:
parent_actor.create_child.remote()

ObjectRef(5d4b8d1788f12d2d4e2fbc70b725c06782600a460100000001000000)

In [29]:
ray.get(parent_actor.create_child.remote())

In [31]:
ray.get(parent_actor.get_work.remote())

'Work done by child'

### ray.remote

In [3]:
@ray.remote
class Accumulator:
    """Actor that keeps a running total across remote method calls."""

    def __init__(self):
        # The running total starts at zero.
        self.value = 0

    def add(self, x):
        """Add ``x`` to the running total in place; returns nothing."""
        self.value += x

    def get_value(self):
        """Return the current running total."""
        total = self.value
        return total

In [4]:
accumulator = Accumulator.remote()

In [5]:
# Check the current value. Note that this function returns immediately and does not actually wait for the remote execution to complete.
value_ref = accumulator.get_value.remote() 
# Get the value
value = ray.get(value_ref)
print(value)

0


In [6]:
# Accumulate, then check the result.
# Similarly, the 'add' here will return immediately (it only submits the call).
accumulator.add.remote(10)
# NOTE(review): this relies on Ray running calls submitted to one actor by the
# same caller in submission order, so get_value observes the add — confirm this
# still holds if the actor is ever made async or threaded.
new_value = ray.get(accumulator.get_value.remote())
print(new_value)

10


### Resource Pool and RayWorkerGroup

- Use the `execute_all_sync` method of `RayWorkerGroup` to dispatch data from the **driver** to **each worker**.

In [7]:
from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup, merge_resource_pool
from verl.single_controller.base import Worker

In [8]:
# NOTE(review): [2] presumably requests a single node with 2 worker processes
# (GPU-backed, given use_gpu=True) — confirm against RayResourcePool's signature.
resource_pool = RayResourcePool([2], use_gpu=True)

In [9]:
@ray.remote
class GPUAccumulator(Worker):
    """Worker that holds a one-element accumulator tensor on its GPU."""

    def __init__(self) -> None:
        super().__init__()
        # The initial value of each rank is the same as the rank
        self.value = torch.zeros(size=(1,), device='cuda') + self.rank

    def add(self, x):
        """Add ``x`` to this rank's value in place and return a CPU copy of it."""
        self.value += x
        print(f'rank {self.rank}, value: {self.value}')
        return self.value.cpu()

    def get_value(self):
        """Return a CPU copy of this rank's current value.

        Copying to the CPU matches what ``add`` returns and avoids shipping a
        CUDA tensor back to the calling process.
        """
        return self.value.cpu()

In [10]:
# Build the worker group: GPUAccumulator actors are created on the resource pool.
# Each worker's initial value is its rank; nothing is added until `add` is called later.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulator)
worker_group = RayWorkerGroup(resource_pool, class_with_args)

In [12]:
worker_group.workers

[Actor(GPUAccumulator, 2856196208ad0ee8758ea19701000000),
 Actor(GPUAccumulator, 07b0ea304ca05b56917b607e01000000)]

In [15]:
worker_group.workers

[Actor(GPUAccumulator, 2856196208ad0ee8758ea19701000000),
 Actor(GPUAccumulator, 07b0ea304ca05b56917b607e01000000)]

In [16]:
# The list passed as x is split across the workers, one element per rank:
# rank 0: 0 + 1 => 1
# rank 1: 1 + 3 => 4
print(worker_group.execute_all_sync('add', x=[1, 3]))

[36m(GPUAccumulator pid=1172476)[0m rank 0, value: tensor([1.], device='cuda:0')
[tensor([1.]), tensor([4.])]
[36m(GPUAccumulator pid=1172642)[0m rank 1, value: tensor([4.], device='cuda:0')


In [17]:
worker_group.world_size

2

### GPU Resource Sharing

### Data Dispatch, Execution and Collection

In [18]:
from verl.single_controller.base.decorator import register, Dispatch, Execute

In [19]:
@ray.remote
class GPUAccumulatorDecorator(Worker):
    """GPU accumulator whose ``add`` method is registered with ONE_TO_ALL dispatch."""

    def __init__(self) -> None:
        super().__init__()
        # Every rank starts from its own rank index, held in a 1-element CUDA tensor.
        rank_offset = self.rank
        self.value = torch.zeros(size=(1,), device='cuda') + rank_offset

    # Send one driver-side input to every worker.
    @register(Dispatch.ONE_TO_ALL)
    def add(self, x):
        """Add ``x`` to this rank's value and return the new value on the CPU."""
        print(x)
        updated = self.value + x
        self.value = updated
        print(f'rank {self.rank}, value: {self.value}')
        return updated.cpu()

In [20]:
class_with_args = RayClassWithInitArgs(cls=GPUAccumulatorDecorator)
gpu_accumulator_decorator = RayWorkerGroup(resource_pool, class_with_args)

In [21]:
gpu_accumulator_decorator.add(10)

[36m(GPUAccumulatorDecorator pid=1173038)[0m 10
[36m(GPUAccumulatorDecorator pid=1173188)[0m 10


[tensor([10.]), tensor([11.])]

[36m(GPUAccumulatorDecorator pid=1173038)[0m rank 0, value: tensor([10.], device='cuda:0')


In [22]:
from verl.single_controller.base.decorator import register, Dispatch, collect_all_to_all, Execute