# Ray Data under the hood

This notebook will provide a high-level overview of the architecture of the `Ray Data` library.

<div class="alert alert-info">
    <b>Here is the Roadmap:</b>
    <ul>
        <li>Streaming execution</li>
        <li>Dataset and blocks</li>
        <li>Ray memory model</li>
        <li>Operators and planning</li>
        <li>Streaming topology</li>
        <li>Data flow within an operator</li>
        <li>Streaming executor's scheduling loop</li>
        <li>Resource management and allocation</li>
    </ul>
</div>

## Imports

In [None]:
import gc
import ray
import numpy as np
import pandas as pd

## Streaming execution

Ray Data processes large datasets efficiently using a streaming model, which works with **blocks** as the basic units of data.

This approach replaces traditional bulk processing, where the entire dataset and intermediate results had to fit in the cluster's memory.

For example:

Here is a batch inference pipeline with a bulk processing approach.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/cko-2025-q1/batch-processing.png" width="800" alt="Traditional Batch Processing">

Note how:
- Execution is performed in stages
- The entire dataset can be repartitioned across stage boundaries

In contrast, here is the same batch inference pipeline with Ray Data's streaming model.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/cko-2025-q1/pipelining.png" width="800" alt="Streaming Model Pipelining">

Note how:
- Execution across stages of the pipeline is performed in parallel (i.e., pipeline parallelism)
- Data is passed incrementally without blocking the pipeline
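
To make the contrast concrete, here is a minimal, Ray-free sketch built on plain Python generators. The stage names (`load`, `preprocess`, `infer`) and the `events` log are hypothetical and exist only for illustration. The toy is single-threaded, so it shows the incremental hand-off between stages but not the cross-stage parallelism that Ray Data adds on a cluster.

```python
from typing import Iterable, Iterator, List

events: List[str] = []  # records the order in which stages touch each block


def load(num_blocks: int) -> Iterator[int]:
    for block_id in range(num_blocks):
        events.append(f"load:{block_id}")
        yield block_id


def preprocess(blocks: Iterable[int]) -> Iterator[int]:
    for block_id in blocks:
        events.append(f"preprocess:{block_id}")
        yield block_id


def infer(blocks: Iterable[int]) -> Iterator[int]:
    for block_id in blocks:
        events.append(f"infer:{block_id}")
        yield block_id


def run_streaming(num_blocks: int) -> List[str]:
    """Each block flows through every stage before the next block is loaded."""
    events.clear()
    for _ in infer(preprocess(load(num_blocks))):
        pass
    return list(events)


def run_bulk(num_blocks: int) -> List[str]:
    """Each stage is fully materialized before the next stage starts."""
    events.clear()
    loaded = list(load(num_blocks))
    preprocessed = list(preprocess(loaded))
    list(infer(preprocessed))
    return list(events)
```

In the streaming variant only one block is "in flight" at a time, whereas the bulk variant holds every intermediate list in memory at each stage boundary.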

<div class="alert alert-secondary">
    
Ray Data was first implemented using a bulk processing approach (now referred to as "legacy API").

Some artifacts of the legacy API, like `Dataset.repartition`, still reflect the older bulk processing model, which required the entire dataset to be partitioned across the cluster.
</div>


## Dataset and blocks

A Ray Dataset defines a data loading and processing pipeline.

When either "materialized" or "consumed", a Ray Dataset manifests as a distributed collection of blocks stored in the Ray Object Store.

Let's start by creating a materialized Ray Dataset to inspect its underlying blocks.

### Step 1: List the existing objects before executing the code

Running the below command will list the objects in the Ray object store.

The Ray object store is a shared memory space to store and pass data between tasks.

In [None]:
!ray list objects

As expected, there are no objects created yet.

### Step 2: Prepare some data

Let's build a parquet dataset given a target in-memory size.

In [None]:
size_mb = 64

df = pd.DataFrame(
    {
        "a": np.random.rand(size_mb * 1024**2 // 8).astype(np.float64),
    }
)

memory_usage = (df.memory_usage(deep=True) / 1024**2).sum() # in MiB
print(f"Memory usage: {memory_usage} MiB")

df.to_parquet("/mnt/cluster_storage/data.parquet")

Let's inspect the parquet file.


In [None]:
!ls -lh /mnt/cluster_storage/data.parquet

### Step 3: Create a materialized dataset

Let's create a `Dataset` from the parquet file using `read_parquet`.

In [None]:
ds = ray.data.read_parquet("/mnt/cluster_storage/data.parquet")
ds

Let's materialize the `Dataset` using `materialize`.

In [None]:
ds_materialized = ds.materialize()
ds_materialized

### Step 4: List the objects again

We can see an object with a size of ~64 MiB has been created. 

In [None]:
!ray list objects

Note that we can verify the object was indeed generated by a Ray Data task by following the `CALL_SITE` of the object.

### Step 5: Inspect the blocks

Instead of browsing through all the objects in the object store, we can directly fetch the blocks of a materialized dataset using Ray Data.

It turns out that we can iterate over the blocks of a dataset using `iter_internal_ref_bundles`.


In [None]:
for ref_bundle in ds_materialized.iter_internal_ref_bundles():
    print(ref_bundle)

A reference bundle `RefBundle` is simply a bundle of:
- a reference to the block
- metadata about the block

In [None]:
block_ref, block_metadata = ref_bundle.blocks[0]
block_ref

A block is the basic unit of data that Ray Data stores in the object store and transfers over the network. 

Each block contains a disjoint subset of rows, and Ray Data loads and transforms these blocks in a distributed manner.


<img src="https://docs.ray.io/en/latest/_images/dataset-arch.svg" width="600">


To fetch the block, we can use `ray.get`.

In [None]:
block = ray.get(block_ref)
block

Ray Data stores blocks as either pandas DataFrames or pyarrow Tables. In this case, when materializing from a `read_parquet`, the block is a pyarrow Table.

<!-- TODO - figure out adding info below: -->
<!-- Note, that regardless of the data type that Ray Data uses to store the block, Ray Data will convert the block to the required batch format when batching the data and transforming it. -->

In [None]:
type(block)

In this case, the block contains the same data as the original dataframe.

In [None]:
block.shape, df.shape

Let's clean up references to the objects we created so Ray can garbage collect them.

In [None]:
%xdel block
%xdel block_ref
%xdel ds
%xdel ds_materialized
%xdel ref_bundle
gc.collect()

We can see that the object has been garbage collected.

In [None]:
!ray list objects

### Controlling the number of blocks
<!-- Disclaimer that users ideally should not have to worry about this -->

#### Block size limiting
Ray Data bounds block sizes to avoid excessive communication overhead and prevent out-of-memory errors.

* Small blocks reduce latency and allow more fine-grained streaming execution.
* Large blocks reduce scheduler and communication overhead.

The default range attempts to make a good tradeoff for most jobs. 

Here are the default values that Ray Data uses:

In [None]:
ctx = ray.data.DataContext.get_current()
default_min_block_size = ctx.target_min_block_size / 1024**2
default_max_block_size = ctx.target_max_block_size / 1024**2

print(f"Default min block size: {default_min_block_size:.2f} MiB")
print(f"Default max block size: {default_max_block_size:.2f} MiB")

To change the block size range, configure the `target_min_block_size` and `target_max_block_size` attributes of `DataContext`.


In [None]:
ctx.target_min_block_size = 1 * 1024**2
ctx.target_max_block_size = 33 * 1024**2

Let's re-create the dataset and materialize it again.

In [None]:
ds = ray.data.read_parquet("/mnt/cluster_storage/data.parquet")
ds_materialized = ds.materialize()
ds_materialized

Notice how instead of a single block of 64 MiB, we now have 2 blocks of ~32 MiB each.

In [None]:
!ray list objects
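
As a rough back-of-the-envelope check, the block count above can be approximated by dividing the in-memory size by the maximum block size. The helper `estimate_num_blocks` below is hypothetical and deliberately simplified; Ray Data's actual splitting also depends on factors such as file layout and read parallelism, so treat this as an estimate rather than a guarantee.

```python
import math

MiB = 1024**2


def estimate_num_blocks(data_size_bytes: int, target_max_block_size_bytes: int) -> int:
    """Approximate how many blocks are needed so that none exceeds the max size."""
    return max(1, math.ceil(data_size_bytes / target_max_block_size_bytes))


# 64 MiB of data with a 33 MiB cap needs two blocks of roughly 32 MiB each.
num_blocks = estimate_num_blocks(64 * MiB, 33 * MiB)
```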

In [None]:
%xdel ds
%xdel ds_materialized
gc.collect()
!ray list objects # check objects are garbage collected

## Ray memory model

Ray manages memory in several ways to efficiently handle distributed tasks:

1. **Task Execution Memory**:
   - Used by workers to execute tasks and actors.
   - Allocated from the worker's heap memory.
   - High memory pressure can cause Ray to terminate some tasks to free up resources.

2. **Object Store Memory**:
   - Serves as the medium for passing data between tasks.
   - Objects are stored in a shared memory space, which by default uses up to 30% of a node's memory.
   - If more space is needed, objects can be spilled to disk or stored on disk in a slower-access format.

This setup allows Ray to optimize performance and resource usage in distributed applications.

Here is a diagram of the above memory model:

<img src="https://docs.ray.io/en/latest/_images/memory.svg" width="600">

### Example of a numpy array in the object store

We will show two tasks:
- `producer_task`: creates a numpy array of size 4 GiB
- `consumer_task`: reads the output of `producer_task`

Here is the code:
```python
@ray.remote
def producer_task(size_mb: int = 4 * 1024) -> np.ndarray:
    array = np.random.rand((1024**2 * size_mb // 8)).astype(np.float64)
    return array


@ray.remote
def consumer_task(array: np.ndarray) -> None:
    assert isinstance(array, np.ndarray)
    assert not array.flags.owndata

arr_ref = producer_task.remote()  # Produce a 4 GiB array.
output_ref = consumer_task.remote(arr_ref)  # Consume the array.
```

What happens under the hood?
- `producer_task` will:
    - allocate memory on the worker heap to create the array
    - allocate memory on the object store to store the array, effectively calling `ray.put`
- `consumer_task` will:
    - directly access the array in the object store (zero-copy deserialization)
    - only incur the cost of copying the array if it is running on a different node.
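
For intuition about what "zero-copy" means, here is a small standard-library analogy that does not use Ray at all. A `bytearray` stands in for object store memory, a `memoryview` stands in for the zero-copy array the consumer receives, and `bytes(...)` stands in for a copy. The function name `demo_zero_copy` is hypothetical. Note that Ray's zero-copy arrays are read-only; the write below happens on the underlying buffer purely to make the sharing visible.

```python
from typing import Tuple


def demo_zero_copy() -> Tuple[int, int]:
    shared_buffer = bytearray(8)                # stands in for object store memory
    zero_copy_view = memoryview(shared_buffer)  # shares the buffer, copies nothing
    private_copy = bytes(shared_buffer)         # duplicates the bytes

    shared_buffer[0] = 255                      # mutate the underlying buffer

    # The view observes the change; the copy does not.
    return zero_copy_view[0], private_copy[0]
```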

Here is the diagram showing how the object store is used:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/producer-consumer-object-store-v2.png" width="600">

Let's run a script that will inspect the memory usage of the tasks.

In [None]:
!python scripts/memory_model/memory_inspection.py

### Ray Data and the Object Store

In Ray Data, the object store is used as follows:

- Upstream operator tasks place pyarrow Tables in the shared memory object store (similar to `producer_task` in the example above)
- Downstream operator tasks "get" them from the object store (similar to `consumer_task` in the example above)

To verify that a pyarrow Table's underlying array is also zero-copy deserialized, see the below script.

In [None]:
!python scripts/memory_model/pyarrow_zero_copy.py

### Ray Data Resource Management and Limits

Ray Data uses a `ResourceManager` to manage resource limits for dataset execution. 

#### Default Behavior
- **Object Store Capacity**: By default, the `ResourceManager` limits usage to 50% of the object store capacity. This is controlled by `ResourceManager.DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION`.
- **CPU and GPU Resources**: The `ResourceManager` will utilize all available CPU and GPU resources on the cluster by default.

#### Customizing Resource Limits
You can customize resource limits using the `DataContext`:

- **Object Store Memory Limit**:
  ```python
  ctx = ray.data.DataContext.get_current()
  ctx.execution_options.resource_limits.object_store_memory = 10e9  # 10 GB (use 10 * 1024**3 for 10 GiB)
  ```

- **CPU and GPU Limits**:
  ```python
  ctx = ray.data.DataContext.get_current()
  ctx.execution_options.resource_limits.cpu = 10
  ctx.execution_options.resource_limits.gpu = 5
  ```

- **Excluding Resources**:
  ```python
  from ray.data import ExecutionResources

  ctx = ray.data.DataContext.get_current()
  ctx.execution_options.exclude_resources = ExecutionResources(cpu=10, gpu=5)  # Exclude 10 CPUs and 5 GPUs
  ```

To verify that limiting compute resources affects dataset execution, run the below script.


In [None]:
!python scripts/deliberate_backpressure/compute_throttled.py

## Operators and planning

We start with a user-defined function (UDF) that will be applied to the dataset.


In [None]:
def increment_column(batch, column_name):
    batch[column_name] = batch[column_name] + 1
    return batch

We apply the UDF to the dataset.

In [None]:
ds = (
    ray.data.read_parquet("/mnt/cluster_storage/data.parquet")
    .map_batches(increment_column, fn_kwargs={"column_name": "a"})
)

ds

### Planning

Below are the steps that Ray Data takes to plan the execution of a dataset.

1. Ray Data applies a series of optimization rules to the logical plan to produce an optimized logical plan.

2. A physical planner then translates the optimized logical plan into a physical plan.

3. The physical plan is then further optimized (e.g. fusing operators) to produce an optimized physical plan.

See the below diagram for the planning process:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/get_execution_plan.png" width="600">


#### Logical plan

To access the logical plan, we can use the `_plan` attribute of the dataset.

`_plan` returns an `ExecutionPlan` which wraps a `LogicalPlan`.

The `LogicalPlan` simply represents "what" operations will be applied to the dataset.

In [None]:
logical_plan = ds._plan._logical_plan
logical_plan.dag

To generate the optimized physical plan, we can use the `get_execution_plan` method.

In [None]:
from ray.data._internal.logical.optimizers import get_execution_plan

optimized_physical_plan = get_execution_plan(ds._plan._logical_plan)
optimized_physical_plan.dag

### Operators

The above DAG shows that "logically" we require operations like `ReadFiles` and `MapBatches` to be run.

Physical plans indicate "how" the logical operators will be executed.

The above DAG shows a plan of physical operators that will be executed:
- `InputDataBuffer` is a placeholder for the input data.
- `TaskPoolMapOperator` is a physical operator which will execute logical operations like `ReadFiles` and `MapBatches` using a "pool" of Ray Tasks. 
- The syntax is `{PhysicalOperator}[{LogicalOperator}]`. 

#### Operator Fusion
Under certain conditions, Ray Data will fuse operators together to reduce data movement and improve execution efficiency.

Here is the syntax for fusing operators:
- `{PhysicalOperator}[{LogicalOperator1}->{LogicalOperator2}]`

In [None]:
ds_repeated = ds.map_batches(increment_column, fn_kwargs={"column_name": "a"})
get_execution_plan(ds_repeated._plan._logical_plan).dag

#### Conditions for Operator Fusion

Ray Data currently supports fusing two operators if the following are all true:
- We are fusing either `MapOperator -> MapOperator` or `MapOperator -> AllToAllOperator`.
- They either use the same compute configuration, or the upstream operator uses a task pool while the downstream operator uses an actor pool.
- If both operators involve callable classes, the callable classes are the same class AND constructor args are the same for both.
- They have compatible remote arguments.
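
The conditions above can be sketched as a simple predicate. This is a toy model, not the Ray Data implementation: `ToyOp` and `toy_can_fuse` are hypothetical names, and "compatible remote arguments" is simplified here to strict equality, which is an assumption made for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class ToyOp:
    kind: str                              # "map" or "all_to_all"
    compute: str                           # "tasks" or "actors"
    callable_class: Optional[str] = None   # name of a callable class UDF, if any
    constructor_args: Tuple[Any, ...] = ()
    remote_args: Dict[str, Any] = field(default_factory=dict)


def toy_can_fuse(up: ToyOp, down: ToyOp) -> bool:
    # 1. Only map -> map or map -> all-to-all pairs are candidates.
    if up.kind != "map" or down.kind not in ("map", "all_to_all"):
        return False
    # 2. Same compute, or a task pool upstream feeding an actor pool downstream.
    same_compute = up.compute == down.compute
    tasks_into_actors = up.compute == "tasks" and down.compute == "actors"
    if not (same_compute or tasks_into_actors):
        return False
    # 3. If both use callable classes, the class and constructor args must match.
    if up.callable_class is not None and down.callable_class is not None:
        if (up.callable_class, up.constructor_args) != (
            down.callable_class,
            down.constructor_args,
        ):
            return False
    # 4. Remote args must be compatible (simplified to equality in this toy).
    return up.remote_args == down.remote_args
```

For example, a task-pool read followed by a task-pool `map_batches` passes all four checks, while an actor-pool operator followed by a task-pool operator fails the second one.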


<details>
<summary>Here are the implementation details for the OperatorFusionRule</summary>

You can see the conditions for operator fusion in the `OperatorFusionRule._can_fuse` method.

```python
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule

%psource OperatorFusionRule._can_fuse
```

The `OperatorFusionRule._get_fused_map_operator` method will create a new `MapOperator` with a "fused" `MapTransformer`.

You can view the implementation of the method below.

```python
%psource OperatorFusionRule._get_fused_map_operator
```

Below is the snippet of the code that creates the new/fused `MapOperator`.

```python
op = MapOperator.create(
    up_op.get_map_transformer().fuse(down_op.get_map_transformer()),
    input_op,
    target_max_block_size=target_max_block_size,
    name=name,
    compute_strategy=compute,
    min_rows_per_bundle=min_rows_per_bundled_input,
    ray_remote_args=ray_remote_args,
    ray_remote_args_fn=ray_remote_args_fn,
)
```

Let's look at the implementation of `MapTransformer.fuse`


```python
from ray.data._internal.execution.operators.map_transformer import MapTransformer

%psource MapTransformer.fuse
```

It creates a single `MapTransformer` with a fused init function and transform functions.
```python
def fused_init_fn():
    self_init_fn()
    other_init_fn()

fused_transform_fns = self._transform_fns + other._transform_fns
transformer = MapTransformer(fused_transform_fns, init_fn=fused_init_fn)
```

Here is how the transformation is then applied.

```python
%psource MapTransformer.apply_transform
```
You can see the transform functions are applied sequentially to the input iterable within a single task.

This means data is not transferred across the object store anymore.
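
The same idea can be illustrated without Ray. In the sketch below, `add_one`, `double`, `toy_fuse`, and `toy_apply_transform` are hypothetical stand-ins: fusing simply concatenates the two lists of transform functions, and applying them chains lazy iterators so each row passes through both functions inside a single call.

```python
from typing import Callable, Iterable, Iterator, List

TransformFn = Callable[[Iterable[int]], Iterator[int]]


def add_one(rows: Iterable[int]) -> Iterator[int]:
    for row in rows:
        yield row + 1


def double(rows: Iterable[int]) -> Iterator[int]:
    for row in rows:
        yield row * 2


def toy_fuse(up_fns: List[TransformFn], down_fns: List[TransformFn]) -> List[TransformFn]:
    """Fusing two operators concatenates their transform functions."""
    return list(up_fns) + list(down_fns)


def toy_apply_transform(transform_fns: List[TransformFn], rows: Iterable[int]) -> Iterable[int]:
    """Chain the transforms lazily; nothing is materialized between them."""
    for fn in transform_fns:
        rows = fn(rows)
    return rows


fused = toy_fuse([add_one], [double])
result = list(toy_apply_transform(fused, [1, 2, 3]))
```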

</details>


## Streaming topology

After constructing the optimized DAG of physical operators, the execution plan is handed to the `StreamingExecutor` to execute.

The first step is to build a streaming topology. Here is a sample diagram of the streaming topology:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/build_streaming_topology.png" width="1000">

Each physical operator will be wired to the next physical operator downstream in the execution plan. 

An upstream operator's external output queue will *refer to the same queue* as the input of the downstream operator.
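
A minimal sketch of this wiring, assuming a linear chain of operators: `ToyOpState` and `build_toy_topology` are hypothetical names, and plain `deque` objects stand in for the executor's queues. The key point is that the downstream input queue is not a copy but the very same object as the upstream output queue.

```python
from collections import deque
from typing import Deque, List


class ToyOpState:
    def __init__(self, name: str) -> None:
        self.name = name
        self.input_queue: Deque[str] = deque()
        self.output_queue: Deque[str] = deque()


def build_toy_topology(names: List[str]) -> List[ToyOpState]:
    states = [ToyOpState(name) for name in names]
    for upstream, downstream in zip(states, states[1:]):
        # Share one deque object between the two neighbouring operators.
        downstream.input_queue = upstream.output_queue
    return states


read_state, map_state = build_toy_topology(["read", "map"])
read_state.output_queue.append("bundle-0")   # upstream produces a bundle
received = map_state.input_queue.popleft()   # downstream sees it immediately
```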


## Data flow within an operator

Below is a diagram showing the data flow within an operator.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/data_flow_simplified_v4.png" width="800">


Reference bundles are passed between operators, where:
- Each reference bundle packages references to blocks and metadata about the blocks.
- Most operators will process bundles by submitting tasks.

Processing a bundle means the following steps:
1. Consume the block reference from an external queue.
2. Add it to an internal queue for processing.
3. Submit a task to process the block.
4. Generate blocks from the task.
5. Move the blocks to an internal out-queue.
6. Send the blocks downstream via the external queue.
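
The six steps above can be sketched with a toy operator. `ToyMapOperator` is a hypothetical class, not a Ray Data type. Two simplifications are assumed: the "task" runs synchronously inside `step`, and the queues hold the data itself, whereas real operators submit Ray tasks asynchronously and pass around block references.

```python
from collections import deque
from typing import Callable, Deque, List

Block = List[int]


class ToyMapOperator:
    def __init__(
        self,
        fn: Callable[[Block], Block],
        external_in: Deque[Block],
        external_out: Deque[Block],
    ) -> None:
        self.fn = fn
        self.external_in = external_in
        self.internal_in: Deque[Block] = deque()
        self.internal_out: Deque[Block] = deque()
        self.external_out = external_out

    def step(self) -> bool:
        """Process one bundle; return False when there is no input left."""
        if not self.external_in:
            return False
        bundle = self.external_in.popleft()                    # 1. consume from external queue
        self.internal_in.append(bundle)                        # 2. add to internal in-queue
        task_input = self.internal_in.popleft()                # 3. "submit" a task
        output_block = self.fn(task_input)                     # 4. the task generates a block
        self.internal_out.append(output_block)                 # 5. move to internal out-queue
        self.external_out.append(self.internal_out.popleft())  # 6. send downstream
        return True


inputs: Deque[Block] = deque([[1, 2], [3]])
outputs: Deque[Block] = deque()
op = ToyMapOperator(lambda block: [x + 1 for x in block], inputs, outputs)
while op.step():
    pass
```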


### One-to-one operator mapping from block to task

In the case of One-to-One operators:
- Each reference bundle contains a single block 
- Each task will process a single block.

To see how this works, run the below script.


In [None]:
!python scripts/data_flow/one_to_one_operator.py

Below is a screenshot from the Ray Dashboard Job UI showing the Ray Data overview of the two datasets created by the script.
<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/ray-data-overview-one-to-one-op.png" width="900">

Below is a screenshot from the Ray Dashboard Job UI showing the Ray Core overview of the two datasets created by the script.
<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/ray-core-overview-one-to-one-op.png" width="900">

We can see that:
- ds-with-1-block only generated 1 block
- ds-with-1-block took a total of 8s to execute `_slow_add`

Whereas:
- ds-with-8-blocks generated 8 blocks
- ds-with-8-blocks took a total of 1s to execute `_slow_add`

This shows that the more blocks we have, the more tasks we can submit and, in this case, the faster the dataset will be processed.
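
The timings above are consistent with a simple model in which each task processes one block and tasks run in "waves" bounded by the number of available CPUs. The numbers used below (8 rows, 1 second per row, 8 CPUs) are assumptions chosen to match the screenshots, since the script itself is not shown here, and `estimated_wall_time_s` is a hypothetical helper.

```python
import math


def estimated_wall_time_s(
    num_rows: int, num_blocks: int, seconds_per_row: float, num_cpus: int
) -> float:
    """Estimate wall time when each task handles one block."""
    rows_per_block = math.ceil(num_rows / num_blocks)
    waves = math.ceil(num_blocks / num_cpus)  # tasks that cannot run concurrently wait
    return waves * rows_per_block * seconds_per_row


one_block = estimated_wall_time_s(num_rows=8, num_blocks=1, seconds_per_row=1.0, num_cpus=8)
eight_blocks = estimated_wall_time_s(num_rows=8, num_blocks=8, seconds_per_row=1.0, num_cpus=8)
```

The model ignores scheduling overhead, which is why very small blocks eventually stop helping in practice.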

### Ray Data Dashboard

The above dataflow is measured and visualized under the Metrics tab of the Ray Data Dashboard, specifically in the following sections:
- Ray Data Metrics (Overview)
- Ray Data Metrics (Inputs)
- Ray Data Metrics (Outputs)
- Ray Data Metrics (Tasks)
- Ray Data Metrics (Object Store Memory)
- Ray Data Metrics (Iteration)


Here is the same dataflow diagram above with hooks shown for both input and output metrics.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/pinterest/data_flow_simplified_v4_input_output_metrics.png" width="800">


Here is the same diagram with the object store metrics included.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/pinterest/data_flow_simplified_v4_object_store_metrics.png" width="800">


## Streaming executor's scheduling loop

The `StreamingExecutor` schedules operators to run and moves data between them.

The executor and operators are located on the process where dataset execution starts. 

* For batch inference jobs, this process is usually the driver. 
* For training jobs, the executor runs on a special actor called `SplitCoordinator` which handles `streaming_split()`.

Tasks and actors launched by operators are scheduled across the cluster, and their outputs are stored in Ray's distributed object store. The executor only manipulates references to these objects; it never fetches the underlying data into its own process.

### The Scheduling Loop

The `StreamingExecutor` takes an event-loop-like approach to scheduling.

Each step of the loop works like this:

1. Wait until running tasks and actors have new outputs (up to a 0.1s timeout)
2. Move new outputs from the streaming generator buffer into the appropriate operator out-queues.
3. Choose some operators and assign new inputs to them. (Operators process the new inputs either by launching new tasks or manipulating metadata.)
4. Repeat until all operators are completed.
 
Here is a diagram of the executor loop:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/scheduling_loop_v3.png" width="1000">

<details>
<summary>Here is where to find the scheduling loop code:</summary>

```python
from ray.data._internal.execution.streaming_executor import StreamingExecutor

%psource StreamingExecutor._scheduling_loop_step
```
</details>

### Operator Selection

Choosing the best operator to assign inputs to is one of the most important decisions in Ray Data.

The executor can schedule an operator if the operator satisfies the following conditions:

- The operator has inputs.
- There are adequate resources available.
- The operator isn't backpressured.

If there are multiple viable operators, the executor chooses the operator with the **smallest out queue in terms of MB produced onto the object store**.

Intuitively, this means that the "slowest producer will be chosen to avoid bottlenecks".
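
The selection rule can be sketched in a few lines. `ToyOperator` and `toy_select_operator` are hypothetical names, and the three eligibility checks are reduced to precomputed fields; the real executor derives them from live resource usage and backpressure policies.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyOperator:
    name: str
    num_inputs: int        # bundles waiting in the input queue
    has_resources: bool    # adequate resources are available
    backpressured: bool    # operator is currently backpressured
    out_queue_bytes: int   # bytes this operator has queued in the object store


def toy_select_operator(ops: List[ToyOperator]) -> Optional[ToyOperator]:
    eligible = [
        op
        for op in ops
        if op.num_inputs > 0 and op.has_resources and not op.backpressured
    ]
    if not eligible:
        return None
    # Prefer the operator that has produced the least output so far.
    return min(eligible, key=lambda op: op.out_queue_bytes)


ops = [
    ToyOperator("read", num_inputs=4, has_resources=True, backpressured=False, out_queue_bytes=300),
    ToyOperator("map", num_inputs=2, has_resources=True, backpressured=False, out_queue_bytes=100),
    ToyOperator("write", num_inputs=1, has_resources=True, backpressured=True, out_queue_bytes=0),
]
chosen = toy_select_operator(ops)
```

Here `write` has the smallest out-queue but is backpressured, so `map` is chosen over `read`.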

Here is a diagram of the operator selection process:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/selecting_an_operator.png" width=1000>



<details>
<summary>Here are the technical details of the operator selection process:</summary>

`_scheduling_loop_step()` will invoke:
  
- `select_operator_to_run()` to choose an operator to execute.
  - `select_operator_to_run()` will check if the operator is submission-backpressured by:
    - Checking the operator's resource usage
    - Checking the operator's backpressure policies
  - `select_operator_to_run()` will then choose a non-backpressured operator with ready inputs and the smallest out-queue size.

Here is the code for `select_operator_to_run()`:

```python
from ray.data._internal.execution.streaming_executor_state import select_operator_to_run

%psource select_operator_to_run
```
</details>



## Resource management and allocation

Ray Data will:
- dynamically allocate resources across operators
- backpressure operators that have exceeded their resource budgets

### Operator backpressure in Ray Data

Backpressure in Ray Data is essential for managing resource contention and ensuring fair resource utilization across operators. Its primary goal is to maximize execution plan throughput by controlling task flow.

### How Backpressure Works

Backpressure can be applied to each physical operator in an execution plan through two approaches:

1. **Submission-Based Backpressure**: Prevents an operator from submitting new tasks if it exceeds its resource budget.
2. **Output-Based Backpressure**: Prevents an operator from moving outputs to its out-queue if it produces more data than its resource budget allows.
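
Both approaches reduce to comparing a request against the operator's remaining budget. The sketch below is a toy model under stated assumptions: `ToyBudget`, `toy_can_submit_new_task`, and `toy_max_output_bytes_to_read` are hypothetical names, and the budget is treated as a fixed pair of CPU and object store values rather than something recomputed on every scheduling iteration.

```python
from dataclasses import dataclass


@dataclass
class ToyBudget:
    cpu: float
    object_store_bytes: int


def toy_can_submit_new_task(
    budget: ToyBudget, task_cpu: float, estimated_output_bytes: int
) -> bool:
    """Submission-based: block new tasks that would exceed the budget."""
    return (
        budget.cpu >= task_cpu
        and budget.object_store_bytes >= estimated_output_bytes
    )


def toy_max_output_bytes_to_read(budget: ToyBudget, pending_output_bytes: int) -> int:
    """Output-based: cap how many pending output bytes may move to the out-queue."""
    return max(0, min(pending_output_bytes, budget.object_store_bytes))


budget = ToyBudget(cpu=1.0, object_store_bytes=100)
```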


### Where is backpressure applied?

Below is a diagram of the scheduling loop highlighting where backpressure is applied.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-data-deep-dive/scheduling_loop_with_backpressure.png" width="1000">



<details>
<summary>Here is how backpressure is implemented:</summary>

##### Submission-based backpressure
Submission-based backpressure is implemented in the `ResourceManager.can_submit_new_task()` method. It is used in the `select_operator_to_run()` method to determine if an operator can submit new tasks.

##### Output-based backpressure

Output-based backpressure is implemented in the `ResourceManager.max_task_output_bytes_to_read()` method. It is used in the `process_completed_tasks()` method to determine the maximum bytes of task outputs that can be read and moved to an external queue.

<details>
<summary>Here are the technical details where output-based backpressure is applied:</summary>

`_scheduling_loop_step()` will invoke:

- `process_completed_tasks()` to:
  - Wait and gather completed task outputs from all operators
  - Move completed task outputs to the operator's out-queue

`process_completed_tasks` will make use of `max_task_output_bytes_to_read()` to determine the maximum bytes of task outputs that can be read and moved to an external queue.

Here is the code for `process_completed_tasks()`:

```python
from ray.data._internal.execution.streaming_executor_state import process_completed_tasks

%psource process_completed_tasks
```
</details>

</details>



### Determining Backpressure

A component of the `StreamingExecutor` is the `ResourceManager`.

The `ResourceManager` is responsible for setting the resource budgets for each operator.

### Inspecting Operator Resource Budgets

It is generally useful to enable debug logs from the `ResourceManager` to help you understand resource budgeting and allocation.

You can do so by setting `RAY_DATA_DEBUG_RESOURCE_MANAGER=1` in your environment.

See the below script for an example.

In [None]:
!python scripts/resource_manager/debug.py

### Resource Allocation

#### Tracking Operator Usage
A `ResourceManager` tracks the usage of every operator.

Each usage estimate contains:
- CPU
- GPU
- Object store memory


<details>
<summary>Here is how the ResourceManager tracks operator usage:</summary>

The `ResourceManager` tracks operator usage by calling `update_usages()`.

`ResourceManager.update_usages()` is called as part of the scheduling loop:
1. At the very beginning of the loop
2. After waiting on running task outputs
3. Before selecting an operator to execute

```python
from ray.data._internal.execution.resource_manager import ResourceManager

%psource ResourceManager.update_usages
```


</details>


### Dynamic Budget Allocation

**Intuition:**
The rate at which source tasks are scheduled has to be just right.
- If the *source* tasks are launched too slowly, the downstream operators will have no input to process, leading to under-utilization of resources.
- Conversely, if the source tasks are launched too aggressively, they will occupy most resources, reducing the resources available to downstream operators and slowing down execution.
    - This will eventually cause backpressure on the source operator.
    - It might also force intermediate data to be spilled to disk.


Ray Data uses a dynamic budget algorithm to regulate the rate at which source tasks are launched.
- Intuitively, the budget is an optimistic estimate of the resources available for new data partitions to enter the system.
- The key idea is to equalize the processing rates of each operator, in terms of bytes per second, in order to maximize the overall throughput. 


Ray Data implements dynamic budget allocation using a reservation-based approach where, by default, 50% of the global resources are reserved for individual operators, while the remaining resources are shared among all operators.

<details>
<summary>Here are more details on the reservation-based approach:</summary>

1. Reservation-based resource allocation at the operator level via the `ReservationOpResourceAllocator`


### Reservation-based resource allocation at the operator level

Overall, this approach allows for resource sharing across operators, while still allowing for operator-specific resource reservations.

It works in the following way:
1. For each map operator, resources are reserved as `reservation_ratio * global_resources / num_map_ops`, with half reserved specifically for operator outputs, excluding pending task outputs.
2. Non-reserved resources are shared among all operators.
3. In each scheduling iteration, each map operator receives "remaining of their own reserved resources" plus "remaining of shared resources / num_map_ops" resources.

The `reservation_ratio` is set to 50% by default. Users can tune this value to adjust how aggressive or conservative the resource allocation is. A higher value makes the resource allocation more even but may lead to underutilization; a lower value does the opposite.
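
The arithmetic in the three steps above can be worked through with a small sketch for a single resource (object store bytes). `toy_op_budgets` is a hypothetical helper. It assumes that usage beyond an operator's own reservation is charged to the shared pool, and it ignores the finer split that sets aside half of each reservation for operator outputs.

```python
from typing import Dict


def toy_op_budgets(
    global_resources: float,
    usage_by_op: Dict[str, float],
    reservation_ratio: float = 0.5,
) -> Dict[str, float]:
    num_map_ops = len(usage_by_op)
    # Step 1: each operator gets an equal slice of the reserved portion.
    reserved_per_op = reservation_ratio * global_resources / num_map_ops
    # Step 2: whatever is not reserved forms the shared pool.
    shared_total = global_resources - reserved_per_op * num_map_ops
    # Assumption: usage above an operator's reservation consumes the shared pool.
    shared_used = sum(max(0.0, used - reserved_per_op) for used in usage_by_op.values())
    shared_remaining = max(0.0, shared_total - shared_used)
    # Step 3: budget = remaining own reservation + equal share of remaining shared pool.
    return {
        name: max(0.0, reserved_per_op - used) + shared_remaining / num_map_ops
        for name, used in usage_by_op.items()
    }


budgets = toy_op_budgets(100.0, {"read": 10.0, "map": 40.0})
```

With 100 units in total, each operator reserves 25. `read` still has 15 of its reservation left, while `map` has overdrawn by 15 from the 50-unit shared pool, leaving 35 to split evenly.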

<details>
<summary>Here is where to find the code for the `ReservationOpResourceAllocator`</summary>

```python
from ray.data._internal.execution.resource_manager import ReservationOpResourceAllocator

%psource ReservationOpResourceAllocator
```
</details>

</details>
