In [1]:
import ray
ray.init()

2024-01-30 14:51:53,260	INFO worker.py:1673 -- Started a local Ray instance.


0,1
Python version:,3.11.4
Ray version:,2.8.1


In [2]:
ray.cluster_resources()

{'memory': 30217353626.0,
 'object_store_memory': 2147483648.0,
 'node:127.0.0.1': 1.0,
 'CPU': 12.0,
 'node:__internal_head__': 1.0}

In [4]:
import time

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]

In [5]:
def print_runtime(input_data, start_time):
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    print(*input_data, sep="\n")

In [6]:
start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)

Runtime: 2.83 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [7]:
@ray.remote  
def retrieve_task(item):
    return retrieve(item)

In [10]:
start = time.time()
object_references = [  
    retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)  
print_runtime(data, start)

Runtime: 0.72 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [16]:
db_object_ref = ray.put(database)  


@ray.remote
def retrieve_task(item, db):  
    time.sleep(item / 10.)
    return item, db[item]

In [21]:
start = time.time()
object_references = [
    retrieve_task.remote(item, db_object_ref) for item in range(8) 
]
all_data = []
print('Start list length:', len(object_references))
while len(object_references) > 0:  
    finished, object_references = ray.wait(  
        object_references, num_returns=2, timeout=7.0
    )
    print('List length:', len(object_references))
    data = ray.get(finished)
    print_runtime(data, start)  
    all_data.extend(data)
    

Start list length: 8
List length: 6
Runtime: 0.11 seconds, data:
(0, 'Learning')
(1, 'Ray')
List length: 4
Runtime: 0.32 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
List length: 2
Runtime: 0.52 seconds, data:
(4, 'Python')
(5, 'for')
List length: 0
Runtime: 0.72 seconds, data:
(6, 'Machine')
(7, 'Learning')


In [22]:
@ray.remote
def follow_up_task(retrieve_result):  
    original_item, _ = retrieve_result
    follow_up_result = retrieve(original_item + 1)  
    return retrieve_result, follow_up_result  


retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]  

result = [print(data) for data in ray.get(follow_up_refs)]

((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))


In [27]:
@ray.remote  
class DataTracker:
    def __init__(self):
        self._counts = 0

    def increment(self):
        self._counts += 1

    def counts(self):
        return self._counts

In [28]:
@ray.remote
def retrieve_tracker_task(item, tracker, db):  
    time.sleep(item / 10.)
    tracker.increment.remote()  
    return item, db[item]

In [30]:
tracker = DataTracker.remote()  

object_references = [  
    retrieve_tracker_task.remote(item, tracker, db_object_ref) 
    for item in range(8)
]
data = ray.get(object_references)

print(data) 
print(ray.get(tracker.counts.remote()))

[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8


In [31]:
@ray.remote
def task_owned():
    return


@ray.remote
def task(dependency):
    res_owned = task_owned.remote()
    return


val = ray.put("value")
res = task.remote(dependency=val)

In [32]:
import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()  

num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [  
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]

In [40]:
def map_function(document):
    for word in document.lower().split():
        yield word, 1

In [42]:
@ray.remote
def apply_map(corpus, num_partitions=3):
    map_results = [list() for _ in range(num_partitions)]  
    for document in corpus:
        for result in map_function(document):
            first_letter = result[0].decode("utf-8")[0]
            word_index = ord(first_letter) % num_partitions  
            map_results[word_index].append(result)  
    return map_results

In [43]:
map_results = [
    apply_map.options(num_returns=num_partitions)  
    .remote(data, num_partitions)  
    for data in partitions  
]

for i in range(num_partitions):
    mapper_results = ray.get(map_results[i])  
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")

Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]


In [44]:
@ray.remote
def apply_reduce(*results):  
    reduce_results = dict()
    for res in results:
        for key, value in res:
            if key not in reduce_results:
                reduce_results[key] = 0
            reduce_results[key] += value  

    return reduce_results

In [45]:
outputs = []
for i in range(num_partitions):
    outputs.append(  
        apply_reduce.remote(*[partition[i] for partition in map_results])
    )

counts = {k: v for output in ray.get(outputs) for k, v in output.items()}  

sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)  
for count in sorted_counts:
    print(f"{count[0].decode('utf-8')}: {count[1]}")

is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
if: 2
implementation: 2
idea.: 2
special: 2
should: 2
do: 2
may: 2
a: 2
never: 2
way: 2
explain,: 2
ugly.: 1
implicit.: 1
complex.: 1
complex: 1
complicated.: 1
flat: 1
readability: 1
counts.: 1
cases: 1
rules.: 1
in: 1
face: 1
refuse: 1
one--: 1
only: 1
--obvious: 1
it.: 1
obvious: 1
first: 1
often: 1
*right*: 1
it's: 1
it: 1
idea: 1
--: 1
let's: 1
python,: 1
peters: 1
simple: 1
sparse: 1
dense.: 1
aren't: 1
practicality: 1
purity.: 1
pass: 1
silently.: 1
silenced.: 1
ambiguity,: 1
guess.: 1
and: 1
preferably: 1
at: 1
you're: 1
dutch.: 1
good: 1
are: 1
great: 1
more: 1
zen: 1
by: 1
tim: 1
beautiful: 1
explicit: 1
nested.: 1
enough: 1
break: 1
beats: 1
errors: 1
explicitly: 1
temptation: 1
there: 1
that: 1
not: 1
now: 1
never.: 1
now.: 1
hard: 1
bad: 1
easy: 1
namespaces: 1
honking: 1
those!: 1


In [3]:
items = [{"name": str(i), "data": i} for i in range(10000)]
ds = ray.data.from_items(items)   
ds.show(5)

2024-01-30 13:00:57,805	INFO worker.py:1673 -- Started a local Ray instance.
2024-01-30 13:00:58,510	INFO dataset.py:2383 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-01-30 13:00:58,511	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> LimitOperator[limit=5]
2024-01-30 13:00:58,511	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-30 13:00:58,512	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

{'name': '0', 'data': 0}
{'name': '1', 'data': 1}
{'name': '2', 'data': 2}
{'name': '3', 'data': 3}
{'name': '4', 'data': 4}


In [4]:
squares = ds.map(lambda x: x["data"] ** 2)

In [6]:
squares.take(10)

2024-01-30 13:18:49,334	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Map(<lambda>)] -> LimitOperator[limit=10]
2024-01-30 13:18:49,336	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-30 13:18:49,336	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

RayTaskError(ValueError): [36mray::Map(<lambda>)()[39m (pid=31601, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 235, in transform_fn
    _validate_row_output(out_row)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 220, in _validate_row_output
    raise ValueError(
ValueError: Error validating 0: Standalone Python objects are not allowed in Ray 2.5. To return Python objects from map(), wrap them in a dict, e.g., return `{'item': item}` instead of just `item`.

2024-01-30 13:18:58,309	ERROR worker.py:406 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Map(<lambda>)()[39m (pid=31607, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 235, in transform_fn
    _validate_row_output(out_row)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.

2024-01-30 13:18:58,320	ERROR worker.py:406 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Map(<lambda>)()[39m (pid=31610, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 235, in transform_fn
    _validate_row_output(out_row)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.

In [5]:
evens = squares.filter(lambda x: x % 2 == 0)
evens.count()

2024-01-30 13:17:57,633	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Map(<lambda>)->Filter(<lambda>)]
2024-01-30 13:17:57,634	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-30 13:17:57,635	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

RayTaskError(ValueError): [36mray::Map(<lambda>)->Filter(<lambda>)()[39m (pid=31612, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 257, in transform_fn
    for row in rows:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 248, in __call__
    for block in blocks:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 235, in transform_fn
    _validate_row_output(out_row)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 220, in _validate_row_output
    raise ValueError(
ValueError: Error validating 0: Standalone Python objects are not allowed in Ray 2.5. To return Python objects from map(), wrap them in a dict, e.g., return `{'item': item}` instead of just `item`.

2024-01-30 13:18:06,238	ERROR worker.py:406 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Map(<lambda>)->Filter(<lambda>)()[39m (pid=31602, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 257, in transform_fn
    for row in rows:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/pyt

2024-01-30 13:18:06,246	ERROR worker.py:406 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Map(<lambda>)->Filter(<lambda>)()[39m (pid=31601, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 257, in transform_fn
    for row in rows:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/pyt

2024-01-30 13:18:06,254	ERROR worker.py:406 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Map(<lambda>)->Filter(<lambda>)()[39m (pid=31608, ip=127.0.0.1)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 257, in transform_fn
    for row in rows:
  File "/Users/sbrewer/anaconda3/envs/rllib/lib/pyt