# Ray Datasets: Distributed Data Preprocessing

Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. They provide basic distributed data transformations such as maps ([`map_batches`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches")), global and grouped aggregations ([`GroupedDataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.grouped_dataset.GroupedDataset.html#ray.data.grouped_dataset.GroupedDataset "ray.data.grouped_dataset.GroupedDataset")), and shuffling operations ([`random_shuffle`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.random_shuffle.html#ray.data.Dataset.random_shuffle "ray.data.Dataset.random_shuffle"), [`sort`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.sort.html#ray.data.Dataset.sort "ray.data.Dataset.sort"), [`repartition`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html#ray.data.Dataset.repartition "ray.data.Dataset.repartition")), and are compatible with a variety of file formats, data sources, and distributed frameworks.

Here's an overview of the integrations with other processing frameworks, file formats, and supported operations, as well as a glimpse at the Ray Datasets API.

Check the [Input/Output reference](https://docs.ray.io/en/latest/data/api/input_output.html#input-output) to see if your favorite format is already supported.

![../_images/dataset.svg](https://docs.ray.io/en/latest/_images/dataset.svg)

Data Loading and Preprocessing for ML Training
----------------------------------------------

Use Ray Datasets to load and preprocess data for distributed [ML training pipelines](https://docs.ray.io/en/latest/train/train.html#train-docs). Compared to other loading solutions, Datasets are more flexible (e.g., they can express higher-quality per-epoch global shuffles) and provide [higher overall performance](https://www.anyscale.com/blog/why-third-generation-ml-platforms-are-more-performant).

Use Datasets as a last-mile bridge from storage or ETL pipeline outputs to distributed applications and libraries in Ray. Don't use them as a replacement for more general data processing systems.

<img src='https://docs.ray.io/en/latest/_images/dataset-loading-1.png' width=70%/>

To learn more about the features Datasets supports, read the [Datasets User Guide](https://docs.ray.io/en/latest/data/user-guide.html#data-user-guide).

Datasets for Parallel Compute
-----------------------------

Datasets also simplify general purpose parallel GPU and CPU compute in Ray; for instance, for [GPU batch inference](https://docs.ray.io/en/latest/ray-overview/use-cases.html#ref-use-cases-batch-infer). They provide a higher-level API for Ray tasks and actors for such embarrassingly parallel compute, internally handling operations like batching, pipelining, and memory management.

<img src='https://docs.ray.io/en/latest/_images/dataset-compute-1.png' width=60%/>

As part of the Ray ecosystem, Ray Datasets can leverage the full functionality of Ray's distributed scheduler, e.g., using actors for optimizing setup time and GPU scheduling.

Datasets
--------

A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either [Arrow table format](https://arrow.apache.org/docs/python/data.html#tables) or a Python list (for non-tabular data). For ML use cases, Datasets also natively supports mixing [Tensor](https://docs.ray.io/en/latest/data/dataset-tensor-support.html#datasets-tensor-support) and tabular data. Having multiple blocks in a dataset allows for parallel transformation and ingest.

Informally, we refer to:

-   A Dataset with Arrow blocks as a *Tabular Dataset*,

-   A Dataset with Python list blocks as a *Simple Dataset*, and

-   A Tabular Dataset with one or more tensor-type columns as a *Tensor Dataset*.

The following figure visualizes a tabular dataset with three blocks, each holding 1000 rows:

<img src='https://docs.ray.io/en/latest/_images/dataset-arch.svg' width=70%/>

Since a Dataset is just a list of Ray object references, it can be freely passed between Ray tasks, actors, and libraries like any other object reference. This flexibility is a unique characteristic of Ray Datasets.

### Reading Data

Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks proportional to the number of CPUs in the cluster. Each read task reads its assigned files and produces an output block:

![../_images/dataset-read.svg](https://docs.ray.io/en/latest/_images/dataset-read.svg)

The parallelism can also be manually specified, but the final parallelism for a read is always capped by the number of files in the underlying dataset. See the [Creating Datasets Guide](https://docs.ray.io/en/latest/data/creating-datasets.html#creating-datasets) for an in-depth guide on creating datasets.
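The capping rule can be illustrated with a small plain-Python sketch. `plan_read_tasks` is a hypothetical helper, not part of Ray, and the round-robin assignment is an assumption made for illustration; the sketch only shows why the effective parallelism cannot exceed the number of files.

```python
from typing import List


def plan_read_tasks(files: List[str], parallelism: int) -> List[List[str]]:
    """Assign files to read tasks (hypothetical helper, not Ray's scheduler)."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    # A read task needs at least one file, so the file count caps parallelism.
    num_tasks = min(parallelism, len(files))
    tasks: List[List[str]] = [[] for _ in range(num_tasks)]
    for i, path in enumerate(files):
        # Round-robin assignment is an assumption made for illustration.
        tasks[i % num_tasks].append(path)
    return tasks


files = ["part-0.csv", "part-1.csv", "part-2.csv"]
requested = plan_read_tasks(files, parallelism=8)
print(len(requested))  # 3
```

Requesting eight tasks for three files yields three tasks, one per file.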

### Transforming Data

Datasets can use either Ray tasks or Ray actors to transform datasets. By default, tasks are used. Actors can be specified using `compute=ActorPoolStrategy()`, which creates an autoscaling pool of Ray actors to process transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be cached:

![../_images/dataset-map.svg](https://docs.ray.io/en/latest/_images/dataset-map.svg)

See the [Transforming Datasets Guide](https://docs.ray.io/en/latest/data/transforming-datasets.html#transforming-datasets) for an in-depth guide on transforming datasets.
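To see why actors help with expensive setup, consider the following plain-Python sketch. It does not use Ray: `ExpensiveModel`, `run_stateless`, and `run_with_pool` are hypothetical names, and the "pool" is just a list of objects processed in one process. The point is only the number of times the setup code runs.

```python
from typing import List

Batch = List[int]


class ExpensiveModel:
    """Stand-in for a UDF with costly setup, e.g. loading model weights."""

    init_calls = 0  # counts how many times setup ran

    def __init__(self) -> None:
        ExpensiveModel.init_calls += 1
        self.offset = 100  # pretend this took a long time to load

    def __call__(self, batch: Batch) -> Batch:
        return [x + self.offset for x in batch]


def run_stateless(batches: List[Batch]) -> List[Batch]:
    # Task-like execution: every batch pays the setup cost again.
    return [ExpensiveModel()(batch) for batch in batches]


def run_with_pool(batches: List[Batch], pool_size: int) -> List[Batch]:
    # Actor-like execution: each worker builds its state once and reuses it.
    workers = [ExpensiveModel() for _ in range(pool_size)]
    return [workers[i % pool_size](batch) for i, batch in enumerate(batches)]


batches = [[1, 2], [3, 4], [5, 6], [7, 8]]

ExpensiveModel.init_calls = 0
task_result = run_stateless(batches)
task_inits = ExpensiveModel.init_calls  # one setup per batch

ExpensiveModel.init_calls = 0
pool_result = run_with_pool(batches, pool_size=2)
pool_inits = ExpensiveModel.init_calls  # one setup per worker

print(task_inits, pool_inits)  # 4 2
```

Both strategies produce the same output; the pooled version simply runs the constructor once per worker instead of once per batch.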

### Shuffling Data

Certain operations like *sort* or *groupby* require data blocks to be partitioned by value, or *shuffled*. Datasets uses tasks to implement distributed shuffles in a map-reduce style, using map tasks to partition blocks by value, and then reduce tasks to merge co-partitioned blocks together.
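The map-reduce pattern described above can be sketched in a single process with plain Python. This is a toy model of a sort, not Ray's implementation: the function names are hypothetical, the range boundaries are chosen by hand, and everything runs sequentially.

```python
from typing import List

Block = List[int]


def map_partition(block: Block, boundaries: List[int]) -> List[Block]:
    """Map task: split one block into len(boundaries) + 1 value ranges."""
    parts: List[Block] = [[] for _ in range(len(boundaries) + 1)]
    for value in block:
        # With sorted boundaries, counting those <= value gives the
        # index of the range that the value falls into.
        index = sum(value >= b for b in boundaries)
        parts[index].append(value)
    return parts


def reduce_merge(co_partitioned: List[Block]) -> Block:
    """Reduce task: merge the pieces that share a value range, then sort."""
    merged = [value for part in co_partitioned for value in part]
    return sorted(merged)


def shuffle_sort(blocks: List[Block], boundaries: List[int]) -> List[Block]:
    mapped = [map_partition(block, boundaries) for block in blocks]
    # Reduce task i receives partition i from every map task.
    return [
        reduce_merge([parts[i] for parts in mapped])
        for i in range(len(boundaries) + 1)
    ]


blocks = [[9, 1, 5], [4, 8, 2], [7, 3, 6]]
sorted_blocks = shuffle_sort(blocks, boundaries=[4, 7])
print(sorted_blocks)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Every input block contributes to every output block here, which is what makes a shuffle more expensive than a per-block map.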

You can also change just the number of blocks of a Dataset using [`repartition()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html#ray.data.Dataset.repartition "ray.data.Dataset.repartition"). Repartition has two modes:

1.  `shuffle=False` - performs the minimal data movement needed to equalize block sizes

2.  `shuffle=True` - performs a full distributed shuffle

![../_images/dataset-shuffle.svg](https://docs.ray.io/en/latest/_images/dataset-shuffle.svg)

Datasets shuffle can scale to processing hundreds of terabytes of data. See the [Performance Tips Guide](https://docs.ray.io/en/latest/data/performance-tips.html#shuffle-performance-tips) for an in-depth guide on shuffle performance.

### Execution mode

Most transformations are lazy. They don't execute until you consume a dataset or call [`Dataset.materialize()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.materialize.html#ray.data.Dataset.materialize "ray.data.Dataset.materialize").

The transformations are executed in a streaming way, incrementally on the data and with operators processed in parallel; see [Streaming Execution](https://docs.ray.io/en/latest/data/dataset-internals.html#datasets-streaming-execution).

For an in-depth guide on Datasets execution, read [Execution](https://docs.ray.io/en/latest/data/dataset-internals.html#datasets-execution).
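Lazy, incremental execution can be illustrated with a toy pipeline in plain Python. `LazyPipeline` is a hypothetical class, not Ray's executor, and it processes blocks sequentially rather than running operators in parallel. It only demonstrates that recording a transformation does no work until the data is consumed or materialized.

```python
from typing import Callable, Iterable, Iterator, List

Block = List[int]


class LazyPipeline:
    """Toy stand-in for a lazily evaluated dataset (not Ray's executor)."""

    def __init__(self, blocks: Iterable[Block]) -> None:
        self._blocks = blocks
        self._stages: List[Callable[[Block], Block]] = []

    def map_batches(self, fn: Callable[[Block], Block]) -> "LazyPipeline":
        # Only record the transformation; nothing runs yet.
        self._stages.append(fn)
        return self

    def iter_blocks(self) -> Iterator[Block]:
        # Consuming the pipeline triggers execution, one block at a time.
        for block in self._blocks:
            for fn in self._stages:
                block = fn(block)
            yield block

    def materialize(self) -> List[Block]:
        # Run every stage on every block and keep all results in memory.
        return list(self.iter_blocks())


calls: List[str] = []


def double(block: Block) -> Block:
    calls.append("double")
    return [x * 2 for x in block]


pipeline = LazyPipeline([[1, 2], [3, 4]]).map_batches(double)
calls_before = len(calls)  # still 0: the stage was only recorded
first = next(pipeline.iter_blocks())  # executes the stage for one block only
calls_after_first = len(calls)
result = pipeline.materialize()
```

After `next(...)`, the UDF has run exactly once; `materialize()` then runs it for every block.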

### Fault tolerance

Datasets performs *lineage reconstruction* to recover data. If an application error or system failure occurs, Datasets recreates lost blocks by re-executing tasks.
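The idea behind lineage reconstruction can be sketched with a toy block store in plain Python. `LineageStore` and its methods are hypothetical and run in one process; Ray's object store and task re-execution are far more involved. The sketch shows only the core mechanism: keep a record of how each block was produced, and re-run that step when the block is missing.

```python
from typing import Callable, Dict, List, Tuple

Block = List[int]


class LineageStore:
    """Toy block store that remembers how each block was produced."""

    def __init__(self) -> None:
        self._blocks: Dict[str, Block] = {}
        self._lineage: Dict[str, Tuple[Callable[..., Block], Tuple[str, ...]]] = {}

    def put_source(self, block_id: str, loader: Callable[[], Block]) -> None:
        # Source blocks are recreated by re-running their loader.
        self._lineage[block_id] = (loader, ())
        self._blocks[block_id] = loader()

    def run_task(self, block_id: str, fn: Callable[..., Block], *inputs: str) -> None:
        # Record the task and its inputs before storing the output.
        self._lineage[block_id] = (fn, inputs)
        self._blocks[block_id] = fn(*(self.get(i) for i in inputs))

    def lose(self, block_id: str) -> None:
        # Simulate a failure that drops the stored copy.
        self._blocks.pop(block_id, None)

    def get(self, block_id: str) -> Block:
        if block_id not in self._blocks:
            # Re-execute the producing task; missing inputs recurse the same way.
            fn, inputs = self._lineage[block_id]
            self._blocks[block_id] = fn(*(self.get(i) for i in inputs))
        return self._blocks[block_id]


store = LineageStore()
store.put_source("raw", lambda: [1, 2, 3])
store.run_task("squared", lambda block: [x * x for x in block], "raw")

store.lose("raw")
store.lose("squared")
recovered = store.get("squared")
print(recovered)  # [1, 4, 9]
```

Note that the lineage records live in the store object itself, which loosely mirrors why losing the process that holds the metadata makes recovery impossible.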

Fault tolerance isn't supported in two cases:

-   If the original worker process that created the Dataset dies. This is because the creator stores the metadata for the [objects](https://docs.ray.io/en/latest/ray-core/fault_tolerance/objects.html#object-fault-tolerance) that comprise the Dataset.

-   If you specify `compute=ActorPoolStrategy()` for transformations. This is because Datasets relies on [task-based fault tolerance](https://docs.ray.io/en/latest/ray-core/fault_tolerance/tasks.html#task-fault-tolerance).

## Example operations: Transforming Datasets

Datasets transformations take in datasets and produce new datasets. For example, *map* is a transformation that applies a [user-defined function](https://docs.ray.io/en/latest/data/transforming-datasets.html#transform-datasets-writing-udfs) on each dataset record and returns a new dataset as the result. Datasets transformations can be composed to express a chain of computations.

There are two main types of transformations:

-   One-to-one: each input block will contribute to only one output block, such as [`ds.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches").

-   All-to-all: input blocks can contribute to multiple output blocks, such as [`ds.random_shuffle()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.random_shuffle.html#ray.data.Dataset.random_shuffle "ray.data.Dataset.random_shuffle").
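The difference between the two types can be made concrete by tracking which input block each output row came from. The following plain-Python sketch uses hypothetical helper names and a deterministic redistribution as a stand-in for a real shuffle; it assumes all blocks have the same length.

```python
from typing import List, Set, Tuple

Tagged = Tuple[int, int]  # (index of the source block, value)


def tag_blocks(blocks: List[List[int]]) -> List[List[Tagged]]:
    # Remember which input block every value came from.
    return [[(i, v) for v in block] for i, block in enumerate(blocks)]


def one_to_one(blocks: List[List[Tagged]]) -> List[List[Tagged]]:
    # Each output block is computed from exactly one input block.
    return [[(src, v * 10) for src, v in block] for block in blocks]


def all_to_all(blocks: List[List[Tagged]]) -> List[List[Tagged]]:
    # Deterministic stand-in for a shuffle: output block j collects the
    # j-th row of every input block, so rows cross block boundaries.
    return [list(column) for column in zip(*blocks)]


def sources(block: List[Tagged]) -> Set[int]:
    return {src for src, _ in block}


blocks = tag_blocks([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
mapped_sources = [sources(b) for b in one_to_one(blocks)]
shuffled_sources = [sources(b) for b in all_to_all(blocks)]
print(mapped_sources)    # [{0}, {1}, {2}]
print(shuffled_sources)  # [{0, 1, 2}, {0, 1, 2}, {0, 1, 2}]
```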

Here is a table listing some common transformations supported by Ray Datasets.

Common Ray Datasets transformations.

| Transformation | Type | Description |
| --- | --- | --- |
|[`ds.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches")|One-to-one|Apply a given function to batches of records of this dataset.|
|[`ds.add_column()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.add_column.html#ray.data.Dataset.add_column "ray.data.Dataset.add_column")|One-to-one|Apply a given function to batches of records to create a new column.|
|[`ds.drop_columns()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.drop_columns.html#ray.data.Dataset.drop_columns "ray.data.Dataset.drop_columns")|One-to-one|Drop the given columns from the dataset.|
|[`ds.split()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.split.html#ray.data.Dataset.split "ray.data.Dataset.split")|One-to-one|Split the dataset into N disjoint pieces.|
|[`ds.repartition(shuffle=False)`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html#ray.data.Dataset.repartition "ray.data.Dataset.repartition")|One-to-one|Repartition the dataset into N blocks, without shuffling the data.|
|[`ds.repartition(shuffle=True)`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html#ray.data.Dataset.repartition "ray.data.Dataset.repartition")|All-to-all|Repartition the dataset into N blocks, shuffling the data during repartition.|
|[`ds.random_shuffle()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.random_shuffle.html#ray.data.Dataset.random_shuffle "ray.data.Dataset.random_shuffle")|All-to-all|Randomly shuffle the elements of this dataset.|
|[`ds.sort()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.sort.html#ray.data.Dataset.sort "ray.data.Dataset.sort")|All-to-all|Sort the dataset by a sortkey.|
|[`ds.groupby()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.groupby.html#ray.data.Dataset.groupby "ray.data.Dataset.groupby")|All-to-all|Group the dataset by a groupkey.|

> Tip
>
> Datasets also provides the convenience transformation methods [`ds.map()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map.html#ray.data.Dataset.map "ray.data.Dataset.map"), [`ds.flat_map()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.flat_map.html#ray.data.Dataset.flat_map "ray.data.Dataset.flat_map"), and [`ds.filter()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.filter.html#ray.data.Dataset.filter "ray.data.Dataset.filter"), which are not vectorized (slower than [`ds.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches")), but may be useful for development.

The following example uses these transformation APIs to process the Iris dataset.

In [None]:
import ray
import pandas

# Create a dataset from file with Iris data.
ds = ray.data.read_csv("iris.csv")

ds

In [None]:
ds.show(3)

In [None]:
ds = ds.repartition(5)

ds

In [None]:
# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

# Map processing the dataset.
ds.map_batches(transform_batch).show()

In [None]:
# Split the dataset into 2 datasets
ds.split(2)

In [None]:
# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)

In [None]:
# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)

In [None]:
# Group by the variety.
ds.groupby("variety").count().show()

Writing User-defined Functions (UDFs)
-------------------------------------

User-defined functions (UDFs) are routines that apply to one row (e.g. [`.map()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map.html#ray.data.Dataset.map "ray.data.Dataset.map")) or a batch of rows (e.g. [`.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches")) of a dataset. UDFs let you express your customized business logic in transformations. This section focuses on [`.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html#ray.data.Dataset.map_batches "ray.data.Dataset.map_batches"), as it's the primary mapping API in Datasets.

Here are the basics that you need to know about UDFs:

-   A UDF can be either a function, or if using the [actor compute strategy](https://docs.ray.io/en/latest/data/transforming-datasets.html#transform-datasets-compute-strategy), a [callable class](https://docs.ray.io/en/latest/data/transforming-datasets.html#transform-datasets-callable-classes).

-   Select the UDF input [batch format](https://docs.ray.io/en/latest/data/transforming-datasets.html#transform-datasets-batch-formats) using the `batch_format` argument.

-   The UDF output type determines the Dataset schema of the transformation result.
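The relationship between per-row and per-batch UDFs, and the way a UDF's output shapes the result schema, can be sketched without Ray. In this plain-Python illustration a batch is modeled as a dict of column lists, which is an assumption; in Ray the batch type depends on the `batch_format` argument.

```python
from typing import Dict, List

Row = Dict[str, float]
Batch = Dict[str, List[float]]  # column name -> list of values


def is_long_row(row: Row) -> Dict[str, bool]:
    # Per-row UDF: called once for every record.
    return {"is_long": row["sepal.length"] > 4.7}


def is_long_batch(batch: Batch) -> Dict[str, List[bool]]:
    # Per-batch UDF: called once for a whole column of values.
    return {"is_long": [length > 4.7 for length in batch["sepal.length"]]}


rows: List[Row] = [
    {"sepal.length": 5.1},
    {"sepal.length": 4.6},
    {"sepal.length": 4.9},
]
batch: Batch = {"sepal.length": [row["sepal.length"] for row in rows]}

row_output = [is_long_row(row) for row in rows]
batch_output = is_long_batch(batch)

# The keys returned by the UDF become the columns of the result.
row_columns = sorted(row_output[0])
batch_columns = sorted(batch_output)
```

Both UDFs compute the same values and yield a single `is_long` column; the batch version is called once instead of three times.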

### Callable Class UDFs

When using the actor compute strategy, per-row and per-batch UDFs can also be *callable classes*, i.e. classes that implement the `__call__` magic method. The constructor of the class can be used for stateful setup, and is invoked only once per worker actor.

<div class="alert alert-block alert-warning">
<b>Note:</b> These transformation APIs take the uninstantiated callable class as an argument, not an instance of the class.
</div>

In [None]:
ds = ray.data.read_csv("iris.csv")

def is_long(record):
    # Return a dict so the result has a named column.
    return {"is_long": record["sepal.length"] > 4.7}

ds.map(is_long).show(10)

In [None]:
def is_long_batch(batch):
    # `batch` is a pandas DataFrame because batch_format="pandas" is set below.
    output = batch["sepal.length"] > 4.7
    return output.to_frame()

ds.map_batches(is_long_batch, batch_format="pandas").show(10)

In [None]:
import pandas as pd

class ModelUDF:
    def __init__(self):
        self.model = lambda df: df["sepal.length"] > 4.7

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        # Apply model.
        df["output"] = self.model(df)
        return df

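# Pass the class itself (not an instance) and request pandas batches explicitly.
ds.map_batches(ModelUDF, compute=ray.data.ActorPoolStrategy(), batch_format="pandas").show(10)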

For more info on...
* batch formats
* zero-copy
* compute strategy

...refer to https://docs.ray.io/en/latest/data/transforming-datasets.html#udf-input-batch-format

In [None]:
import ray
import numpy as np

dogs_train = 's3://anonymous@air-example-data-2/imagenette2/train/n02102040'
dogs_val = 's3://anonymous@air-example-data-2/imagenette2/val/n02102040'
fish_train = 's3://anonymous@air-example-data-2/imagenette2/train/n01440764'
fish_val = 's3://anonymous@air-example-data-2/imagenette2/val/n01440764'

train_ds_images = ray.data.read_images(dogs_train).limit(50).union(ray.data.read_images(fish_train).limit(50))

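# Label 0 for the 50 dog images, 1 for the 50 fish images.
labels = np.zeros(100, dtype=np.uint8)
labels[50:] = 1

# Name the column 'labels' so that it can be looked up by that key after zipping.
train_ds_labels = ray.data.from_items([{"labels": int(label)} for label in labels])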

In [None]:
train_ds_images.schema()

In [None]:
from PIL import Image

# Take the first image array and display it.
im = train_ds_images.take(1)[0]['image']
Image.fromarray(im)

In [None]:
train_ds_labels.schema()

In [None]:
train_ds = train_ds_images.zip(train_ds_labels)

In [None]:
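from transformers import ViTImageProcessor

model_name_or_path = 'google/vit-base-patch16-224-in21k'
feature_extractor = ViTImageProcessor.from_pretrained(model_name_or_path)

def transform(example_batch):
    # Turn a batch of image arrays into pixel values (NumPy, so Ray can store them).
    inputs = feature_extractor([x for x in example_batch['image']], return_tensors='np')

    # Don't forget to include the labels!
    # Assumes the zipped dataset has a 'labels' column; check train_ds.schema().
    return {'pixel_values': inputs['pixel_values'], 'labels': example_batch['labels']}

# with_transform is a Hugging Face Datasets method; on a Ray Dataset, map_batches plays that role.
prepared_ds = train_ds.map_batches(transform, batch_format='numpy')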

In [None]:
ray.