# A Guided Tour of Ray Core: Parallel Iterators

[*Parallel Iterators*](https://docs.ray.io/en/latest/iter.html) provide a simple yet powerful API for data ingest and stream processing, where transformations are based on method chaining.

Parallel iterators get partitioned into *data shards*, and Ray creates a worker (an *actor*) to produce the data for each shard.
Evaluation is *lazy*: work is performed only when the application calls `next()` to fetch the next item in the sequence.

Parallel iterators are fully serializable, so they can be passed to remote tasks and actors.
In effect, they can operate over infinite sequences of items, with the processing distributed across a cluster.
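
To make the idea of lazy evaluation over an unbounded sequence concrete, here is a minimal plain-Python sketch. It uses ordinary generators rather than Ray, and the `square` helper and `calls` list are hypothetical names introduced only for illustration.

```python
from itertools import count, islice

calls = []  # records which items were actually computed


def square(x):
    calls.append(x)
    return x * x


# Chaining builds a pipeline over an infinite source; nothing runs yet.
pipeline = map(square, count(1))
assert calls == []

# Each next() pulls exactly one item through the pipeline.
first = next(pipeline)
next_three = list(islice(pipeline, 3))

print(first, next_three, calls)
```

Only the four items that were requested get computed, even though the source never ends.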

---

First, let's start Ray…

In [None]:
import logging
import ray

ray.init(
    ignore_reinit_error=True,
    logging_level=logging.ERROR,
)

## Parallel Iterators

We'll create a parallel iterator from the sequence `items`, using 2 worker actors:

In [None]:
%%time

items = [1, 2, 3, 4, 5]

iter1 = ray.util.iter.from_items(items, num_shards=2)
iter1

This `iter1` object can now be passed (i.e., serialized) to remote tasks and remote methods.

To read elements from a parallel iterator, convert it to a [`LocalIterator`](https://docs.ray.io/en/latest/iter.html#ray.util.iter.LocalIterator) using one of two approaches: *synchronous* or *asynchronous*.

Calling [`gather_sync()`](https://docs.ray.io/en/latest/iter.html#ray.util.iter.ParallelIterator.gather_sync)
returns a local iterable for *synchronous* iteration.
In other words, next items will be fetched from the shards on-demand as the application steps through the iterator sequence.

In [None]:
%%time

local_iter1 = iter1.gather_sync()
local_iter1

In [None]:
for item in local_iter1:
    print(item)
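
As a rough mental model of sharding and synchronous gathering, the following plain-Python sketch splits a list across shards and then reads one item from each shard in turn. It assumes round-robin assignment and round-robin reads; `shard_round_robin` and `gather_round_robin` are hypothetical helpers, not part of the Ray API.

```python
def shard_round_robin(items, num_shards):
    """Assign item i to shard i % num_shards (assumed policy)."""
    shards = [[] for _ in range(num_shards)]
    for i, item in enumerate(items):
        shards[i % num_shards].append(item)
    return shards


def gather_round_robin(shards):
    """Yield one item per shard in turn until every shard is exhausted."""
    iterators = [iter(s) for s in shards]
    while iterators:
        for it in list(iterators):
            try:
                yield next(it)
            except StopIteration:
                iterators.remove(it)


shards = shard_round_robin([1, 2, 3, 4, 5], num_shards=2)
gathered = list(gather_round_robin(shards))

print(shards)    # [[1, 3, 5], [2, 4]]
print(gathered)  # [1, 2, 3, 4, 5]
```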

When a function (i.e., some kind of transformation) is applied to the sequence, a parallel iterator provides semantic guarantees for *fetch ordering*: the transformation is guaranteed to be applied to each element before the next item is fetched from the source actor.
For example, this can be useful if you need to update the source actor between iterator steps.

To illustrate a simple case of how to apply a function, first we'll define a class to perform some calculation:

In [None]:
class CumulativeSum:
    def __init__(self):
        self.total = 0

    def __call__(self, x):
        # keep a running total and return it alongside the current item
        self.total += x
        return (self.total, x)

Now apply that class to the sequence of items, using the [`for_each()`](https://docs.ray.io/en/latest/iter.html#ray.util.iter.ParallelIterator.for_each) method:

In [None]:
%%time

for x in iter1.for_each(CumulativeSum()).gather_sync():
    print(x)

Note the values produced: each of the two shards applies its own copy of `CumulativeSum`, so the running totals are tracked per shard rather than across the whole sequence.
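
To see why the totals differ from a single running sum, the following plain-Python sketch compares the two cases. It does not use Ray. It assumes the five items were split round-robin into `[1, 3, 5]` and `[2, 4]`, and it repeats the `CumulativeSum` class so that the block is self-contained.

```python
class CumulativeSum:
    def __init__(self):
        self.total = 0

    def __call__(self, x):
        self.total += x
        return (self.total, x)


# Assumed round-robin split of [1, 2, 3, 4, 5] across 2 shards.
shards = [[1, 3, 5], [2, 4]]

# Each shard gets its own instance, hence its own running total.
per_shard = []
for shard in shards:
    fn = CumulativeSum()
    per_shard.append([fn(x) for x in shard])

# For comparison: one instance applied to the whole sequence.
fn = CumulativeSum()
single = [fn(x) for x in [1, 2, 3, 4, 5]]

print(per_shard)  # [[(1, 1), (4, 3), (9, 5)], [(2, 2), (6, 4)]]
print(single)     # [(1, 1), (3, 2), (6, 3), (10, 4), (15, 5)]
```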

Alternatively, calling [`gather_async()`](https://docs.ray.io/en/latest/iter.html#ray.util.iter.ParallelIterator.gather_async)
returns a local iterable for *asynchronous* iteration.
In other words, next items will be fetched from the shards asynchronously as soon as the previous item gets computed.
In this case, the fetch ordering only applies per shard.
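
The per-shard ordering guarantee can be illustrated with a small plain-Python analogy that uses threads and a queue instead of Ray actors. The `produce` helper and the `None` sentinel are hypothetical choices for this sketch. Items from different shards may arrive interleaved in any order, but items from the same shard always arrive in their original order.

```python
import queue
import threading


def produce(shard_id, shard, out):
    """Push a shard's items onto the shared queue, in order."""
    for item in shard:
        out.put((shard_id, item))
    out.put((shard_id, None))  # sentinel: this shard is exhausted


shards = [[1, 3, 5], [2, 4]]
out = queue.Queue()

threads = [
    threading.Thread(target=produce, args=(i, shard, out))
    for i, shard in enumerate(shards)
]
for t in threads:
    t.start()

# Consume items as they arrive, tracking the arrival order per shard.
arrived = {i: [] for i in range(len(shards))}
remaining = len(shards)
while remaining:
    shard_id, item = out.get()
    if item is None:
        remaining -= 1
    else:
        arrived[shard_id].append(item)

for t in threads:
    t.join()

print(arrived)
```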

Another way to access a parallel iterator is as a collection of its shards:

In [None]:
iter1.shards()

In [None]:
for shard in iter1.shards():
    print("shard", shard)
    
    for item in shard:
        print("item", item)

Note that each shard should only be read by one process at a time.

As a more extended example, let's iterate through the JSON source for each of the Jupyter notebooks in this repo – roughly speaking, as if that were a streaming input source.

First, we'll load the JSON files as remote objects, collecting their object references in a list:

In [None]:
%%time

from pathlib import Path
import json

nb_items = [
    ray.put(nb_path.read_text())
    for nb_path in Path(".").glob("ex_*.ipynb")
]

nb_items

Next we'll create a parallel iterator, to distribute processing across a cluster – or merely across the processor cores on your laptop.

For each remote object, the following will:

  * parse the text as JSON
  * extract the text from markdown cells
  * flag each line that contains the string `"Ray"` (1 if it does, 0 otherwise)
  * group the flags into fixed-size windows of `window_width` items, to simulate streamed input
  * calculate the mean of the flags in each window, i.e., the fraction of lines that mention Ray

In other words, the processing gets evaluated in batches, one for each "window" of input:

In [None]:
import numpy as np

window_width = 20

iter2 = (
    ray.util.iter.from_items(nb_items, num_shards=3)
        .for_each(lambda obj_ref: json.loads(ray.get(obj_ref)))
        .for_each(lambda nb: nb["cells"])
        .flatten()
        .for_each(lambda cell: cell["source"] if cell["cell_type"] == "markdown" else [])
        .flatten()
        .for_each(lambda line: 1 if "Ray" in line else 0)
        .batch(window_width)
        .for_each(np.mean)
)

iter2
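
The flatten, flag, batch, and mean steps in the chain above can be mimicked on a single machine with plain Python, which may help when checking the logic on a small input. This is only a sketch: the `windows` helper and the sample `cells` are hypothetical, and a window width of 2 is used to keep the example short.

```python
from itertools import chain, islice
from statistics import mean


def windows(iterable, size):
    """Yield consecutive, non-overlapping lists of up to `size` items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Hypothetical notebook cells, in the shape used by the .ipynb format.
cells = [
    {"cell_type": "markdown", "source": ["# Ray intro\n", "Some text\n"]},
    {"cell_type": "code", "source": ["import ray\n"]},
    {"cell_type": "markdown", "source": ["Ray actors\n", "More text\n", "Ray tasks\n"]},
]

# Keep only lines from markdown cells, then flag the ones mentioning "Ray".
lines = chain.from_iterable(
    cell["source"] if cell["cell_type"] == "markdown" else []
    for cell in cells
)
flags = (1 if "Ray" in line else 0 for line in lines)

window_means = [mean(w) for w in windows(flags, 2)]
print(window_means)
```

In this sketch the final window may be shorter than the others when the number of lines is not a multiple of the window width.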

Now compute the relative frequency of the term `Ray` among the lines in each batch:

In [None]:
%%time

for freq in iter2.gather_async():
    print(freq)

---

Let's show a different example which passes iterator shards to remote functions.

First we'll define a remote function `word_count` to tally a *word count* across the documents specified by one shard:

In [None]:
from collections import defaultdict

@ray.remote
def word_count(shard):
    wc = defaultdict(int)
    
    for obj_ref in shard:
        text = ray.get(obj_ref)
        
        for token in text.split():
            token = token.lower().strip("'`<>[](){}*.,:…-'")
            wc[token] += 1

    return wc

Next we'll load the documents as remote objects:

In [None]:
!ls -lth dat/*.txt

In [None]:
%%time

import pathlib

# use a name that does not shadow the built-in `dir()`
data_dir = pathlib.Path("dat")

text_objs = [
    ray.put(path.read_text())
    for path in data_dir.glob("*.txt")
]

text_objs

Now pass each of the shards to the `word_count()` remote function:

In [None]:
%%time

work = [
    word_count.remote(shard)
    for shard in ray.util.iter.from_items(text_objs, num_shards=3).shards()
]
work

Aggregate the word counts from each shard, then list the tokens ranked in descending order:

In [None]:
%%time

wc_sum = defaultdict(int)

for wc in ray.get(work):
    for token, count in wc.items():
        wc_sum[token] += count

for token in sorted(wc_sum, key=wc_sum.get, reverse=True):
    print(wc_sum[token], token)
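
As an alternative for the aggregation step, `collections.Counter` from the standard library can merge per-shard tallies and rank them. The sketch below uses small hypothetical dictionaries in place of the results returned by `ray.get(work)`.

```python
from collections import Counter

# Hypothetical per-shard word counts, standing in for ray.get(work).
partial_counts = [
    {"ray": 3, "actor": 1},
    {"ray": 2, "task": 4},
    {"actor": 2},
]

total = Counter()
for wc in partial_counts:
    total.update(wc)  # adds counts for existing keys, inserts new ones

# most_common() returns (token, count) pairs in descending order of count.
ranked = total.most_common()

for token, count in ranked:
    print(count, token)
```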

Finally, shut down Ray:

In [None]:
ray.shutdown()

---

## Summary

Parallel iterators provide a higher-level abstraction built on Ray actors and `ray.wait` loops, and they fit conveniently into efficient software patterns in Python.

Engineering trade-offs are available at multiple levels:

  * trade off compute and memory requirements by partitioning sequences of items into data shards
  * trade off compute and memory requirements for transformations on items by passing the data shards to remote functions (stateless) and remote methods (stateful)
  * trade off the semantic guarantees on fetch ordering by using asynchronous iteration