In [1]:
import random
import string
from collections.abc import Generator

import ray
from ray.util import ActorPool

from dataclasses import dataclass


In [2]:
# Address passed to ray.init(); the ray:// scheme targets a Ray Client server.
RAY_CLUSTER_ADDRESS = "ray://10.21.79.111:10001"

# Attach to the cluster first, then report the resources it advertises.
ray.init(address=RAY_CLUSTER_ADDRESS)
cluster_resources = ray.cluster_resources()
print("Connected to Ray Cluster:", cluster_resources)

2025-10-14 16:32:54,713	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.


Connected to Ray Cluster: {'accelerator_type:G': 1.0, 'memory': 111754080870.0, 'GPU': 1.0, 'node:10.21.79.111': 1.0, 'object_store_memory': 12240656793.0, 'node:__internal_head__': 1.0, 'CPU': 24.0}


[36m(Predictor pid=1751)[0m __init__ Predictor
[36m(Predictor pid=1763)[0m __init__ Predictor
[36m(Predictor pid=1762)[0m __init__ Predictor
[36m(Predictor pid=1761)[0m __init__ Predictor
[36m(Predictor pid=1759)[0m __init__ Predictor
[36m(Consumer pid=1752)[0m __init__ Consumer
[36m(Consumer pid=1752)[0m consume ['input-3-0', 'input-3-1', 'input-3-2', 'input-3-3', 'input-3-4', 'input-3-5', 'input-3-6', 'input-3-7', 'input-3-8', 'input-3-9']
[36m(Consumer pid=1750)[0m __init__ Consumer
[36m(Consumer pid=1750)[0m consume ['input-6-0', 'input-6-1', 'input-6-2', 'input-6-3', 'input-6-4', 'input-6-5', 'input-6-6', 'input-6-7', 'input-6-8', 'input-6-9']
[36m(Consumer pid=1753)[0m __init__ Consumer
[36m(Consumer pid=1753)[0m consume ['input-5-0', 'input-5-1', 'input-5-2', 'input-5-3', 'input-5-4', 'input-5-5', 'input-5-6', 'input-5-7', 'input-5-8', 'input-5-9']
[36m(Consumer pid=1760)[0m __init__ Consumer
[36m(Consumer pid=1760)[0m consume ['input-1-0', 'input-1-1',

In [3]:
@dataclass
class Prediction:
    """One prediction result: an input key paired with its predicted value.

    Attributes:
        key: The input key the prediction was produced for.
        value: The value predicted for that key.
    """

    key: str
    value: str


@ray.remote
class Predictor:
    """Ray actor that produces a placeholder prediction for every input key."""

    def __init__(self):
        # Trace actor construction.
        print("__init__ Predictor")

    def predict(self, input_keys: list[str]) -> list[Prediction]:
        """Build one ``Prediction`` per key, preserving the order of ``input_keys``.

        The value of each prediction is a single character picked at random
        from ``string.ascii_letters``.
        """
        predictions: list[Prediction] = []
        for input_key in input_keys:
            letter = random.choice(string.ascii_letters)
            predictions.append(Prediction(key=input_key, value=letter))
        return predictions

@ray.remote
class Consumer:
    """Ray actor that hands each batch of input keys to a randomly picked predictor."""

    def __init__(self, predictors: list[ray.actor.ActorHandle]):
        print("__init__ Consumer")
        # Actor handles this consumer may delegate prediction work to.
        self.predictors = predictors

    def consume(self, input_keys: list[str]) -> list[Prediction]:
        """Send ``input_keys`` to one predictor and return its predictions.

        The predictor is chosen uniformly at random from ``self.predictors``
        on every call. The call blocks until the remote ``predict`` finishes.
        """
        print(f"consume {input_keys}")

        chosen_predictor = random.choice(self.predictors)
        pending = chosen_predictor.predict.remote(input_keys)
        # Resolve the object ref into the actual list of predictions.
        return ray.get(pending)


In [4]:

def main(
    num_predictors: int = 5,
    num_consumers: int = 10,
    num_batches: int = 10,
    batch_size: int = 10,
) -> None:
    """Run the consumer/predictor demo and print each batch of predictions.

    Creates ``num_predictors`` ``Predictor`` actors, shares their handles with
    ``num_consumers`` ``Consumer`` actors, and distributes the input batches
    across the consumers through an ``ActorPool``.

    Args:
        num_predictors: Number of ``Predictor`` actors to create.
        num_consumers: Number of ``Consumer`` actors placed in the pool.
        num_batches: Number of input batches to generate.
        batch_size: Number of keys in each batch.

    The defaults reproduce the values that were previously hard-coded, so a
    bare ``main()`` call behaves as before.
    """
    print("creating actors")
    predictors = [Predictor.remote() for _ in range(num_predictors)]
    # Every consumer receives the full list of predictor handles.
    consumers = [Consumer.remote(predictors) for _ in range(num_consumers)]

    pool = ActorPool(consumers)

    # Keys look like "input-<batch index>-<item index>".
    input_keys = [
        [f"input-{batch}-{item}" for item in range(batch_size)]
        for batch in range(num_batches)
    ]
    # map_unordered does not guarantee that results follow the order of
    # input_keys.
    results: Generator[list[Prediction], None, None] = pool.map_unordered(
        lambda consumer, keys: consumer.consume.remote(keys), input_keys
    )

    print("wait on results")
    for batch_predictions in results:
        print(batch_predictions)


# Entry point: run the demo only when this code is executed directly
# (i.e. when __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()


creating actors
wait on results
[Prediction(key='input-1-0', value='E'), Prediction(key='input-1-1', value='q'), Prediction(key='input-1-2', value='q'), Prediction(key='input-1-3', value='s'), Prediction(key='input-1-4', value='F'), Prediction(key='input-1-5', value='W'), Prediction(key='input-1-6', value='A'), Prediction(key='input-1-7', value='q'), Prediction(key='input-1-8', value='I'), Prediction(key='input-1-9', value='m')]
[Prediction(key='input-0-0', value='b'), Prediction(key='input-0-1', value='K'), Prediction(key='input-0-2', value='n'), Prediction(key='input-0-3', value='B'), Prediction(key='input-0-4', value='Q'), Prediction(key='input-0-5', value='F'), Prediction(key='input-0-6', value='v'), Prediction(key='input-0-7', value='i'), Prediction(key='input-0-8', value='m'), Prediction(key='input-0-9', value='p')]
[Prediction(key='input-2-0', value='J'), Prediction(key='input-2-1', value='R'), Prediction(key='input-2-2', value='z'), Prediction(key='input-2-3', value='i'), Predi