In [1]:
import ray
import numpy as np
from ray import DynamicObjectRefGenerator

In [2]:
################ num_returns set by the task caller
@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        ) for _ in range(num_returns)
    ]

@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")


print(ray.get(large_values.remote(1)))
print()
print(ray.get(large_values.remote(1)))

2024-12-13 18:43:20,510	INFO worker.py:1812 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8268 [39m[22m


[array([[  3],
       [ 13],
       [ 71],
       ...,
       [ 96],
       [126],
       [121]], dtype=int8)]
[array([[ 84],
       [ 87],
       [ 28],
       ...,
       [124],
       [ 45],
       [  5]], dtype=int8)]


In [3]:
################ num_returns set by the task executor
@ray.remote(num_returns="dynamic")  # 返回值可动态调整
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))

block_size = 1000

dynamic_ref = split.remote(array_ref, block_size)  # Returns an ObjectRef[DynamicObjectRefGenerator].
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f7e2116b290>

for i, ref in enumerate(ref_generator):
    # Each DynamicObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size

num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
      f"size {block_size} each.\n\n")

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal ObjectRefs.
del dynamic_ref

ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000)
<ray._raylet.DynamicObjectRefGenerator object at 0x111b0e150>
Split array of size 960575 into 961 blocks of size 1000 each.


In [4]:
# 使用ray.get返回值的类型为：<ray._raylet.DynamicObjectRefGenerator>
# 可以对其进行遍历，如下 get_size 所示
@ray.remote
def get_size(ref_generator: DynamicObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements


# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size) # 建议使用 remote 来异步调用，避免阻塞主进程
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184) <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4250ad0>

# 通过该方法使用了 ray.get 方法，可能会造成主进程的阻塞
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))