# Quickstart with Ray Compiled Graphs
Let’s get started with a simple Compiled Graphs (CG) example!

The basic CG workflow is:
1. Define an acyclic graph of Ray actor tasks, to be executed lazily.
2. “Compile” the graph into a CG, with its own optimized execution path. After this, no changes to the CG are allowed.
3. Execute the CG and `ray.get()` the results, like a normal Ray task.

Step (2) is the reason we can get better performance than Ray Core. It also lets us schedule GPU-GPU communication for you, and propagate errors without hanging.

To demonstrate this, we will show how a graph of Ray actor tasks may be executed with classic Ray Core, and then show what happens with the new CG backend.

First, install Ray:


In [22]:
# Installation
!pip install ray



## Hello world example

### Default execution with Ray Core
Let’s start by defining and creating an application with two normal Ray actors. The second actor will echo the response from the first actor. If you’re running this on a machine with GPUs, feel free to add the `num_gpus=1` resource requirement to the actor definition.

In [2]:
import ray

@ray.remote
class EchoActor:
  def echo(self, msg):
    return msg

a = EchoActor.remote()
b = EchoActor.remote()

2025-03-20 16:07:25,251	INFO worker.py:1841 -- Started a local Ray instance.


To execute with Ray Core, you can do something like this:

In [3]:
msg_ref = a.echo.remote("hello")
msg_ref = b.echo.remote(msg_ref)
print(ray.get(msg_ref))

hello


The execution for this might look something like this:

<figure style="text-align: center;">
    <img src="images/image01.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 1: Default execution with Ray Core</em></figcaption>
</figure>

Ray actor tasks are executed with RPCs to the actor. Each time a task is executed, the function argument “hello” is serialized into the task request and the “hello” return value is also serialized into the task response. This results in 4 copies of the “hello” value.
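The copy count above can be made concrete with a toy model. The sketch below is not Ray's RPC path: it uses `pickle` as a stand-in serializer and a hypothetical `rpc_call` helper that records every serialized payload, assuming each actor task serializes its argument into the request and its return value into the response.

```python
import pickle

# Every serialized payload is recorded here so the copies can be counted.
serialized_payloads = []


def rpc_call(method, arg):
    """Toy model of one actor-task RPC (hypothetical helper, not a Ray API)."""
    request = pickle.dumps(arg)  # argument copied into the task request
    serialized_payloads.append(request)
    result = method(pickle.loads(request))
    response = pickle.dumps(result)  # return value copied into the task response
    serialized_payloads.append(response)
    return pickle.loads(response)


def echo(msg):
    return msg


# Same shape as the example: the driver calls a.echo, then passes the result to b.echo.
msg = rpc_call(echo, "hello")
msg = rpc_call(echo, msg)

print(msg, len(serialized_payloads))  # hello 4
```

Two tasks, each with one serialized argument and one serialized return value, give the 4 copies described above.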

If the value is larger, it may be stored in Ray’s shared-memory object store. This results in fewer copies (3) than the above, because values can be passed directly through the object store instead of being copied through the driver.

However, because of the dynamic API, the classic Ray Core backend does not necessarily know how objects will be used by the time that they are created. For example, when allocating `a`’s return value, the task on `b` may not be submitted yet. Thus, we need to introduce additional protocols to dynamically track and garbage-collect values, in addition to the task execution RPC. The additional arrows to the object store in the below diagram represent these protocols.


<figure style="text-align: center;">
    <img src="images/image02.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 2: Default execution with Ray Core</em></figcaption>
</figure>

Many of these arrows need to happen synchronously to guarantee correctness, which adds run-time overhead.

The dynamic nature of classic Ray Core’s API also makes it difficult to support peer-to-peer methods for data transfer like RDMA or NVIDIA’s NCCL. For example, if `a` wants to send a tensor to `b` with NCCL, we also have to allocate resources on `b` to complete the transfer. Doing so without creating a deadlock is very difficult in a dynamic environment, where `b` may be busy executing something else.


### Execution with the Ray Core DAG API

Compiled graphs restrict the application to static control flow to get around the above limitations with classic Ray Core. The API builds off of the classic Ray Core’s [DAG API](https://docs.ray.io/en/latest/ray-core/ray-dag.html).

To execute the same application with the DAG API in Ray Core, you first define a lazily executed DAG like this.

In [4]:
import ray.dag
import time

# Define a DAG for lazy execution.
with ray.dag.InputNode() as inp:
  # Bind the actor task to an input placeholder.
  intermediate_inp = a.echo.bind(inp)
  dag = b.echo.bind(intermediate_inp)

This produces the following DAG, where the input value is provided by the driver during `dag.execute`:

<figure style="text-align: center;">
    <img src="images/image03.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 3: Vanilla Ray Core DAG execution (non-accelerated)</em></figcaption>
</figure>
Now we can execute the DAG with the classic Ray Core backend:


In [5]:
# Execute the DAG with different arguments:
print(ray.get(dag.execute("hello")))
print(ray.get(dag.execute("world")))

# Time the execution:
for _ in range(10):
  start = time.perf_counter()
  ray.get(dag.execute("hello"))
  print("took", time.perf_counter() - start)

hello
world
took 0.0023351041600108147
took 0.00244308914989233
took 0.001971593126654625
took 0.001755282748490572
took 0.0017960895784199238
took 0.0017983769066631794
took 0.0020655961707234383
took 0.0019308342598378658
took 0.00178559310734272
took 0.0017898539081215858


Note that this executes in exactly the same way as the above, using the classic Ray Core backend, so it has the same potential overheads. The only difference is in the API: we first define a DAG with a placeholder for the `"hello"` argument, and then we provide the value at execution time.

### *NEW* Execution with Ray Compiled Graphs
Now let’s try executing this with the CG backend instead. To do this, we add a new call to compile the DAG. Execution may now be much faster because we pre-allocate the execution resources at compile time:


In [6]:
# Compile the DAG.
cdag = dag.experimental_compile()
print(cdag)

CompiledDAG(05224b2e93904b8288834435f8488706)


This call does a couple of things under the hood:
* The backend statically allocates input and output buffers in the shared-memory object store for each actor task, instead of dynamically allocating them each time the DAG is executed. These buffers are reused at execution time, and actors always push results directly to the process that needs them. Each buffer is initialized with 100MB of capacity and can be resized if larger values are passed.
* The backend also allocates the actor’s execution loop ahead of time. Instead of waiting for an RPC to execute its next task, each actor waits in a loop for the arguments (passed via the allocated buffers) for the next `echo` task.

Note that the task execution happens on a background thread. Thus, the actor may still execute other tasks as normal, but these will now execute concurrently with the CG tasks.
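To make the two compile-time steps more tangible, here is a minimal single-process model of them. It is only a sketch under stated assumptions, not how Ray implements the backend: a hypothetical `Channel` class (a single-slot `queue.Queue`) stands in for a pre-allocated shared-memory buffer, and a hypothetical `execution_loop` function running on a thread stands in for the per-actor loop.

```python
import queue
import threading

STOP = object()  # sentinel used to shut the loops down


class Channel:
    """Single-slot channel; a stand-in for one pre-allocated, reused buffer."""

    def __init__(self):
        self._slot = queue.Queue(maxsize=1)

    def write(self, value):
        self._slot.put(value)  # blocks while the previous value is unread

    def read(self):
        return self._slot.get()  # blocks until a value is available


def execution_loop(method, inp, out):
    """Runs on a background thread: wait for arguments, run the task, push the result."""
    while True:
        value = inp.read()
        if value is STOP:
            out.write(STOP)  # propagate shutdown downstream
            return
        out.write(method(value))


def echo(msg):
    return msg


# "Compile": allocate every channel and start every loop once, up front.
driver_to_a, a_to_b, b_to_driver = Channel(), Channel(), Channel()
loops = [
    threading.Thread(target=execution_loop, args=(echo, driver_to_a, a_to_b)),
    threading.Thread(target=execution_loop, args=(echo, a_to_b, b_to_driver)),
]
for loop in loops:
    loop.start()

# "Execute": the driver only writes the input and reads the output.
results = []
for word in ("hello", "world"):
    driver_to_a.write(word)
    results.append(b_to_driver.read())
print(results)  # ['hello', 'world']

# "Teardown": stop the loops.
driver_to_a.write(STOP)
for loop in loops:
    loop.join()
```

Nothing is allocated or dispatched per execution in this model: each call reuses the same three channels and the same two loops, which is the property the compile step is described as providing.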

Compilation looks something like this: the driver coordinates with the actors and the object store to allocate the empty object buffers and begin an execution loop on each actor.
<figure style="text-align: center;">
    <img src="images/image04.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 4: Result of compiling a Ray DAG</em></figcaption>
</figure>
Now, the per-DAG execution time should be much faster:


In [7]:
# Time the execution with the compiled graph (CG).
for _ in range(10):
  start = time.perf_counter()
  ray.get(cdag.execute("hello"))
  print("took", time.perf_counter() - start)

took 0.001427993644028902
took 0.0005993461236357689
took 0.0002647195942699909
took 0.00021086819469928741
took 0.00022509926930069923
took 0.00021831924095749855
took 0.00019855890423059464
took 0.00022367527708411217
took 0.0002134847454726696
took 0.00021244678646326065


This is because we no longer need to dynamically dispatch tasks and allocate objects. Instead, the execution flows directly from one process to the next. Note that now, the only arrows that the driver is involved in are the ones to provide the DAG input and read the DAG output, both of which can be done through shared memory.
<figure style="text-align: center;">
    <img src="images/image05.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 5: Accelerated execution with Ray Compiled Graphs</em></figcaption>
</figure>
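The per-call timings above are noisy, and the first compiled executions are visibly slower than the rest (about 1.4 ms and 0.6 ms before settling near 0.2 ms, against roughly 1.8 ms for the uncompiled DAG). A small helper that discards warm-up calls and reports the median can make such comparisons steadier. This is a sketch: `time_calls` is a hypothetical helper, not part of Ray, and the workload below is a stand-in so the block runs without a cluster.

```python
import statistics
import time


def time_calls(fn, iterations=10, warmup=2):
    """Return the median latency of fn() in seconds, ignoring warm-up calls."""
    for _ in range(warmup):
        fn()
    samples = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in workload so the sketch runs without a Ray cluster.
calls = []
median_s = time_calls(lambda: calls.append(1))
print(f"median latency: {median_s * 1e6:.1f} us over {len(calls)} calls")
```

With the compiled graph from the cells above still alive, `time_calls(lambda: ray.get(cdag.execute("hello")))` would measure its steady-state latency.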
Once you’re done, you can tear down the compiled graph:


In [8]:
# Tear down the DAG
cdag.teardown()

print(ray.get(a.echo.remote("done")))

2025-03-20 16:07:37,014	INFO compiled_dag_node.py:2109 -- Tearing down compiled DAG
2025-03-20 16:07:37,016	INFO compiled_dag_node.py:2115 -- Cancelling compiled worker on actor: Actor(EchoActor, 64110de81eb4c7d069b66b1c01000000)
2025-03-20 16:07:37,016	INFO compiled_dag_node.py:2115 -- Cancelling compiled worker on actor: Actor(EchoActor, f27572b741be117cbc55949b01000000)
2025-03-20 16:07:37,022	INFO compiled_dag_node.py:2137 -- Waiting for worker tasks to exit
2025-03-20 16:07:37,022	INFO compiled_dag_node.py:2143 -- Teardown complete


done


This returns the cluster to a clean state:
<figure style="text-align: center;">
    <img src="images/image06.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 6: Ray Cluster after CG pipeline cleanup</em></figcaption>
</figure>

At this point, the CG’s resources are collected, and the background CG execution loop is stopped. You can now:
* Execute classic Ray actor tasks on the actor, without needing to guard against races with the background CG thread
* Define and compile a new CG using the same or other actors


## Executing across multiple actors
With this basic API, you can stitch together tasks across multiple actors in a variety of ways.
For example, a common pattern you might see in GPU applications is SPMD, where all processes execute the same program over different data in lockstep. In [tensor-parallel inference](https://docs.vllm.ai/en/latest/serving/distributed_serving.html), for example, each actor might hold a different shard of a model, and we pass the same input to all actors.

Here’s an example of how you might write such a program. Again, we start by creating normal Ray actors, in this case 3 of them:


In [9]:
import ray

@ray.remote
class EchoActor:
  def echo(self, msg):
    return msg

N = 3
actors = [EchoActor.remote() for _ in range(N)]

Then, we define and compile a CG that passes the same input placeholder to all actors. Here, we use the [MultiOutputNode](https://docs.ray.io/en/latest/ray-core/ray-dag.html#ray-dag-with-multiple-multioutputnode) syntax to wrap the outputs. This syntax is necessary when we have more than one output node.

In [10]:
# Define a DAG for lazy execution.
with ray.dag.InputNode() as inp:
  # Bind each actor task to the same input placeholder.
  outputs = [actor.echo.bind(inp) for actor in actors]
  dag = ray.dag.MultiOutputNode(outputs)

This produces a DAG like this:
<figure style="text-align: center;">
    <img src="images/image07.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 7: Ray CG across multiple parallel Actors</em></figcaption>
</figure>
We can compile and execute it like this:


In [11]:
cdag = dag.experimental_compile()
# Execute the DAG with different arguments:
print(ray.get(cdag.execute("hello")))
# Expected: ["hello", "hello", "hello"]

['hello', 'hello', 'hello']


In this case, all actors that are on the same Ray node will share the same physical input buffer, which is synchronized by the Ray CG backend. This helps reduce the per-task overhead from serializing the task arguments, allocating memory for the arguments, and invoking the task.

### Pipelining execution across different actors

What if we want to pipeline the execution across different actor tasks? One example of this is [pipeline-parallel inference](https://docs.vllm.ai/en/latest/serving/distributed_serving.html), where we pass intermediate outputs from one actor to the next through shared memory, and the data transfers should be pipelined with the compute tasks. We can pipeline execution across different actors by executing the same DAG multiple times before retrieving the output:

In [12]:
with ray.dag.InputNode() as inp:
  for actor in actors:
    # Pass each actor task output as input to the next actor task.
    inp = actor.echo.bind(inp)
  dag = inp

This produces a DAG like this:
<figure style="text-align: center;">
    <img src="images/image08.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 8: Ray CG over multiple pipelined Actors</em></figcaption>
</figure>
We can compile and execute it like this:


In [13]:
cdag = dag.experimental_compile()
# Call cdag.execute() several times. The executions will be pipelined across the different actors.
refs = [cdag.execute(f"hello{i}") for i in range(N)]
# Get the results, flushing the pipeline.
print(ray.get(refs))
# Expected: ["hello0", "hello1", "hello2"]

2025-03-20 16:07:49,939	INFO compiled_dag_node.py:2109 -- Tearing down compiled DAG
2025-03-20 16:07:49,941	INFO compiled_dag_node.py:2115 -- Cancelling compiled worker on actor: Actor(EchoActor, 728be378e1730abb2d37a78301000000)
2025-03-20 16:07:49,941	INFO compiled_dag_node.py:2115 -- Cancelling compiled worker on actor: Actor(EchoActor, 9856d387c8b630a9def5664701000000)
2025-03-20 16:07:49,942	INFO compiled_dag_node.py:2115 -- Cancelling compiled worker on actor: Actor(EchoActor, 98b52fc024098f0585d9d49201000000)
2025-03-20 16:07:49,947	INFO compiled_dag_node.py:2137 -- Waiting for worker tasks to exit
2025-03-20 16:07:49,948	INFO compiled_dag_node.py:2143 -- Teardown complete


['hello0', 'hello1', 'hello2']


Some things to be aware of:
* On the same actor, CG executions are ordered. I.e., if an actor has multiple tasks in the same CG, it will execute all of them to completion before executing on the next DAG input.
* Across actors in the same CG, the execution may be pipelined. I.e., an actor may begin executing on the next DAG input while a downstream actor executes on the current one.
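The two ordering rules above can be illustrated with a toy pipeline built from threads and FIFO queues. This is a sketch of the behavior only, assuming one worker thread per actor; `stage` and the per-stage logs are hypothetical names, and none of this is Ray's implementation.

```python
import queue
import threading

STOP = object()  # sentinel used to shut the stages down


def stage(name, inp, out, log):
    """One actor's loop: handle inputs strictly in arrival order."""
    while True:
        item = inp.get()
        if item is STOP:
            out.put(STOP)
            return
        log.append((name, item))  # record what this stage processed, in order
        out.put(item)             # echo the value to the next stage


N = 3
queues = [queue.Queue() for _ in range(N + 1)]  # driver -> s0 -> s1 -> s2 -> driver
logs = [[] for _ in range(N)]
threads = [
    threading.Thread(target=stage, args=(f"actor{i}", queues[i], queues[i + 1], logs[i]))
    for i in range(N)
]
for t in threads:
    t.start()

# Submit every input before reading any output, as in the cell above.
for i in range(N):
    queues[0].put(f"hello{i}")
results = [queues[-1].get() for _ in range(N)]
print(results)  # ['hello0', 'hello1', 'hello2']

queues[0].put(STOP)
for t in threads:
    t.join()
```

Each stage sees `hello0`, `hello1`, `hello2` in that order, while an upstream stage is free to start on `hello1` as soon as it has handed `hello0` downstream.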

For more examples of what kinds of CGs you can run, check out the general [Ray DAG API docs](https://docs.ray.io/en/latest/ray-core/ray-dag.html). Ray CG supports the same API except that:
* You can only invoke actors that have already been created (e.g., `EchoActor.remote()`). I.e., you cannot use the `EchoActor.bind()` syntax with CGs.
* Only actor tasks are supported. Non-actor tasks are not supported.


## Support for `asyncio`
If your CG driver is running in an asyncio event loop, use the async APIs to ensure that executing the CG and getting the results does not block the event loop. This requires a few changes for now. First, pass `enable_asyncio=True` to `dag.experimental_compile()`:


In [14]:
import ray

@ray.remote
class EchoActor:
  def echo(self, msg):
    return msg

actor = EchoActor.remote()
with ray.dag.InputNode() as inp:
  dag = actor.echo.bind(inp)

cdag = dag.experimental_compile(enable_asyncio=True)

Next, use `execute_async` to invoke the CG. Awaiting `execute_async` returns as soon as the input has been submitted and gives back a future. Awaiting that future then returns the result.


In [15]:
ref = await cdag.execute_async("hello")
print(await ref)
# Expected: hello

hello
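The two-step `await` pattern is what keeps the event loop free between submitting the input and receiving the result. The sketch below models that with plain `asyncio` so it runs without Ray: `FakeCompiledGraph` is a hypothetical stand-in whose `execute_async` mimics only the shape described above (submit, then return a future), and `heartbeat` represents other work sharing the loop.

```python
import asyncio


class FakeCompiledGraph:
    """Hypothetical stand-in for a graph compiled with enable_asyncio=True."""

    def __init__(self):
        self._tasks = set()  # keep references so pending tasks are not garbage-collected

    async def execute_async(self, value):
        # Submit the input and hand back a future for the eventual result.
        future = asyncio.get_running_loop().create_future()

        async def run():
            await asyncio.sleep(0.01)  # pretend the actors are working
            future.set_result(value)

        task = asyncio.create_task(run())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return future


async def main():
    graph = FakeCompiledGraph()
    ticks = 0

    async def heartbeat():
        # Other work sharing the event loop with the driver.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.001)

    beat = asyncio.create_task(heartbeat())
    future = await graph.execute_async("hello")  # returns once the input is submitted
    result = await future  # yields to the loop until the result is ready
    beat.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
print(result, ticks > 0)  # hello True
```

Inside a notebook, which already runs an event loop, replace the `asyncio.run(main())` call with `await main()`.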


## Passing torch.Tensors between devices

For this section, you will also need to install Ray from [nightly wheels](https://docs.ray.io/en/latest/ray-overview/installation.html#daily-releases-nightlies) (or Ray 2.44 when it is released). In addition, you need the following:
* [PyTorch with CUDA support](https://pytorch.org/get-started/locally/)
* [CuPy](https://docs.cupy.dev/en/stable/install.html#installing-cupy-from-pypi)
* [NVIDIA’s NCCL library](https://docs.nvidia.com/deeplearning/nccl/install-guide/index.html#down)
* Other Python packages, which you can install as follows:

In [18]:
!pip install numpy pyarrow pandas



### CPU-GPU transfers
With classic Ray Core, passing torch.Tensors between actors can easily become expensive, especially when transferring between devices. This is because Ray Core does not know the final destination device. Therefore, you may see unnecessary copies across devices other than the source and destination devices.

Ray CG ships with native support for passing torch.Tensors between actors executing on different devices. Developers can now annotate the CG declaration to indicate the final destination device of a torch.Tensor.

In [1]:
import ray
import ray.dag

import torch

@ray.remote(num_gpus=1)
class GPUActor:
    def echo_device(self, tensor: torch.Tensor) -> str:
        return str(tensor.device)

actor = GPUActor.remote()


Usage stats collection is enabled by default for nightly wheels. To disable this, run the following command: `ray disable-usage-stats` before starting Ray. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.


2025-03-20 16:11:42,293	INFO worker.py:1852 -- Started a local Ray instance.


By default, the tensor stays on its original device:

In [2]:
with ray.dag.InputNode() as inp:
  inp = inp.with_tensor_transport()
  dag = actor.echo_device.bind(inp)

cdag = dag.experimental_compile()
print(ray.get(cdag.execute(torch.zeros(10))))
# Expected: cpu
cdag.teardown()

2025-03-20 16:11:45,026	INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-03-20 16:11:45,027	INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(GPUActor, cf0ce122f59366799197a5f201000000)
2025-03-20 16:11:45,031	INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-03-20 16:11:45,031	INFO compiled_dag_node.py:2203 -- Teardown complete


cpu


You can also specify the destination device to be on GPU:


In [3]:

with ray.dag.InputNode() as inp:
  inp = inp.with_tensor_transport(device="cuda")
  dag = actor.echo_device.bind(inp)

cdag = dag.experimental_compile()
print(ray.get(cdag.execute(torch.zeros(10))))
# Expected: cuda:0
cdag.teardown()

2025-03-20 16:11:47,464	INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-03-20 16:11:47,465	INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(GPUActor, cf0ce122f59366799197a5f201000000)
2025-03-20 16:11:47,469	INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-03-20 16:11:47,469	INFO compiled_dag_node.py:2203 -- Teardown complete


cuda:0


Under the hood, the Ray CG backend will copy the torch.Tensor to the GPU assigned to the GPUActor by Ray Core, like this:
<figure style="text-align: center;">
    <img src="images/image09.png" style="background: white; padding: 10px;">
    <figcaption><em>Figure 9: CPU to GPU transfers</em></figcaption>
</figure>

Of course, you can also do this yourself, but there are advantages to using CGs instead:
* Ray CG can minimize the number of data copies made. For example, passing from one CPU to multiple GPUs requires one copy to a shared memory buffer, and then one host-to-device copy per destination GPU.
* In the future, this can be further optimized through techniques such as memory pinning, using zero-copy deserialization when the CPU is the destination, etc.

### GPU-GPU transfers via NCCL
With classic Ray Core, GPU-GPU transfers can be done through the object store, but this typically requires many unnecessary copies through host memory. NVIDIA’s NCCL provides optimized GPU-GPU communication, avoiding these copies. The Ray CG developer preview comes with native support for p2p GPU transfers using NCCL. This is also specified using the `with_tensor_transport` hint.

For example, to pass a tensor from one GPU actor to another, let’s first create sender and receiver actors.

In [4]:
import ray
import ray.dag

import torch

@ray.remote(num_gpus=1)
class GPUSender:
  def send(self, shape):
    return torch.zeros(shape, device="cuda")

@ray.remote(num_gpus=1)
class GPUReceiver:
  def recv(self, tensor: torch.Tensor):
    return tensor.shape

sender = GPUSender.remote()
receiver = GPUReceiver.remote()

Ray CG allows us to specify that the transfer should be done via NCCL. This requires a synchronous operation between the two actors, but can improve overall performance significantly by avoiding unnecessary copies.


In [5]:
# Using NCCL for direct GPU-GPU transfer
with ray.dag.InputNode() as inp:
  dag = sender.send.bind(inp)
  # Ray CG will automatically detect that the tensor should be transferred via NCCL
  # but you can also explicitly use the `with_tensor_transport(transport="nccl")` hint
  dag = dag.with_tensor_transport()
  dag = receiver.recv.bind(dag)

# Creates an NCCL group across the participating actors.
cdag = dag.experimental_compile()
# Execute the DAG. Ray CG will orchestrate any NCCL ops.
assert ray.get(cdag.execute((10, ))) == (10, )
# Teardown the DAG. This also destroys the NCCL group.
cdag.teardown()

2025-03-20 16:11:53,495	INFO torch_tensor_nccl_channel.py:772 -- Creating NCCL group f4ffc6ac-bb86-43ed-b1e7-accb4877cb54 on actors: [Actor(GPUSender, 95fafa969f61278a3e9c370c01000000), Actor(GPUReceiver, 1d5a57fc52e27e4052543a6201000000)]
2025-03-20 16:11:54,417	INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
2025-03-20 16:11:54,509	INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-03-20 16:11:54,511	INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(GPUSender, 95fafa969f61278a3e9c370c01000000)
2025-03-20 16:11:54,511	INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(GPUReceiver, 1d5a57fc52e27e4052543a6201000000)
2025-03-20 16:11:55,037	INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-03-20 16:11:55,038	INFO compiled_dag_node.py:2203 -- Teardown complete


(GPUSender pid=3868105) Destructing NCCL group on actor: Actor(GPUSender, 95fafa969f61278a3e9c370c01000000)
(GPUReceiver pid=3868106) Destructing NCCL group on actor: Actor(GPUReceiver, 1d5a57fc52e27e4052543a6201000000)
