- https://github.com/volcengine/verl/blob/main/examples/ray/tutorial.ipynb

In [1]:
import os
import ray
import torch
import warnings
warnings.filterwarnings('ignore')

## Chapter 1: Ray Basics

In [2]:
# Build a local Ray cluster. The head node and the worker node are both on this machine.
# `ignore_reinit_error=True` keeps this cell safe to re-run: calling `ray.init()` a second
# time in the same kernel would otherwise raise a RuntimeError.
ray.init(ignore_reinit_error=True)

2025-02-13 19:05:33,755	INFO worker.py:1779 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.13
Ray version:,2.32.0
Dashboard:,http://127.0.0.1:8265


In [3]:
# Show the resources Ray currently reports as available (CPU, GPU, memory, ...).
ray.available_resources()

{'accelerator_type:G': 1.0,
 'node:__internal_head__': 1.0,
 'node:192.168.101.16': 1.0,
 'CPU': 64.0,
 'memory': 74039068468.0,
 'object_store_memory': 36016743628.0,
 'GPU': 2.0}

In [4]:
@ray.remote
class Accumulator:
    """Running-total counter exposed through ``ray.remote``."""

    def __init__(self):
        # The total starts from zero.
        self.value = 0

    def add(self, x):
        """Add ``x`` to the running total; nothing is returned."""
        self.value += x

    def get_value(self):
        """Return the current running total."""
        return self.value

In [5]:
# Instantiate the remote Accumulator; the returned handle is used for the remote method calls below.
accumulator = Accumulator.remote()

In [6]:
# Check the current value. Note that `.remote()` returns immediately with a reference
# and does not actually wait for the remote execution to complete.
value_ref = accumulator.get_value.remote()
# Get the actual value behind the reference.
value = ray.get(value_ref)
print(value)

0


## Chapter 2: Resource Pool and RayWorkerGroup

- The principle of **parameter passing**:
    - The input argument is a list of length `world_size`; the i-th element of the list is dispatched to the i-th worker in the RayWorkerGroup.
    - The return value is also a list, whose i-th element is the return value of the i-th worker.

In [7]:
from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup, merge_resource_pool
from verl.single_controller.base import Worker

In [8]:
# Create a GPU-enabled resource pool.
# NOTE(review): `[2]` presumably requests 2 worker processes on a single node (the worker
# group built on this pool reports world_size == 2) — confirm against RayResourcePool.
resource_pool = RayResourcePool([2], use_gpu=True)

In [9]:
@ray.remote
class GPUAccumulator(Worker):
    """Worker that accumulates values into a one-element CUDA tensor."""

    def __init__(self) -> None:
        super().__init__()
        # Each rank starts from a value equal to its own rank.
        zero = torch.zeros(size=(1,), device='cuda')
        self.value = zero + self.rank

    def add(self, x):
        """Add ``x`` in place, log the new value, and return a CPU copy of it."""
        self.value += x
        print(f'rank {self.rank}, value: {self.value}')
        host_value = self.value.cpu()
        return host_value

In [10]:
# Wrap the remote worker class (GPUAccumulator takes no constructor arguments) for use by a worker group.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulator)

In [11]:
# Build a worker group of GPUAccumulator workers on the GPU resource pool.
worker_group = RayWorkerGroup(resource_pool, class_with_args)

In [12]:
# Call `add` on every worker. `x` is a list with one entry per worker (1 for the first
# worker, 2 for the second); the printed result is the list of per-worker return values.
print(worker_group.execute_all_sync('add', x=[1, 2]))

[36m(GPUAccumulator pid=164143)[0m rank 0, value: tensor([1.], device='cuda:0')
[tensor([1.]), tensor([3.])]


### Resource Sharing (CPU Resource Pools)

In [13]:
@ray.remote
class CPUAccumulator(Worker):
    """Worker that accumulates values into a one-element CPU tensor."""

    def __init__(self) -> None:
        super().__init__()
        # The initial value of each rank equals the rank itself.
        zero = torch.zeros(size=(1,))
        self.value = zero + self.rank

    def add(self, x):
        """Add ``x`` in place, log the new value, and return the accumulator tensor."""
        self.value += x
        print(f'rank {self.rank}, value: {self.value}')
        return self.value

In [14]:
# Create two new CPU-only resource pools (each requested with `[4]` and told apart by
# their name prefix), then merge these two pools into a single combined pool.
resource_pool_cpu_1 = RayResourcePool([4], use_gpu=False, name_prefix='a')
resource_pool_cpu_2 = RayResourcePool([4], use_gpu=False, name_prefix='b')
resource_pool_merge = merge_resource_pool(resource_pool_cpu_1, resource_pool_cpu_2)

[36m(GPUAccumulator pid=165267)[0m rank 1, value: tensor([3.], device='cuda:0')


In [15]:
# Wrap the CPU worker class (CPUAccumulator takes no constructor arguments) for use by a worker group.
class_with_args_cpu = RayClassWithInitArgs(cls=CPUAccumulator)

In [16]:
# Establish two RayWorkerGroups from the same worker class: one on the first CPU pool
# alone, and one on the merged pool (which was built from both CPU pools).
worker_group_1 = RayWorkerGroup(resource_pool_cpu_1, class_with_args_cpu)
worker_group_merge = RayWorkerGroup(resource_pool_merge, class_with_args_cpu)

In [17]:
# Every one of the 4 workers receives x=2 and adds it to its initial value (its rank).
output_1 = worker_group_1.execute_all_sync('add', x=[2, 2, 2, 2])
output_1

[36m(CPUAccumulator pid=165418)[0m rank 0, value: tensor([2.])


[tensor([2.]), tensor([3.]), tensor([4.]), tensor([5.])]

In [18]:
# The merged group has 8 workers, so `x` carries 8 entries; each worker adds 3 to its initial value (its rank).
output_merge = worker_group_merge.execute_all_sync('add', x=[3,3,3,3,3,3,3,3])

[36m(CPUAccumulator pid=165641)[0m rank 0, value: tensor([3.])
[36m(CPUAccumulator pid=166079)[0m rank 7, value: tensor([10.])


In [19]:
# Display the per-worker results collected from the merged worker group.
output_merge

[tensor([3.]),
 tensor([4.]),
 tensor([5.]),
 tensor([6.]),
 tensor([7.]),
 tensor([8.]),
 tensor([9.]),
 tensor([10.])]

In [20]:
# World sizes of the GPU group, the first CPU group, and the merged CPU group.
worker_group.world_size, worker_group_1.world_size, worker_group_merge.world_size

(2, 4, 8)

## Chapter 3: Data Dispatch, Execution and Collection

In the above example, we used the `execute_all_sync` function of the RayWorkerGroup to dispatch data from the driver to each worker. This requires building a per-worker argument list by hand, which is inconvenient to code against. In this chapter, we use function decorators so that the RayWorkerGroup can directly call methods defined on the Worker, which greatly **simplifies parameter passing**.

In [21]:
from verl.single_controller.base.decorator import register, Dispatch, Execute

In [22]:
@ray.remote
class GPUAccumulatorDecorator(Worker):
    """GPU accumulator whose ``add`` method is registered with a dispatch mode."""

    def __init__(self) -> None:
        super().__init__()
        # Each rank starts from a value equal to its own rank.
        zero = torch.zeros(size=(1,), device='cuda')
        self.value = zero + self.rank

    # ONE_TO_ALL: a single input is mapped to all the workers.
    @register(Dispatch.ONE_TO_ALL)
    def add(self, x):
        """Log ``x``, add it to the accumulator, log the result, and return a CPU copy."""
        print(x)
        updated = self.value + x
        self.value = updated
        print(f'rank {self.rank}, value: {self.value}')
        return updated.cpu()

In [23]:
# Re-bind `class_with_args` to the decorator-based worker class, then build a worker
# group for it on the existing GPU resource pool.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulatorDecorator)
gpu_accumulator_decorator = RayWorkerGroup(resource_pool, class_with_args)

In [24]:
# Because `add` is registered with Dispatch.ONE_TO_ALL, it can be called directly on the
# worker group with a single value that every worker receives; the result is still a
# list with one entry per worker.
print(gpu_accumulator_decorator.add(x=10))

[36m(GPUAccumulatorDecorator pid=171441)[0m 10
[36m(GPUAccumulatorDecorator pid=171441)[0m rank 0, value: tensor([10.], device='cuda:0')
[36m(GPUAccumulatorDecorator pid=171663)[0m 10
[tensor([10.]), tensor([11.])]
[36m(GPUAccumulatorDecorator pid=171663)[0m rank 1, value: tensor([11.], device='cuda:0')
