# VeRL Ray API Tutorial

## Chapter 1: Ray Basics

In [1]:
!pip install ray -Uq

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.1/70.1 MB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [1]:
import os

In [2]:
import warnings

import ray
import torch

# Silence all Python warnings to keep tutorial output short (note: this also hides useful ones).
warnings.filterwarnings("ignore")

In [18]:
# Build a local Ray cluster; the head node and the worker node are both this machine.
# Resources are declared explicitly (3 CPUs, 1 GPU) instead of being auto-detected.
# ignore_reinit_error=True keeps the cell re-runnable: a second ray.init() in the same
# kernel would otherwise raise instead of reusing the running cluster.
ray.init(num_cpus=3, num_gpus=1, ignore_reinit_error=True)

2025-09-10 15:07:34,492	INFO worker.py:1951 -- Started a local Ray instance.


0,1
Python version:,3.12.11
Ray version:,2.49.1


Implement an `Accumulator` class as a Ray actor: a stateful worker process whose methods are called remotely.

In [4]:
@ray.remote(memory=500 * 2**20)
class Accumulator:
    """Stateful Ray actor that keeps a running total.

    An instance runs in its own worker process. Its methods are invoked through
    the actor handle (``handle.method.remote(...)``), which returns an ObjectRef
    immediately; ``ray.get`` fetches the actual result.
    The ``memory`` option requests 500 MiB for this actor when Ray schedules it.
    """

    def __init__(self):
        # The running total starts at zero.
        self.value = 0

    def add(self, x):
        """Increase the running total by ``x``; returns nothing."""
        self.value += x

    def get_value(self):
        """Return the current running total."""
        return self.value

In [5]:
# Instantiate an accumulator actor. `.remote()` returns an actor handle right away; the
# actor itself lives in a separate worker process and behaves like an RPC service.
accumulator = Accumulator.remote()

In [6]:
# Check the current value. `.remote()` returns an ObjectRef immediately; it does not
# wait for the remote method to finish executing.
value_ref = accumulator.get_value.remote()
# ray.get blocks until the remote call has finished and returns its result.
value = ray.get(value_ref)
print(value)

0


In [17]:
# NOTE: do not call ray.shutdown() here. The `accumulator` actor created above is used
# again in the next cells, and Chapter 2 reuses this Ray cluster. The cluster is shut
# down at the end of Chapter 3, before Chapter 4 restarts it.

[36m(MapWorker(MapBatches(EmbedImages)) pid=20933)[0m 2025-09-10 15:07:22.494588: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=20933)[0m E0000 00:00:1757516842.551957   20933 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=20933)[0m E0000 00:00:1757516842.569632   20933 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=20933)[0m W0000 00:00:1757516842.637188   20933 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.[32m [repeated 4x across cluster][0m


In [7]:
# Accumulate, then check the result.
# 'add' also returns immediately. Calls from this driver to the same actor execute in
# submission order, so get_value below observes the update.
accumulator.add.remote(10000)
new_value = ray.get(accumulator.get_value.remote())
print(new_value)

10000


In [19]:
# Show the cluster's active nodes and resource usage.
!ray status

2025-09-10 15:07:49,209 - INFO - NumExpr defaulting to 2 threads.
Node status
---------------------------------------------------------------
Active:
 1 node_d36a56d9d43b382a13988f2f2c1f5a9a8b43d359112f5e964f1c3463
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Total Usage:
 0.0/3.0 CPU
 0.0/1.0 GPU
 0B/8.61GiB memory
 0B/3.69GiB object_store_memory

Total Constraints:
 (no request_resources() constraints)
Total Demands:
 (no resource demands)
[0m

In [20]:
# Read the training images from S3 into a (lazily executed) Ray Dataset.
# include_paths=True adds a "path" column, which add_class below uses to derive labels;
# shuffle="files" randomizes the order in which files are read.
ds = ray.data.read_images(
    "s3://doggos-dataset/train",
    include_paths=True,
    shuffle="files",
)
# ds.take(1)  # uncomment to inspect a single row

In [4]:
def add_class(row):
    """Label a Ray Data row with its class, taken from the image's parent directory.

    For example ``"doggos-dataset/train/miniature_poodle/miniature_poodle_578.jpg"``
    yields ``"miniature_poodle"``. The row dict is updated in place and returned.
    """
    parent_dir = row["path"].rsplit("/", 2)[-2]
    row["class"] = parent_dir
    return row

# Add a "class" column (the image's parent directory name) to every row.
# Each map task reserves one CPU and no GPU; at most 4 tasks run concurrently.
ds = ds.map(
    add_class,
    num_cpus=1,
    num_gpus=0,
    concurrency=4,
)

In [21]:
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor

class EmbedImages:
    """Callable class for ``Dataset.map_batches`` that adds CLIP image embeddings.

    Ray Data builds one instance per actor (from ``fn_constructor_kwargs``) and
    then calls it on each batch.
    """

    def __init__(self, model_id, device):
        # Load the CLIP processor (image preprocessing) and the model weights.
        self.processor = CLIPProcessor.from_pretrained(model_id)
        self.model = CLIPModel.from_pretrained(model_id)
        print("load model success.")
        self.model.to(device)
        self.device = device

    def __call__(self, batch):
        """Embed ``batch["image"]`` into a new ``batch["embedding"]`` column; return the batch."""
        pil_images = []
        for pixels in batch["image"]:
            pil_images.append(Image.fromarray(np.uint8(pixels)).convert("RGB"))

        model_inputs = self.processor(images=pil_images, return_tensors="pt", padding=True)
        model_inputs = model_inputs.to(self.device)

        # Pure inference: no autograd bookkeeping needed.
        with torch.inference_mode():
            image_features = self.model.get_image_features(**model_inputs)
            batch["embedding"] = image_features.cpu().numpy()

        return batch

In [24]:
# Generate batch embeddings with a pool of GPU actors running EmbedImages.
# Each actor reserves one whole GPU (num_gpus=1), so the pool size (concurrency) must not
# exceed the number of GPUs in the cluster; the cluster above was started with num_gpus=1.
embeddings_ds = ds.map_batches(
    EmbedImages,
    fn_constructor_kwargs={
        "model_id": "openai/clip-vit-base-patch32",
        "device": "cuda",
    },  # EmbedImages.__init__ kwargs
    fn_kwargs={},  # EmbedImages.__call__ kwargs
    concurrency=1,  # one actor per available GPU
    batch_size=64,
    num_gpus=1,
    # Only schedule on T4 GPUs; remove this if your GPU is a different model.
    accelerator_type="T4",
)
embeddings_ds = embeddings_ds.drop_columns(["image"])  # raw pixels are no longer needed


In [25]:
# Trigger execution of the lazy pipeline and fetch a single row.
embeddings_ds.take(1)

2025-09-10 15:08:50,583	INFO logging.py:295 -- Registered dataset logger for dataset dataset_6_0
2025-09-10 15:08:50,600	INFO streaming_executor.py:159 -- Starting execution of Dataset dataset_6_0. Full logs are in /tmp/ray/session_2025-09-10_15-07-29_606331_17896/logs/ray-data
2025-09-10 15:08:50,601	INFO streaming_executor.py:160 -- Execution plan of Dataset dataset_6_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage] -> ActorPoolMapOperator[MapBatches(EmbedImages)] -> TaskPoolMapOperator[MapBatches(drop_columns)] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- ReadImage 1: 0.00 row [00:00, ? row/s]

- MapBatches(EmbedImages) 2: 0.00 row [00:00, ? row/s]

- MapBatches(drop_columns) 3: 0.00 row [00:00, ? row/s]

- limit=1 4: 0.00 row [00:00, ? row/s]

[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m 2025-09-10 15:09:02.013601: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m E0000 00:00:1757516942.049603   21828 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m E0000 00:00:1757516942.060645   21828 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m W0000 00:00:1757516942.087773   21828 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
[36m(MapWorker(MapBatches(EmbedImages

[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m load model success.


[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::MapWorker(MapBatches(EmbedImages)).__init__()[39m (pid=21828, ip=172.28.0.12, actor_id=a4e293550eb13fa44d50e03501000000, repr=MapWorker(MapBatches(EmbedImages)))
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m   File "/usr/local/lib/python3.12/dist-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 519, in __init__
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m     self._map_transformer.init()
[36m(MapWorker(MapBatches(EmbedImages)) pid=21828)[0m   File "/usr/local/lib/python3.12/dist-packages/ray/data/_internal/execution/operators/map_transformer.py", line 202, in init
[36

[{'path': 'doggos-dataset/train/miniature_poodle/miniature_poodle_578.jpg',
  'embedding': array([ 2.42514044e-01,  4.79291752e-03, -2.53213882e-01, -1.38595656e-01,
          1.18952475e-01, -7.33686328e-01,  8.69309306e-01,  3.81725520e-01,
          4.21172976e-01,  1.50552899e-01, -1.65305138e-01,  4.52316701e-01,
          4.23103034e-01, -1.36677727e-01,  2.15538830e-01,  5.10052025e-01,
          1.13740516e+00, -2.98703879e-01,  1.67848662e-01, -1.52105302e-01,
         -5.85613668e-01, -5.45159094e-02,  6.17732048e-01, -5.20813346e-01,
          7.73368403e-02,  1.31762996e-01,  5.61286151e-01, -9.57396440e-03,
          1.33864611e-01, -1.70108750e-01,  1.54682606e-01,  2.26547718e-01,
          2.83533037e-01,  1.15474433e-01,  2.44471803e-01,  1.24475501e-01,
          1.31247312e-01, -3.00186992e-01, -7.41292313e-02,  1.67433965e+00,
         -6.44214392e-01,  1.35142908e-01,  4.02424157e-01, -3.53436381e-01,
          2.34429687e-01, -1.01177502e+00, -5.34806252e-01,  2.5

## Chapter 2: Resource Pool and RayWorkerGroup
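The previous example used a simple single-process worker.
In this chapter, we implement a GPU worker and group several copies of it into a RayWorkerGroup, on which we run a simple accumulator operation. This chapter requires the `verl` package (`pip install verl`) and a machine with 8 GPUs, because two 4-GPU resource pools are merged below.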

In [15]:
from verl.single_controller.base import Worker
from verl.single_controller.ray.base import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup, merge_resource_pool

ModuleNotFoundError: No module named 'verl'

In [None]:
# A 4-GPU resource pool (presumably `[4]` means 4 processes on one node — confirm in verl).
# This cell needs a cluster with at least 4 GPUs.
resource_pool = RayResourcePool([4], use_gpu=True)

In [None]:
@ray.remote
class GPUAccumulator(Worker):
    """verl Worker holding a one-element CUDA tensor, initialised to the worker's rank."""

    def __init__(self) -> None:
        super().__init__()
        # Start from the rank so each worker's result is distinguishable.
        zeros = torch.zeros(size=(1,), device="cuda")
        self.value = zeros + self.rank

    def add(self, x):
        """Add ``x`` in place, log the new value, and return a CPU copy for the driver."""
        self.value += x
        print(f"rank {self.rank}, value: {self.value}")
        return self.value.cpu()

In [None]:
# Each worker's initial value is its rank; adding 1 on every rank yields [1, 2, 3, 4].
class_with_args = RayClassWithInitArgs(cls=GPUAccumulator)
worker_group = RayWorkerGroup(resource_pool, class_with_args)
# execute_all_sync takes one argument value per worker (a list of length world_size).
print(worker_group.execute_all_sync("add", x=[1, 1, 1, 1]))

[tensor([1.]), tensor([2.]), tensor([3.]), tensor([4.])]


How arguments are passed: each input argument is a list of length `world_size`, and element *i* is sent to the worker with rank *i* in the RayWorkerGroup.
The return value is also a list, holding each worker's return value in rank order.

### GPU Resource Sharing

RayWorkerGroups mapped to the same resource pool share its GPUs. In this example we use three resource pools: the first occupies 4 GPUs, the second occupies another 4 GPUs, and the third (the merge of the first two) occupies all 8 GPUs. The first pool is the one created above.

In [None]:
# Create a second 4-GPU resource pool, then merge it with the first pool into one 8-GPU pool.
resource_pool_1 = RayResourcePool([4], use_gpu=True, name_prefix="a")
resource_pool_merge = merge_resource_pool(resource_pool, resource_pool_1)

In [None]:
# Build one RayWorkerGroup on the new 4-GPU pool and another on the merged 8-GPU pool.
worker_group_1 = RayWorkerGroup(resource_pool_1, class_with_args)
worker_group_merge = RayWorkerGroup(resource_pool_merge, class_with_args)

In [None]:
# Run 'add' on the second set of 4 GPUs: ranks 0..3 start at their rank, so +2 gives [2, 3, 4, 5].
output_1 = worker_group_1.execute_all_sync("add", x=[2, 2, 2, 2])
print(output_1)

[tensor([2.]), tensor([3.]), tensor([4.]), tensor([5.])]


In [None]:
# Run 'add' on the merged set of 8 GPUs: ranks 0..7 start at their rank, so +3 gives
# [3, 4, 5, 6, 7, 8, 9, 10].
output_merge = worker_group_merge.execute_all_sync("add", x=[3, 3, 3, 3, 3, 3, 3, 3])
print(output_merge)

[tensor([3.]), tensor([4.]), tensor([5.]), tensor([6.]), tensor([7.]), tensor([8.]), tensor([9.]), tensor([10.])]


In [None]:
# World sizes of the three groups: 4, 4, and 8 for the merged pool.
print(worker_group.world_size, worker_group_1.world_size, worker_group_merge.world_size)

4 4 8


## Chapter 3: Data Dispatch, Execution and Collection

In the examples above, we used the RayWorkerGroup's `execute_all_sync` method to dispatch data from the driver to each worker, which requires spelling out one argument value per worker by hand.
In this chapter, we use function decorators (`@register`) so that the RayWorkerGroup can call methods defined on the Worker directly, which greatly simplifies parameter passing.

In [None]:
from verl.single_controller.base.decorator import Dispatch, Execute, register

In [None]:
@ray.remote
class GPUAccumulatorDecorator(Worker):
    """GPU accumulator whose ``add`` is registered with a verl dispatch mode."""

    def __init__(self) -> None:
        super().__init__()
        # The initial value of each rank is the same as the rank
        self.value = torch.zeros(size=(1,), device="cuda") + self.rank

    # Dispatch.ONE_TO_ALL: the single value passed by the driver is sent to every worker.
    @register(Dispatch.ONE_TO_ALL)
    def add(self, x):
        """Add ``x`` to this rank's value, log it, and return a CPU copy for the driver."""
        self.value = self.value + x
        print(f"rank {self.rank}, value: {self.value}")
        return self.value.cpu()

In [None]:
# Build a worker group of decorated accumulators on the merged 8-GPU pool.
class_with_args = RayClassWithInitArgs(cls=GPUAccumulatorDecorator)
gpu_accumulator_decorator = RayWorkerGroup(resource_pool_merge, class_with_args)

In [None]:
# The single argument 10 is automatically dispatched to every worker in this
# RayWorkerGroup; each returns rank + 10, giving [10, 11, ..., 17].
print(gpu_accumulator_decorator.add(x=10))

[tensor([10.]), tensor([11.]), tensor([12.]), tensor([13.]), tensor([14.]), tensor([15.]), tensor([16.]), tensor([17.])]


### Custom Dispatch, Collection
Users can customize the dispatch and collection functions: just write your own `dispatch_fn` and `collect_fn`. RPCs can also be executed only on rank zero (`Execute.RANK_ZERO`). Examples of both are shown below.

In [None]:
from verl.single_controller.base.decorator import Dispatch, collect_all_to_all, register

In [None]:
def two_to_all_dispatch_fn(worker_group, *args, **kwargs):
    """
    Assume the input is a list of 2. Duplicate the input interleaved and pass to each worker.

    Every positional and keyword argument must have exactly two elements ``[a, b]``;
    it is expanded to length ``worker_group.world_size`` as ``[a, b, a, b, ...]``
    (left at length 2 when world_size < 2). New lists are built, so the caller's
    lists are never mutated.

    Returns:
        ``(args, kwargs)``: a tuple of expanded lists and a dict of expanded lists.

    Raises:
        AssertionError: if any argument does not have exactly two elements.
    """
    # At least the two original elements are always kept.
    target_len = max(worker_group.world_size, 2)

    def _expand(pair):
        # Interleave the two values until one value exists per worker.
        assert len(pair) == 2
        return [pair[i % 2] for i in range(target_len)]

    expanded_args = tuple(_expand(arg) for arg in args)
    expanded_kwargs = {k: _expand(v) for k, v in kwargs.items()}
    return expanded_args, expanded_kwargs


@ray.remote
class TestActor(Worker):
    """Toy verl Worker demonstrating plain, rank-zero and custom dispatch."""

    # TODO: pass *args and **kwargs is bug prone and not very convincing
    def __init__(self, x) -> None:
        super().__init__()
        # Per-worker constant folded into every result below.
        self._x = x

    def foo(self, y):
        """Undecorated method: return ``_x + y``."""
        return self._x + y

    @register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.RANK_ZERO)
    def foo_rank_zero(self, x, y):
        """Return ``_x + y + x``; registered to execute only on rank 0."""
        partial = self._x + y
        return partial + x

    @register(dispatch_mode={"dispatch_fn": two_to_all_dispatch_fn, "collect_fn": collect_all_to_all})
    def foo_custom(self, x, y):
        """Return ``_x + y + x`` using the custom two-to-all dispatch function."""
        partial = self._x + y
        return partial + x

In [None]:
# TestActor workers on the first 4-GPU pool, each constructed with x=2 (stored as _x).
# Note: this rebinds `worker_group`, replacing the accumulator group from Chapter 2.
class_with_args = RayClassWithInitArgs(cls=TestActor, x=2)
worker_group = RayWorkerGroup(resource_pool, class_with_args)

In [None]:
# foo_custom: two_to_all_dispatch_fn expands x=[1, 2] -> [1, 2, 1, 2] and y=[5, 6] -> [5, 6, 5, 6]
# across the 4 workers; each returns _x + y + x with _x=2, i.e. [8, 10, 8, 10].
# (Despite the name, output_ref holds resolved values, not ObjectRefs.)
output_ref = worker_group.foo_custom(x=[1, 2], y=[5, 6])
assert output_ref == [8, 10, 8, 10]

# foo_rank_zero: executes only on rank 0, so a single value comes back: 2 + 2 + 1 = 5.
output_ref = worker_group.foo_rank_zero(x=1, y=2)
assert output_ref == 5

In [None]:
# The decorated group spans the merged pool, so its world size is 8.
print(gpu_accumulator_decorator.world_size)

8


In [None]:
# Shut down the Ray cluster; Chapter 4 restarts Ray with a new resource pool.
ray.shutdown()

## Chapter 4: NVMegatronRayWorkerGroup

Due to a Ray issue, RayResourcePool currently only supports `max_colocate_count=1`, which means each GPU can host only one process.
`max_colocate_count > 1` can be supported once this pull request is applied: https://github.com/ray-project/ray/pull/44385

Therefore, we restart Ray and create a new `resource_pool` to demonstrate the **NVMegatronRayWorkerGroup**.

In [None]:
# Build a local Ray cluster; the head node and the worker node are both this machine.
# Resources are auto-detected. ignore_reinit_error=True keeps the cell re-runnable
# instead of raising if Ray is already initialized in this kernel.
ray.init(ignore_reinit_error=True)

Finally, we build an `NVMegatronRayWorkerGroup`, in which we initialize Megatron and run a Llama MLP layer split with tensor parallelism (TP). Here we use a more complex dispatch mode, `Dispatch.MEGATRON_COMPUTE`. This mode assumes that the user passes data already partitioned along the data-parallel (DP) dimension. Each DP partition is dispatched to all TP/PP ranks in the same DP group, and output is collected only from TP rank 0 of the last pipeline-parallel (PP) stage. This way, users who only write driver code never have to deal with the Megatron parallelism behind the RPC.

In [None]:
import sys

# Make a local Megatron-LM checkout importable, both in this driver process (sys.path)
# and in child processes that inherit the environment (PYTHONPATH).
# Override the location with the MEGATRON_PATH environment variable.
new_path = os.environ.get("MEGATRON_PATH", "/opt/tiger/Megatron-LM")

current_pythonpath = os.environ.get("PYTHONPATH", "")

# Prepend only once, so re-running this cell does not keep growing PYTHONPATH.
if new_path not in current_pythonpath.split(os.pathsep):
    new_pythonpath = f"{new_path}{os.pathsep}{current_pythonpath}" if current_pythonpath else new_path
    os.environ["PYTHONPATH"] = new_pythonpath
# NOTE(review): ray.init() runs before this cell, so Ray worker processes may not see this
# PYTHONPATH; if workers fail to import megatron, set it before ray.init() or via a
# runtime_env — confirm.

print(new_path)
# Insert at the front (matching the PYTHONPATH prepend) so this checkout takes precedence
# over any installed copy of megatron.
if new_path not in sys.path:
    sys.path.insert(0, new_path)

import megatron

print(megatron.__file__)

/opt/tiger/Megatron-LM
/opt/tiger/Megatron-LM/megatron/__init__.py


In [None]:
from megatron.core import parallel_state as mpu
from omegaconf import OmegaConf

from verl.single_controller.base.decorator import Dispatch, Execute, register
from verl.single_controller.base.megatron.worker import MegatronWorker
from verl.single_controller.ray.base import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup
from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup

In [None]:
# A fresh 4-GPU pool; max_colocate_count=1 allows at most one process per GPU.
resource_pool = RayResourcePool([4], use_gpu=True, max_colocate_count=1)

In [None]:
@ray.remote
class MLPLayerWorker(MegatronWorker):
    """verl MegatronWorker that holds one tensor-parallel shard of a Llama MLP layer.

    The constructor joins an NCCL process group and initializes Megatron model
    parallelism with tensor-parallel size 4 and pipeline-parallel size 1 (here the
    resource pool provides 4 GPUs, one worker process per GPU).
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): assumes the worker launcher sets LOCAL_RANK (and the other env
        # vars init_process_group reads) — confirm in verl.
        rank = int(os.environ["LOCAL_RANK"])
        torch.distributed.init_process_group(backend="nccl")
        torch.cuda.set_device(rank)

        mpu.initialize_model_parallel(
            tensor_model_parallel_size=4,
            pipeline_model_parallel_size=1,
            virtual_pipeline_model_parallel_size=None,
            pipeline_model_parallel_split_rank=None,
            use_sharp=False,
            context_parallel_size=1,
            expert_model_parallel_size=1,
            nccl_communicator_config_path=None,
        )
        from megatron.core import tensor_parallel

        # Fixed seed for Megatron's model-parallel CUDA RNG.
        tensor_parallel.model_parallel_cuda_manual_seed(10)

    @register(Dispatch.ONE_TO_ALL)
    def init_model(self, config):
        """Build this rank's ``ParallelLlamaMLP`` shard from ``config``.

        ONE_TO_ALL: every worker receives the same config. The Megatron parallel
        settings are read from ``mpu`` rather than from ``config``.
        """
        from omegaconf import OmegaConf

        from verl.models.llama.megatron.layers import ParallelLlamaMLP
        from verl.utils.megatron_utils import init_model_parallel_config

        megatron_config = OmegaConf.create(
            {
                "sequence_parallel": False,
                "param_dtype": "fp32",
                "tensor_model_parallel_size": mpu.get_tensor_model_parallel_world_size(),
                "pipeline_model_parallel_rank": mpu.get_pipeline_model_parallel_rank(),
                "pipeline_model_parallel_size": mpu.get_pipeline_model_parallel_world_size(),
                "virtual_pipeline_model_parallel_rank": mpu.get_virtual_pipeline_model_parallel_rank(),
                "virtual_pipeline_model_parallel_size": mpu.get_virtual_pipeline_model_parallel_world_size(),
            }
        )

        megatron_config = init_model_parallel_config(megatron_config)
        self.parallel_layer = ParallelLlamaMLP(config=config, megatron_config=megatron_config)

    @register(Dispatch.ONE_TO_ALL)
    def get_weights(self):
        """Return this rank's parameter shards as ``{name: tensor}``.

        The tensors are detached from autograd and copied to CPU before being sent
        back through Ray, so the driver does not receive live CUDA parameters bound
        to this worker's GPU.
        """
        return {key: val.detach().cpu() for key, val in self.parallel_layer.named_parameters()}

    @register(Dispatch.MEGATRON_COMPUTE)
    def run_layer(self, x):
        """Run this rank's MLP shard on ``x`` after moving it to the worker's GPU.

        With MEGATRON_COMPUTE, each DP group receives its own input partition.
        """
        x = x.to("cuda")
        y = self.parallel_layer(x)
        return y

In [None]:
# Launch one MLPLayerWorker per GPU in the pool, managed as an NVMegatronRayWorkerGroup.
layer_cls = RayClassWithInitArgs(cls=MLPLayerWorker)
layer_worker_group = NVMegatronRayWorkerGroup(
    resource_pool=resource_pool,
    ray_cls_with_init=layer_cls,
)

In [None]:
# world_size = tp_size * pp_size * dp_size; here 4 = 4 * 1 * 1.
print(layer_worker_group.world_size, layer_worker_group.tp_size, layer_worker_group.pp_size, layer_worker_group.dp_size)

4 4 1 1


In [None]:
# MLP layer dimensions and the size of the demo input.
ffn_hidden_size = 11008  # MLP intermediate (FFN) size
batch_size = 16
seq_len = 2048
hidden_size = 4096

# Llama-style MLP config passed to ParallelLlamaMLP via init_model below;
# "tp" records the worker group's tensor-parallel size.
config = OmegaConf.create(
    {
        "hidden_size": hidden_size,
        "intermediate_size": ffn_hidden_size,
        "hidden_act": "silu",
        "pretraining_tp": 1,
        "tp": layer_worker_group.tp_size,
    }
)

In [None]:
# Random input activations laid out as (seq_len, batch_size, hidden_size).
# A seeded generator makes the input reproducible across runs.
x = torch.rand(
    size=(seq_len, batch_size, hidden_size),
    dtype=torch.float32,
    generator=torch.Generator().manual_seed(0),
)

In [None]:
# Build the TP-sharded MLP on every worker (the same config goes to all); one None per worker.
layer_worker_group.init_model(config)

[None, None, None, None]

In [None]:
# MEGATRON_COMPUTE expects one input per data-parallel (DP) group. dp_size is 1 here,
# so the input is a single-element list, and output[0] is that group's result.
output = layer_worker_group.run_layer(
    [x]
)
print(output[0].shape)

torch.Size([2048, 16, 4096])


In [None]:
# Shut down the Ray cluster and release its GPUs.
ray.shutdown()