## Streaming Operator Execution with Ray: End-to-End Guide

This notebook explains how to build a simple streaming execution engine using Ray tasks.

<div class="alert alert-block alert-info">

<b> Here is the roadmap for this notebook </b>

<ol>
  <li>Architecture Overview</li>
  <li>Key Components</li>
  <li>Part 1: Data structures and streaming task</li>
  <li>Part 2: Operators built on queues</li>
  <li>Part 3: Backpressure via a Resource Policy</li>
  <li>Part 4: A streaming executor scheduling loop</li>
  <li>Part 5: End-to-end demo pipeline</li>
</ol>
</div>

**Imports**

In [None]:
import time
from collections import deque
from dataclasses import dataclass
from typing import List

import ray

## Architecture Overview

### Key Principles

Here are the key principles of the streaming execution design:

- **Stream with generators**: Start producing outputs immediately without waiting for all work to finish
- Build a streaming topology using **operators with queues**
- **Pass data between queues as references**: Move `ObjectRef`s between operators without materializing data
- **Schedule operators to optimize throughput**: Select which operators to run based on resource availability and readiness
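
To make the first principle concrete, here is a minimal local analogy that uses a plain Python generator instead of a Ray task. The names `produce_blocks` and `events` are illustrative and not part of the notebook. The sketch only shows that a consumer can handle the first output before the producer has finished; a Ray streaming generator applies the same idea across processes.

```python
from typing import Iterator, List

events: List[str] = []  # records the order in which things happen


def produce_blocks(num_blocks: int) -> Iterator[List[int]]:
    """Yield one small block at a time instead of returning them all at once."""
    for i in range(num_blocks):
        events.append(f"produced {i}")
        yield [i, i + 1]


# The consumer handles each block as soon as it is yielded.
for block_id, block in enumerate(produce_blocks(3)):
    events.append(f"consumed {block_id}")

print(events)
```

The recorded events alternate between producing and consuming, which is the interleaving that streaming execution relies on.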

### High-Level Architecture

At a high level, a pipeline is a sequence of operators.

Each operator runs Ray tasks that stream output blocks as they are produced. Outputs flow downstream as `ObjectRef`s that are moved from one operator's queue to the next.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-core/streaming-execution.png" alt="streaming-execution" width="700"/>

Here is a sample executor loop that schedules operators:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/ray-core/streaming-scheduling-loop.png" alt="streaming-execution" width="700"/>



## Key Components

Here are the key components of the streaming operator execution pipeline:

- **Block**: data plus metadata
- **Resources**: simple CPU/memory accounting per task
- **OperatorState**: queues and task tracking
- **Operator**: stage with transform, dispatch, and resource usage
- **ResourcePolicy**: available vs. needed resources to apply backpressure
- A **custom scheduling loop** coordinating the pipeline

## Part 1: Data Structures and Streaming Task

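`Block` pairs a list of integers with a `block_id`, and `Resources` tracks the CPU and memory a task needs. `size_bytes` is a rough estimate that counts 8 bytes per integer.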



In [None]:
@dataclass
class Block:
    data: List[int]
    block_id: int

    def size_bytes(self) -> int:
        return len(self.data) * 8

    def num_rows(self) -> int:
        return len(self.data)

@dataclass
class Resources:
    cpu: float = 0.0
    memory: int = 0

    def __add__(self, other):
        return Resources(cpu=self.cpu + other.cpu, memory=self.memory + other.memory)

    def __le__(self, other):
        return self.cpu <= other.cpu and self.memory <= other.memory
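
A short usage sketch of the `Resources` arithmetic may help here. The class is repeated so that the snippet runs on its own, and the concrete numbers are made up for illustration. Note that `__le__` compares both dimensions, so two `Resources` values are not always comparable.

```python
from dataclasses import dataclass


@dataclass
class Resources:
    # Same shape as the notebook's Resources, repeated so the snippet runs alone.
    cpu: float = 0.0
    memory: int = 0

    def __add__(self, other: "Resources") -> "Resources":
        return Resources(cpu=self.cpu + other.cpu, memory=self.memory + other.memory)

    def __le__(self, other: "Resources") -> bool:
        return self.cpu <= other.cpu and self.memory <= other.memory


task = Resources(cpu=1.0, memory=1000)
budget = Resources(cpu=3.0, memory=3000)

two_tasks = task + task
fits = two_tasks <= budget  # True only if CPU and memory both fit

# A request that fits on CPU but not on memory is rejected...
memory_heavy = Resources(cpu=1.0, memory=5000)
memory_heavy_fits = memory_heavy <= budget
# ...and the reverse comparison also fails, so the ordering is only partial.
budget_fits_in_request = budget <= memory_heavy

print(two_tasks, fits, memory_heavy_fits, budget_fits_in_request)
```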

The core idea: use a Ray task that returns a streaming generator. Each `yield` produces one output object, which the caller receives as an `ObjectRef`.



In [None]:
@ray.remote(num_cpus=1, num_returns="streaming")
def map_task_generator(*blocks: Block, transform_fn, task_id: int):
    for block in blocks:
        transformed_data = transform_fn(block.data)
        output_block = Block(data=transformed_data, block_id=block.block_id)
        yield output_block  # streaming output

**Note on block sizing**: You can adjust `map_task_generator` to emit uniformly sized blocks (by rows or approximate bytes). This keeps object store load predictable, bounds data transfer time per block, and limits scheduling overheads, improving end-to-end stability.
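
As a rough sketch of the row-based variant, the helper below re-slices unevenly sized row lists into lists of a fixed length. `rechunk_rows` and `target_rows` are hypothetical names, and this is plain Python rather than a change to `map_task_generator`; inside the task, each emitted list could be wrapped in a `Block` before it is yielded.

```python
from typing import Iterable, Iterator, List


def rechunk_rows(chunks: Iterable[List[int]], target_rows: int) -> Iterator[List[int]]:
    """Re-slice incoming row lists into lists of target_rows rows (the last may be shorter)."""
    if target_rows <= 0:
        raise ValueError("target_rows must be positive")
    buffer: List[int] = []
    for chunk in chunks:
        buffer.extend(chunk)
        # Emit as many full blocks as the buffer currently holds.
        while len(buffer) >= target_rows:
            yield buffer[:target_rows]
            buffer = buffer[target_rows:]
    if buffer:
        yield buffer  # final partial block


uneven = [[1, 2, 3, 4, 5], [6], [7, 8, 9]]
uniform = list(rechunk_rows(uneven, target_rows=4))
print(uniform)
```

How `block_id` should be assigned to re-sliced blocks is left open here, since one output block can span several input blocks.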

## Part 2: Operators Built on Queues

Each operator tracks input/output queues and active streaming tasks. The state is simple and explicit.



In [None]:
class OperatorState:
    def __init__(self, operator: "Operator"):
        self.operator = operator
        self.input_queue = deque()
        self.output_queue = deque()
        self.active_tasks = {}
        self.next_task_id = 0
        self.completed_tasks = 0
        self.inputs_done = False

    def has_pending_input(self) -> bool:
        return len(self.input_queue) > 0

    def has_output(self) -> bool:
        return len(self.output_queue) > 0

    def num_active_tasks(self) -> int:
        return len(self.active_tasks)

    def is_completed(self) -> bool:
        return (
            self.inputs_done
            and len(self.input_queue) == 0
            and len(self.active_tasks) == 0
        )

The operator encapsulates a transform and the resource need per task. It dispatches tasks that produce streaming outputs.



In [None]:
class Operator:
    def __init__(self, name, transform_fn, resource_per_task: Resources):
        self.name = name
        self.transform_fn = transform_fn
        self.resource_per_task = resource_per_task
        self.state = OperatorState(self)

    def add_input(self, block_refs: List[ray.ObjectRef]):
        self.state.input_queue.append(block_refs)

    def should_dispatch(self) -> bool:
        return self.state.has_pending_input()

    def dispatch_task(self):
        block_refs = self.state.input_queue.popleft()
        task_id = self.state.next_task_id
        self.state.next_task_id += 1
        gen = map_task_generator.remote(*block_refs, transform_fn=self.transform_fn, task_id=task_id)
        self.state.active_tasks[task_id] = gen

    def current_resource_usage(self) -> Resources:
        num_tasks = self.state.num_active_tasks()
        return Resources(
            cpu=self.resource_per_task.cpu * num_tasks,
            memory=self.resource_per_task.memory * num_tasks,
        )

**Queues expose references**:
- `add_input` enqueues a list of `ObjectRef`s as the input for one task.
- `dispatch_task` consumes one such list and starts a streaming Ray task.
- Outputs re-enter the graph as `ObjectRef`s in `output_queue`, which the executor fills as tasks yield.


## Part 3: Backpressure via a Resource Policy

The resource policy computes available vs. needed resources across all operators, enabling the scheduler to apply backpressure.



In [None]:
class StaticResourcePolicy:
    def __init__(self, total_resources: Resources):
        self.total_resources = total_resources

    def get_available_resources(self, operators: List[Operator]) -> Resources:
        used = Resources()
        for op in operators:
            used = used + op.current_resource_usage()
        return Resources(
            cpu=self.total_resources.cpu - used.cpu,
            memory=self.total_resources.memory - used.memory,
        )

    def can_dispatch(self, operator: Operator, operators: List[Operator]) -> bool:
        available = self.get_available_resources(operators)
        needed = operator.resource_per_task
        return needed <= available

ResourcePolicy = StaticResourcePolicy

**Effect**: When resources are tight, `can_dispatch` returns false for some operators, throttling concurrency and preventing overload.
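
The arithmetic behind this can be sketched without Ray. The helper `dispatchable_tasks` below is hypothetical and simply counts how many identical tasks a static budget admits when they are requested one at a time. It is a simplified stand-in for repeated `can_dispatch` checks, not the notebook's policy class.

```python
def dispatchable_tasks(
    total_cpu: float, total_memory: int, task_cpu: float, task_memory: int, pending: int
) -> int:
    """Count how many of the pending tasks fit into a static CPU/memory budget."""
    used_cpu, used_memory, started = 0.0, 0, 0
    while started < pending:
        fits = (
            used_cpu + task_cpu <= total_cpu
            and used_memory + task_memory <= total_memory
        )
        if not fits:
            break  # backpressure: the remaining tasks stay queued
        used_cpu += task_cpu
        used_memory += task_memory
        started += 1
    return started


# Four pending tasks of 1 CPU / 1000 memory units against a 3 CPU / 3000 budget.
cpu_bound = dispatchable_tasks(3.0, 3000, 1.0, 1000, pending=4)
# Same tasks, but memory is now the tighter dimension.
memory_bound = dispatchable_tasks(3.0, 2500, 1.0, 1000, pending=4)
print(cpu_bound, memory_bound)
```

Whichever dimension runs out first determines how many tasks start; the rest wait until a running task finishes and frees its share.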

## Part 4: The Streaming Executor

The executor runs four steps per scheduling iteration: process ready outputs, transfer them downstream, propagate input completion, then dispatch new work.

### Control flow: `run_scheduling_loop()`

Before diving into each phase, here is the high-level control flow that drives execution and returns final `ObjectRef`s.



In [None]:
def run_scheduling_loop(operators: list[Operator], resource_policy: ResourcePolicy) -> List[ray.ObjectRef]:
    while True:
        if all(op.state.is_completed() for op in operators):
            break

        # 1) Process ready outputs from active streaming tasks
        process_completed_tasks(operators)

        # 2) Transfer produced ObjectRefs to downstream input queues
        transfer_outputs(operators)

        # 3) Update downstream input completion
        update_operator_states(operators)

        # 4) Dispatch as many new tasks as possible under constraints
        while True:
            op = select_operator_to_run(operators, resource_policy)
            if op is None:
                break
            op.dispatch_task()
            print(f"Dispatch: {op.name} +1")

        status = " | ".join(
            f"{op.name}[in={len(op.state.input_queue)} act={op.state.num_active_tasks()} out={len(op.state.output_queue)} done={op.state.completed_tasks}]"
            for op in operators
        )
        print(f"Status: {status}")

    # Return final ObjectRefs
    final_op = operators[-1]
    result_refs = list(final_op.state.output_queue)
    final_op.state.output_queue.clear()
    return result_refs

Next, we detail each helper invoked by the scheduling loop.

### Processing outputs from streaming tasks

This step polls active streaming tasks to fetch the next available output. It turns "work in progress" into `ObjectRef`s in the operator's `output_queue`.

- Purpose: move ready results from task generators into the graph as references
- Inputs: each operator's `active_tasks` (streaming generators)
- Outputs: enqueued `ObjectRef`s in `output_queue`; completed tasks removed
- Considerations: use `ray.wait` with a timeout; never call `ray.get` here; handle `StopIteration`



In [None]:
def process_completed_tasks(operators):
    for op in operators:
        if not op.state.active_tasks:
            continue
        task_refs = list(op.state.active_tasks.values())
        # Wait up to 100ms for generator tasks to yield outputs
        ready, _ = ray.wait(task_refs, num_returns=len(task_refs), fetch_local=False, timeout=0.1)
        for ref in ready:
            task_id = next(tid for tid, task_ref in op.state.active_tasks.items() if task_ref == ref)
            try:
                block_ref = next(ref)  # next streaming result as ObjectRef
                op.state.output_queue.append(block_ref)
                print(f"[{op.name}] #{task_id} yield")
            except StopIteration:
                # The generator is exhausted, so the task has finished
                del op.state.active_tasks[task_id]
                op.state.completed_tasks += 1
                print(f"[{op.name}] #{task_id} done")

**Key points**:
- `next(ref)` does not materialize data; it retrieves the next `ObjectRef` produced by the remote generator.
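
The `next` / `StopIteration` handling can be tried out locally. The sketch below uses ordinary Python generators as stand-ins for Ray's streaming generators, so it yields values rather than `ObjectRef`s, and `numbers` and `poll_once` are hypothetical names. It takes at most one output per task per poll and removes a task only when `next` raises `StopIteration`.

```python
from collections import deque


def numbers(start: int, count: int):
    """Local stand-in for a streaming task that yields `count` outputs."""
    for i in range(count):
        yield start + i


active_tasks = {0: numbers(0, 2), 1: numbers(10, 1)}
output_queue = deque()
completed = []


def poll_once() -> None:
    # Iterate over a snapshot so finished tasks can be deleted safely.
    for task_id, gen in list(active_tasks.items()):
        try:
            output_queue.append(next(gen))
        except StopIteration:
            del active_tasks[task_id]
            completed.append(task_id)


poll_once()  # each task yields its first output
poll_once()  # task 0 yields again; task 1 is exhausted
poll_once()  # task 0 is exhausted
print(list(output_queue), completed)
```

In this sketch a task is detected as finished one poll after its last output. The notebook's loop has the same shape, so a task keeps counting toward resource usage until the poll that raises `StopIteration`.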

### Transferring outputs downstream as references

Once outputs exist, we push them to the next operator's `input_queue`. This keeps data as `ObjectRef`s (no materialization) and enables immediate downstream work.

- Purpose: propagate produced refs to the next stage
- Inputs: upstream `output_queue`
- Outputs: downstream `input_queue`
- Considerations: transfer all available outputs; backpressure is enforced by dispatch/resource checks in later steps



In [None]:
def transfer_outputs(operators):
    for upstream_op, downstream_op in zip(operators, operators[1:]):
        while upstream_op.state.has_output():
            block_ref = upstream_op.state.output_queue.popleft()
            downstream_op.add_input([block_ref])  # pass ObjectRef (object store), not the block
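
The transfer above wraps each reference in a one-element list, so every downstream task receives exactly one block. Because `add_input` accepts a list and `map_task_generator` accepts several blocks, one possible variation is to group references into bundles. The sketch below uses strings as stand-ins for `ObjectRef`s, and `transfer_in_bundles` and `bundle_size` are hypothetical names.

```python
from collections import deque
from typing import Deque, List


def transfer_in_bundles(
    output_queue: Deque[str], input_queue: Deque[List[str]], bundle_size: int
) -> None:
    """Move all refs downstream, grouping up to bundle_size refs per input entry."""
    if bundle_size <= 0:
        raise ValueError("bundle_size must be positive")
    while output_queue:
        take = min(bundle_size, len(output_queue))
        bundle = [output_queue.popleft() for _ in range(take)]
        input_queue.append(bundle)


upstream_out: Deque[str] = deque(["ref0", "ref1", "ref2"])
downstream_in: Deque[List[str]] = deque()
transfer_in_bundles(upstream_out, downstream_in, bundle_size=2)
print(list(downstream_in))
```

Larger bundles mean fewer tasks with more work each. This version forwards a partial bundle immediately rather than waiting for it to fill, which favors latency over uniform task size.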

### Updating downstream input completion

When an upstream operator is finished and has no more outputs, we signal the downstream operator that no more inputs will arrive. This allows downstream stages to cleanly finalize.

- Purpose: propagate completion to unlock downstream termination
- Inputs: upstream `is_completed()` and `output_queue` emptiness
- Outputs: `inputs_done = True` on the downstream operator's state
- Considerations: only mark once; do not prematurely mark while outputs remain



In [None]:
def update_operator_states(operators):
    for upstream_op, downstream_op in zip(operators, operators[1:]):
        if upstream_op.state.is_completed() and not upstream_op.state.has_output():
            downstream_op.state.inputs_done = True
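
To see why the output-queue check matters, here is a small sketch with a hypothetical `StageState` stand-in that stores counts instead of real queues. It applies the same rule as `update_operator_states` to a three-stage chain.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StageState:
    # Hypothetical stand-in for OperatorState, using counts instead of queues.
    inputs_done: bool = False
    queued_inputs: int = 0
    active_tasks: int = 0
    queued_outputs: int = 0

    def is_completed(self) -> bool:
        return self.inputs_done and self.queued_inputs == 0 and self.active_tasks == 0


def propagate(stages: List[StageState]) -> None:
    for upstream, downstream in zip(stages, stages[1:]):
        if upstream.is_completed() and upstream.queued_outputs == 0:
            downstream.inputs_done = True


# Stage 0 has finished its work but still holds one untransferred output.
stages = [StageState(inputs_done=True, queued_outputs=1), StageState(), StageState()]
propagate(stages)
before = [s.inputs_done for s in stages]

stages[0].queued_outputs = 0  # the output has been transferred downstream
propagate(stages)
after = [s.inputs_done for s in stages]
print(before, after)
```

While an output is still queued upstream, nothing is marked. Once it is gone, completion can cascade through several idle stages in a single pass, because the pairs are visited in pipeline order.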

### Selecting which operator to run

Ideally, you want to pick the bottleneck operator. In Ray Data, this is done by estimating each operator's throughput as the number of bytes produced to the object store per unit time and prioritizing the bottleneck. In our simplified example we don't track object store throughput, so we use a lightweight heuristic instead.

- Purpose: choose the next eligible operator to dispatch a task for
- Inputs: `should_dispatch()` status and resource policy `can_dispatch()`
- Output: one operator (or `None` if none are eligible)
- Considerations: the ranking policy is pluggable, so different heuristics can be swapped in to optimize throughput



In [None]:
def select_operator_to_run(operators: list[Operator], resource_policy: ResourcePolicy) -> Operator | None:
    eligible_ops = []
    for op in operators:
        if not op.should_dispatch():
            continue
        if not resource_policy.can_dispatch(op, operators):
            continue
        eligible_ops.append(op)
    if not eligible_ops:
        return None
    # simple ranking heuristic: prefer lower memory usage
    ranked = sorted(eligible_ops, key=lambda op: op.current_resource_usage().memory)
    return ranked[0]
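
One way to make the ranking pluggable is to pass the sort key in as a parameter. The sketch below is an assumption about how that could look, not the notebook's implementation: `OpInfo`, `pick`, and the `position` field are hypothetical, and the "prefer the most downstream operator" key is just one alternative heuristic.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class OpInfo:
    # Hypothetical summary of an operator that already passed the eligibility checks.
    name: str
    position: int  # index in the pipeline, 0 = most upstream
    memory_in_use: int


def pick(eligible: List[OpInfo], rank_key: Callable[[OpInfo], float]) -> Optional[OpInfo]:
    """Return the eligible operator with the smallest rank_key, or None if there is none."""
    return min(eligible, key=rank_key) if eligible else None


ops = [OpInfo("MultiplyOperator", 0, 1000), OpInfo("AddOperator", 1, 2000)]

by_memory = pick(ops, rank_key=lambda op: op.memory_in_use)  # lowest memory first
by_downstream = pick(ops, rank_key=lambda op: -op.position)  # drain the pipeline first
print(by_memory.name, by_downstream.name)
```

Like `sorted(...)[0]`, `min` returns the first of several tied candidates, so ties fall back to pipeline order.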

## Part 5: End-to-End Demo Pipeline

The demo builds a two-stage pipeline in which the first stage multiplies by 2 and the second adds 10. It then runs the executor until completion and materializes only the final results for printing.


Here is some quick context before we define the transform functions:
- **Transform functions**: Stateless functions applied to each `Block`'s `data` list.
- **Artificial delay**: `time.sleep(0.1)` simulates work so you can observe streaming and scheduling interleave.
- **Block identity**: We preserve `block_id` across stages to track provenance of outputs.


In [None]:
def multiply_by_2(data: List[int]) -> List[int]:
    time.sleep(0.1)
    return [x * 2 for x in data]

def add_10(data: List[int]) -> List[int]:
    time.sleep(0.1)
    return [x + 10 for x in data]

Let's set up the operators:
- **Operator configuration**: Each `Operator` declares a `transform_fn` and a `resource_per_task`. This simplified `Operator` has no per-operator concurrency cap.
- **Throughput vs. resources**: With one CPU per task, the number of tasks an operator can run concurrently is limited only by the global budget.


In [None]:
op1 = Operator(
    name="MultiplyOperator",
    transform_fn=multiply_by_2,
    resource_per_task=Resources(cpu=1.0, memory=1000),
)

op2 = Operator(
    name="AddOperator",
    transform_fn=add_10,
    resource_per_task=Resources(cpu=1.0, memory=1000),
)

resource_policy = StaticResourcePolicy(total_resources=Resources(cpu=3.0, memory=3000))

Let's queue up some input data:
- **Inputs as references**: We `ray.put` each `Block` to pass `ObjectRef`s through queues without materializing data in the driver.
- **Completion signal**: Setting `op1.state.inputs_done = True` tells the upstream operator no more inputs will arrive; downstream completion is inferred by the executor.

In [None]:
# Place 4 blocks in the input queue
input_blocks = [
    Block(data=[1, 2, 3], block_id=0),
    Block(data=[4, 5, 6], block_id=1),
    Block(data=[7, 8, 9], block_id=2),
    Block(data=[10, 11, 12], block_id=3),
]

for block in input_blocks:
    block_ref = ray.put(block)
    op1.add_input([block_ref])

op1.state.inputs_done = True

A few quick points before running the streaming executor:
- **Global resource budget**: `Resources(cpu=3.0, memory=3000)` means at most **three tasks** across the whole pipeline can run concurrently.
- **Scheduling behavior**: Each iteration polls streaming outputs, transfers refs downstream, updates completion, then dispatches more work under the policy.
- **Materialize only at the end**: We call `ray.get` on final `result_refs`; intermediate blocks remain as `ObjectRef`s in the object store.
- **Streaming effect**: `op2` can start once `op1` has yielded an output block and the resource budget has room, so the two stages overlap instead of running back to back.

In [None]:
result_refs = run_scheduling_loop([op1, op2], resource_policy)
blocks = ray.get(result_refs)

for block in blocks:
    print(f"Block {block.block_id}: {block.data}")
    assert block.data == [2 * x + 10 for x in input_blocks[block.block_id].data]
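
The final blocks are collected in the order in which tasks yielded them, which may differ from the input order. The loop above handles this by looking up each input through `block_id`. If ordered output is needed, sorting by `block_id` is one simple option. The sketch below uses a local `Block` copy and made-up arrival order so that it runs without Ray.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Block:
    # Same fields as the notebook's Block, repeated so the snippet runs alone.
    data: List[int]
    block_id: int


# Pretend these results arrived in completion order rather than input order.
arrived = [
    Block(data=[18, 20, 22], block_id=1),
    Block(data=[12, 14, 16], block_id=0),
    Block(data=[24, 26, 28], block_id=2),
]

ordered = sorted(arrived, key=lambda block: block.block_id)
print([block.block_id for block in ordered])
```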