In [None]:
import ray
import pandas as pd
import numpy as np

# Column labels for the example data; generate_example_files() below only uses
# len(COLUMNS), as the number of columns to generate.
COLUMNS='ABCD'
# 100 MiB expressed in bytes; passed to generate_example_files() below.
SIZE_100MiB = 100 * 1024 * 1024
# NOTE(review): not referenced in the visible cells — confirm whether it is still needed.
PARALLELISM = 8

import tempfile
from ray.data.datasource import RandomIntRowDatasource

def generate_example_files(size_bytes: int) -> str:
    """Write random-integer Parquet example data into a fresh temporary directory.

    Args:
        size_bytes: Approximate amount of data to generate, in bytes. The row
            count is computed as ``size_bytes // 8 // len(COLUMNS)``
            (presumably 8 bytes per integer value).

    Returns:
        Path of the temporary directory holding the Parquet files. The
        directory is created with ``tempfile.mkdtemp`` and is not removed here.
    """
    output_dir = tempfile.mkdtemp()
    num_columns = len(COLUMNS)
    num_rows = size_bytes // 8 // num_columns
    random_rows = ray.data.read_datasource(
        RandomIntRowDatasource(),
        n=num_rows,
        num_columns=num_columns,
    )
    random_rows.write_parquet(output_dir)
    return output_dir

# Generate the example Parquet files (sized from SIZE_100MiB) and keep the directory path.
example_files_dir = generate_example_files(SIZE_100MiB)
# Try `ls` on this directory from shell to see what files were created.
print(example_files_dir)

# Read the generated Parquet files back into a Dataset and display up to 10 rows.
ds = ray.data.read_parquet(example_files_dir)
ds.show(limit=10)

# Pipelining execution with Ray DatasetPipelines

Datasets execute their transformations synchronously in blocking calls. However, it can be useful to overlap dataset computations with output. This can be done with a DatasetPipeline.

A [DatasetPipeline](https://docs.ray.io/en/master/data/dataset-pipeline.html) is a unified iterator over a (potentially infinite) sequence of Ray Datasets, each of which represents a window over the original data. Conceptually it is similar to a Spark DStream, but manages execution over a bounded amount of source data instead of an unbounded stream. Ray computes each dataset window on-demand and stitches their output together into a single logical data iterator. DatasetPipeline implements most of the same transformation and output methods as Datasets (e.g., map, filter, split, iter_rows, to_torch, etc.).

This enables us to turn executions that look like this:
![no pipeline](https://docs.ray.io/en/master/_images/dataset-pipeline-1.svg)

Into something like this, where each block is a Dataset operation that can be pipelined with the other steps:
![pipeline](https://docs.ray.io/en/master/_images/dataset-pipeline-2.svg)

## Creating a DatasetPipeline

A DatasetPipeline can be constructed in two ways: either by pipelining the execution of an existing Dataset (via Dataset.window), or generating repeats of an existing Dataset (via Dataset.repeat). Similar to Datasets, you can freely pass DatasetPipelines between Ray tasks, actors, and libraries.

Let's get started by taking our Dataset from the previous example and transforming it into a DatasetPipeline. Try it using `ds.repeat`:

In [None]:
from ray.data.dataset_pipeline import DatasetPipeline

# Number of times the Dataset is repeated in the pipeline (one repeat per epoch).
NUM_EPOCHS = 3

# This line returns immediately.
# The DatasetPipeline executes lazily: work happens only as data is consumed from the pipeline.
pipe = ds.repeat(NUM_EPOCHS)

Now that we have a DatasetPipeline, we can shuffle the data and pipeline that shuffle with some other execution, like a training task. This time, we'll use `.random_shuffle_each_window()` to shuffle the data in windows instead of shuffling the whole Dataset. The resulting execution will look something like this:

![ingest](https://docs.ray.io/en/master/_images/dataset-repeat-1.svg)

But what if we're using distributed training? Then, we actually need multiple synchronized pipelines, to make sure that each distributed training worker has a disjoint subset of the shuffled data. We can do this with `.split()`, which will then give us something like this:

![e2e-ingest](https://docs.ray.io/en/master/_images/dataset-repeat-2.svg)

Let's try this out with the following code.

In [None]:
def create_pipeline(num_splits):
    """Build a repeated, per-window-shuffled pipeline over ``ds`` and split it.

    Args:
        num_splits: Number of shards to split the pipeline into.

    Returns:
        The result of ``split(num_splits, equal=True)``: one pipeline shard
        per requested split.
    """
    repeated = ds.repeat(NUM_EPOCHS)
    shuffled = repeated.random_shuffle_each_window()
    return shuffled.split(num_splits, equal=True)

# What does this print? Why?
# Build three pipeline shards and print each shard object next to its index.
splits = create_pipeline(3)
for i, shard in enumerate(splits):
    print(i, shard)

## Consuming a distributed DatasetPipeline

Now that we've created a distributed and shuffling data-loader, we can start consuming it!

We'll do that with a pool of actors, each of which will take one of the DatasetPipeline shards and start reading batches. Each batch will be one shuffled window of the dataset, i.e. a group of rows, each with 4 columns.

Let's define an actor that counts the number of rows that it's seen so far.

In [None]:
@ray.remote
class TrainingWorker:
    """Ray actor that consumes one DatasetPipeline shard and counts its rows."""

    def __init__(self, rank: int, shard: DatasetPipeline) -> None:
        """Store the worker's rank and its pipeline shard; start the row count at 0."""
        # Running total of rows seen by train().
        self.num_rows = 0
        self.rank = rank
        self.shard = shard

    def train(self) -> int:
        """Iterate over every dataset in the shard, one per epoch, counting rows.

        Returns:
            The running total of rows seen by this worker.
        """
        for epoch, epoch_dataset in enumerate(self.shard.iter_datasets()):
            # Following code emulates epoch based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            batches = epoch_dataset.iter_batches(batch_format="pandas")
            for batch_df in batches:
                # Can replace with actual training code.
                self.num_rows += len(batch_df)

        return self.num_rows

Now let's put this all together, and run the pipeline!

This will start 3 trainers and iterate over the full dataset for 3 iterations (epochs).
Each trainer should see exactly 1/3 of the data.

In [None]:
import time

# Number of training actors. The pipeline is split into the same number of
# shards so that every actor receives exactly one shard.
NUM_TRAINERS = 3
splits = create_pipeline(NUM_TRAINERS)
training_workers = [
    TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)
]

# Let's run the e2e pipeline
start = time.time()
# Kick off train() on every worker, then block until all row counts are returned.
print(ray.get([worker.train.remote() for worker in training_workers]))
print(f"total ingestion time: {int(time.time() - start)}s")