(ray-data-load-inspect-save)=
# Data Loading, Inspection, and Saving

Ray Data is compatible with various data sources, including files, in-memory data, and databases.

```python
import os
import shutil
import urllib.request
from pathlib import Path

import ray

# Shut down any Ray instance left over from a previous run, then start a new one
if ray.is_initialized():
    ray.shutdown()

ray.init()
```

```text
INFO worker.py:1724 -- Started a local Ray instance.

Python version: 3.11.7
Ray version:    2.9.0
```


## Loading Data

Ray Data provides many built-in methods for loading data. These include reading files, reading in-memory data such as pandas DataFrames, and reading from databases. Here, we read a Parquet file from the New York City taxi trip dataset as an example.

First, download the data. The following code creates the target folder if needed and downloads the file only if it is not already there.

```python
folder_path = os.path.join(os.getcwd(), "../data/nyc-taxi")
download_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-06.parquet"
file_name = download_url.split("/")[-1]
parquet_file_path = os.path.join(folder_path, file_name)

# Create the folder if needed, and download only when the file itself is missing
os.makedirs(folder_path, exist_ok=True)
if not os.path.exists(parquet_file_path):
    with urllib.request.urlopen(download_url) as response, open(parquet_file_path, "wb") as out_file:
        shutil.copyfileobj(response, out_file)
```

Use the [`ray.data.read_parquet()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_parquet.html) method to read the file into a `Dataset`:

```python
dataset = ray.data.read_parquet(parquet_file_path)
```

To inspect the schema of this dataset:

```python
dataset.schema()
```

```text
Column                 Type
------                 ----
VendorID               int32
tpep_pickup_datetime   timestamp[us]
tpep_dropoff_datetime  timestamp[us]
passenger_count        int64
trip_distance          double
RatecodeID             int64
store_and_fwd_flag     large_string
PULocationID           int32
DOLocationID           int32
payment_type           int64
fare_amount            double
extra                  double
mta_tax                double
tip_amount             double
tolls_amount           double
improvement_surcharge  double
total_amount           double
congestion_surcharge   double
Airport_fee            double
```

For other file formats (CSV, TFRecord, etc.), the corresponding read methods are listed in {numref}`ray-data-read-files`.

```{table} Methods for data reading
:name: ray-data-read-files
|      	|     Parquet    	|     Text    	|     CSV    	|     TFRecord     	| Binary              	|
|:----:	|:--------------:	|:-----------:	|:----------:	|:----------------:	|---------------------	|
| Method | [`read_parquet()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_parquet.html) | [`read_text()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_text.html) | [`read_csv()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_csv.html) | [`read_tfrecords()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_tfrecords.html) | [`read_binary_files()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_binary_files.html) |
```

### Column and Row Pruning

The original file contains many columns. If we only need a few of them, such as `passenger_count`, `tip_amount`, and `payment_type`, we can use the `columns` parameter of `read_parquet()` to skip the others.

```python
dataset = ray.data.read_parquet(
    parquet_file_path,
    columns=["passenger_count", "tip_amount", "payment_type"],
)
dataset.schema()
```

```text
Column           Type
------           ----
passenger_count  int64
tip_amount       double
payment_type     int64
```

With `columns` set, only the requested columns are read from the file. This technique is known as column pruning. It reduces I/O and memory usage and is a common optimization in data engineering. It is particularly effective for columnar formats such as Parquet, which store the values of each column together.

In addition to column pruning, Ray Data supports row pruning: only rows that satisfy a given condition are read. For example, the following code passes a PyArrow filter expression so that only rows where `tip_amount` is greater than 6.0 are kept:

```python
import pyarrow.dataset as pds

# Read only three columns, and only the rows with tip_amount > 6.0
dataset = ray.data.read_parquet(
    parquet_file_path,
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter=pds.field("tip_amount") > 6.0,
)
dataset.show(limit=2)
```

```text
{'passenger_count': 1, 'tip_amount': 6.7, 'payment_type': 1}
{'passenger_count': 1, 'tip_amount': 10.0, 'payment_type': 1}
```
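
The following toy, pure-Python sketch makes the two optimizations concrete. It models a columnar table as a dict of column lists. The hypothetical `read_columnar()` helper and the made-up `TABLE` are illustrations only, not Ray or Parquet APIs. Projection touches only the requested columns, and the predicate is evaluated on its own column first, so rows that fail it are never materialized. Real Parquet readers apply the same idea to column chunks and row groups, and can also use per-row-group statistics to skip data entirely.

```python
from typing import Callable, Dict, List, Optional

# Toy, hypothetical columnar "file": one list per column (not the taxi data)
TABLE: Dict[str, List] = {
    "passenger_count": [1, 2, 1, 3, 1],
    "tip_amount": [6.7, 2.0, 10.0, 0.0, 8.88],
    "payment_type": [1, 2, 1, 1, 1],
    "fare_amount": [30.0, 12.5, 45.0, 9.0, 40.0],
}


def read_columnar(
    table: Dict[str, List],
    columns: List[str],
    filter_column: Optional[str] = None,
    predicate: Optional[Callable[[object], bool]] = None,
) -> List[Dict[str, object]]:
    """Hypothetical reader: return only `columns`, and only rows passing `predicate`."""
    num_rows = len(next(iter(table.values())))
    if filter_column is not None and predicate is not None:
        # Row pruning: evaluate the predicate on a single column first
        keep = [i for i, value in enumerate(table[filter_column]) if predicate(value)]
    else:
        keep = list(range(num_rows))
    # Column pruning: only the requested columns are ever accessed
    selected = {name: table[name] for name in columns}
    return [{name: selected[name][i] for name in columns} for i in keep]


rows = read_columnar(
    TABLE,
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter_column="tip_amount",
    predicate=lambda tip: tip > 6.0,
)
for row in rows:
    print(row)
```

In this sketch, the `fare_amount` column is never read. The rows with `tip_amount` of 2.0 and 0.0 are discarded before any other column is looked up.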


### Parallelism Optimization

As mentioned in {numref}`ray-data-intro`, Ray Data uses tasks or actors to parallelize data processing. Ray Data's read methods, including `read_parquet()`, accept a `parallelism` parameter that controls the underlying parallel execution. If `parallelism` is not set, Ray Data determines it as follows:

1. Ray gets the number of CPU cores available in the cluster.
2. Ray sets `parallelism` to twice that number. If the result is less than 8, it uses 8.
3. Ray estimates the in-memory size of the data. If the average `Block` would be larger than 512 MiB, Ray increases `parallelism` until each `Block` is smaller than 512 MiB.

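You can also set `parallelism` manually based on the actual data size. For example, `ray.data.read_parquet(path, parallelism=512)` requests 512 parallel read tasks. The number of read tasks may be limited by the number of input files. With a single file, as in this example, Ray Data splits the output of the read task into smaller blocks instead.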
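
The three steps of the default heuristic can be written as a small function. This sketch mirrors the description above, not Ray's source code. The function `estimate_parallelism` and its arguments are hypothetical, and Ray's actual rules differ between versions.

```python
MiB = 1024 * 1024


def estimate_parallelism(
    num_cpus: int,
    data_size_bytes: int,
    max_block_size_bytes: int = 512 * MiB,
) -> int:
    """Hypothetical sketch of the default read-parallelism heuristic described above."""
    # Steps 1-2: twice the number of CPU cores, with a floor of 8
    parallelism = max(2 * num_cpus, 8)
    # Step 3: if the average block would exceed the limit, use the smallest
    # parallelism whose average block size is strictly below the limit
    if data_size_bytes > parallelism * max_block_size_bytes:
        parallelism = data_size_bytes // max_block_size_bytes + 1
    return parallelism


print(estimate_parallelism(num_cpus=2, data_size_bytes=100 * MiB))
print(estimate_parallelism(num_cpus=16, data_size_bytes=1024 * MiB))
print(estimate_parallelism(num_cpus=4, data_size_bytes=10 * 1024 * MiB))
```

Consider the last call, with 4 CPUs and 10 GiB of data. Steps 1 and 2 give 8, which yields 1,280 MiB blocks, above the limit. The sketch therefore raises `parallelism` to 21, the smallest value for which 10,240 MiB / `parallelism` is under 512 MiB.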

## Inspecting Data

Inspecting data means examining its schema, individual rows, or batches of rows. Besides the `show()` method used earlier, Ray Data provides methods such as `count()`, `take()`, and `take_batch()`.

Count the rows in this `Dataset`:

```python
dataset.count()
```

```text
474915
```

To view rows of data, use [`Dataset.take()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.take.html) or [`Dataset.take_all()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.take_all.html). `take(limit)` returns a list of up to `limit` rows (20 by default). Each row is a dictionary whose keys are column names and whose values are the corresponding field values. `take_all()` returns every row, so use it only on small datasets.

```python
dataset.take(limit=1)
```

```text
[{'passenger_count': 1, 'tip_amount': 6.7, 'payment_type': 1}]
```

To inspect a batch of rows instead, use [`Dataset.take_batch()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.take_batch.html), which returns up to `batch_size` rows as a single batch. By default, the batch is a dictionary mapping each column name to a NumPy array, as shown below. Set `batch_format="pandas"` to get a pandas DataFrame instead.

```python
batch = dataset.take_batch(batch_size=2)
batch
```

```text
{'passenger_count': array([1, 1]),
 'tip_amount': array([ 6.7, 10. ]),
 'payment_type': array([1, 1])}
```

## Iterating Over Data

### Sequential Iteration

Ray Data provides two methods for iterating over data: [`Dataset.iter_rows()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.iter_rows.html) and [`Dataset.iter_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.iter_batches.html). `iter_rows()` yields one row at a time, while `iter_batches()` yields one batch at a time.

For example, to iterate through the first 5 rows:

```python
cnt = 0
for row in dataset.iter_rows():
    cnt += 1
    if cnt > 5:
        break
    print(row)
```

```text
{'passenger_count': 1, 'tip_amount': 6.7, 'payment_type': 1}
{'passenger_count': 1, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 3, 'tip_amount': 8.88, 'payment_type': 1}
{'passenger_count': 4, 'tip_amount': 19.66, 'payment_type': 1}
{'passenger_count': 5, 'tip_amount': 19.56, 'payment_type': 1}
```


To iterate through the first 5 batches:

```python
cnt = 0
for batch in dataset.iter_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
```

```text
{'passenger_count': array([1, 1]), 'tip_amount': array([ 6.7, 10. ]), 'payment_type': array([1, 1])}
{'passenger_count': array([3, 4]), 'tip_amount': array([ 8.88, 19.66]), 'payment_type': array([1, 1])}
{'passenger_count': array([5, 1]), 'tip_amount': array([19.56,  7.11]), 'payment_type': array([1, 1])}
{'passenger_count': array([1, 1]), 'tip_amount': array([ 8.  , 11.43]), 'payment_type': array([1, 1])}
{'passenger_count': array([1, 5]), 'tip_amount': array([6.36, 9.99]), 'payment_type': array([1, 1])}
```
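
The difference between the two iterators is mostly one of shape. `iter_rows()` yields one dictionary per row. `iter_batches()` yields column-oriented batches: one dictionary per batch, mapping each column name to an array of values. The pure-Python sketch below shows that reshaping. The generator `batch_rows()` is hypothetical, and it uses plain lists where Ray Data uses NumPy arrays. Like `iter_batches()` with its default `drop_last=False`, it yields a final, smaller batch when the row count is not a multiple of `batch_size`.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def batch_rows(rows: Iterable[Dict], batch_size: int) -> Iterator[Dict[str, List]]:
    """Hypothetical helper: group row dicts into column-oriented batches."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        # Transpose: list of row dicts -> dict of column lists
        yield {key: [row[key] for row in chunk] for key in chunk[0]}


# Sample rows copied from the output above (two columns only)
rows = [
    {"passenger_count": 1, "tip_amount": 6.7},
    {"passenger_count": 1, "tip_amount": 10.0},
    {"passenger_count": 3, "tip_amount": 8.88},
]
batches = list(batch_rows(rows, batch_size=2))
for batch in batches:
    print(batch)
```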


Deep learning frameworks such as PyTorch and TensorFlow typically process data in batches during training and inference. To integrate with these frameworks, Ray Data provides the [`Dataset.iter_torch_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.iter_torch_batches.html) and [`Dataset.iter_tf_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.iter_tf_batches.html) methods. They yield batches as dictionaries of PyTorch tensors and TensorFlow tensors, respectively.

```python
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
```

```text
{'passenger_count': tensor([1, 1]), 'tip_amount': tensor([ 6.7000, 10.0000], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([3, 4]), 'tip_amount': tensor([ 8.8800, 19.6600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([5, 1]), 'tip_amount': tensor([19.5600,  7.1100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1, 1]), 'tip_amount': tensor([ 8.0000, 11.4300], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1, 5]), 'tip_amount': tensor([6.3600, 9.9900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
```


### Random Iteration

In machine learning, it is common to shuffle sample data, and Ray Data provides two methods for this purpose:

* Full Data Shuffle
* Local Buffer Shuffle

The [`Dataset.random_shuffle()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.random_shuffle.html) method shuffles the entire dataset. Because data scattered across compute nodes must be exchanged among them, a full shuffle incurs significant inter-node communication overhead and can be slow.

A local buffer shuffle keeps an in-memory buffer of rows on the node that iterates over the data and shuffles only the rows in that buffer. The result is less random than a full shuffle, but it avoids inter-node data exchange and is much faster. To use it, pass the `local_shuffle_buffer_size` parameter to an iteration method such as `iter_batches()` or `iter_torch_batches()`. Optionally, set a random seed with `local_shuffle_seed`.

In the following example, `local_shuffle_buffer_size=250` means the buffer must hold at least 250 rows before a batch is produced. Each batch is therefore drawn at random from a pool of at least 250 rows.

```python
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
```

```text
{'passenger_count': tensor([1, 1]), 'tip_amount': tensor([7.8400, 6.5800], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1, 1]), 'tip_amount': tensor([10.0000, 13.0900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1, 1]), 'tip_amount': tensor([ 9.2400, 13.7900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([3, 1]), 'tip_amount': tensor([15.0000, 11.6100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2, 1]), 'tip_amount': tensor([10.0000, 16.4600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
```


:::{note}
Local shuffling strikes a balance between randomness and performance. In machine learning, a more random sample order often leads to higher model accuracy. A larger `local_shuffle_buffer_size` gives more randomness at the cost of more memory.
:::
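
The idea behind a bounded shuffle buffer can be illustrated with a classic streaming technique. Keep a fixed-size buffer, and once it is full, emit a randomly chosen element and refill the freed slot with the next input. The following is a conceptual, pure-Python sketch with a hypothetical `buffered_shuffle()` generator. It is not Ray Data's implementation, which shuffles at the batch level once the buffer holds at least `local_shuffle_buffer_size` rows. Its purpose is to make the randomness trade-off visible.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffered_shuffle(items: Iterable[T], buffer_size: int, seed: Optional[int] = None) -> Iterator[T]:
    """Hypothetical sketch: approximately shuffle a stream using a bounded buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be >= 1")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            # Move a randomly chosen element to the end, then emit it
            idx = rng.randrange(len(buffer))
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            yield buffer.pop()
    # Input exhausted: drain whatever is left in random order
    rng.shuffle(buffer)
    yield from buffer


out = list(buffered_shuffle(range(20), buffer_size=5, seed=42))
print(out)
```

With `buffer_size=5`, an element can appear at most 4 positions earlier than in the input, because it cannot be emitted before it enters the buffer. With `buffer_size=1`, the order does not change at all. A larger buffer allows elements to move further, which is the randomness that the note above refers to.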

## Saving Data

Saving data falls into two categories:

* Writing data to a local or distributed filesystem, such as the local disk or S3.
* Converting data to another framework's format, such as a pandas DataFrame, or writing it to a database, such as MongoDB.

### Writing to Filesystems

Ray Data follows the URI and filesystem scheme conventions described in {numref}`uri-schemes`. When using HDFS, S3, or other filesystems, specify the scheme explicitly in the URI, for example `hdfs://` or `s3://`.
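
The scheme is the part of a URI before `://`. Python's standard `urllib.parse.urlsplit()` shows how a URI splits into scheme, network location, and path. The helper `describe_uri()` and the bucket name `my-bucket` are hypothetical. This sketch is not Ray Data's own filesystem resolution; it only shows which part of each URI carries the scheme. A bare path such as `/tmp/files/` has an empty scheme.

```python
from urllib.parse import urlsplit


def describe_uri(uri: str) -> tuple:
    """Hypothetical helper: return (scheme, netloc, path); scheme is '' for a bare path."""
    parts = urlsplit(uri)
    return parts.scheme, parts.netloc, parts.path


# "my-bucket" is a made-up bucket name used only for illustration
for uri in ["file:///tmp/trip", "s3://my-bucket/trip", "/tmp/files/"]:
    print(uri, "->", describe_uri(uri))
```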

{numref}`ray-data-save` lists several APIs for saving the `Dataset` in various file formats.

```{table} Writing Dataset into filesystems
:name: ray-data-save
|      	|     Parquet    	|     CSV    	|     JSON    	|     TFRecord     	|
|:----:	|:--------------:	|:-----------:	|:----------:	|:----------------:	|
| Method 	| [Dataset.write_parquet()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_parquet.html) 	| [Dataset.write_csv()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_csv.html) 	| [Dataset.write_json()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_json.html) 	| [Dataset.write_tfrecords()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_tfrecords.html) 	|
```

When persisting data to a filesystem, remember to specify the filesystem scheme. The following example writes to the local filesystem using the `file://` scheme:

```python
dataset.write_parquet("file:///tmp/trip")
```



By default, Ray Data writes one file per `Block`, so the number of output files equals the number of `Block`s. To change the number of files, use [`repartition()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html) to change the number of blocks before writing. For example, the following code writes the data as three CSV files:

```python
if os.path.exists("/tmp/files/"):
    shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
```

```text
['9_000002_000000.csv', '9_000000_000000.csv', '9_000001_000000.csv']
```
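
The relationship between blocks and output files can be mimicked in pure Python. Split the rows into `num_blocks` contiguous, nearly equal parts, and write one CSV file per part. This is roughly what a non-shuffling `repartition()` aims for, since it builds output blocks from adjacent input blocks. The helpers `split_evenly()` and `write_blocks_as_csv()`, the sample rows, and the `block_XXXXXX.csv` naming are all hypothetical. Ray Data chooses its own file names, as the listing above shows.

```python
import csv
import os
import tempfile
from typing import Dict, List


def split_evenly(rows: List[Dict], num_blocks: int) -> List[List[Dict]]:
    """Hypothetical helper: split rows into contiguous parts whose sizes differ by at most 1."""
    base, extra = divmod(len(rows), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        size = base + (1 if i < extra else 0)
        blocks.append(rows[start:start + size])
        start += size
    return blocks


def write_blocks_as_csv(rows: List[Dict], num_blocks: int, out_dir: str) -> List[str]:
    """Hypothetical helper: write one CSV file per block and return the file paths."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, block in enumerate(split_evenly(rows, num_blocks)):
        path = os.path.join(out_dir, f"block_{i:06d}.csv")  # made-up naming scheme
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(block)
        paths.append(path)
    return paths


# Synthetic rows, for illustration only
rows = [{"passenger_count": i % 4 + 1, "tip_amount": 6.5 + i} for i in range(10)]
out_dir = tempfile.mkdtemp()
paths = write_blocks_as_csv(rows, num_blocks=3, out_dir=out_dir)
print(sorted(os.listdir(out_dir)))
print([len(block) for block in split_evenly(rows, 3)])
```

Ten rows split into three blocks of 4, 3, and 3 rows produce exactly three files. Changing `num_blocks` changes the file count the same way `repartition()` does in the Ray example.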


### Converting to Other Framework Formats

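Ray Data can convert a `Dataset` into a single-node pandas DataFrame or into a distributed Dask or Spark DataFrame, as listed in {numref}`ray-data-convert-other-library`. Note that `to_pandas()` collects all rows onto a single node, so use it only when the data fits in that node's memory. `to_spark()` requires RayDP.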

```{table} Converting for other frameworks
:name: ray-data-convert-other-library
|      	|     pandas    	|     Dask    	|     Spark    	| 
|:----:	|:--------------:	|:-----------:	|:----------:	|
| Method 	|  [Dataset.to_pandas()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.to_pandas.html) 	| [Dataset.to_dask()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.to_dask.html) 	| [Dataset.to_spark()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.to_spark.html) 	|
```