
AutoSharding IterableDatasets when num_workers > 1 #5984

Open
mathephysicist opened this issue Jun 23, 2023 · 8 comments
Labels
enhancement New feature or request

Comments

@mathephysicist

Feature request

Minimal Example

import torch
from datasets import IterableDataset

d = IterableDataset.from_file("<file_name>")  # single large Arrow file
dl = torch.utils.data.DataLoader(d, num_workers=3)

for sample in dl:
    print(sample)

Warning:
Too many dataloader workers: 2 (max is dataset.n_shards=1). Stopping 1 dataloader workers.
To parallelize data loading, we give each process some shards (or data sources) to process. Therefore it's unnecessary to have a number of workers greater than dataset.n_shards=1. To enable more parallelism, please split the dataset in more files than 1.

Expected Behavior:
The dataset is sharded so that each worker processes its own contiguous subset (contiguous so that checkpoint saving/loading remains possible).

Motivation

I have a lot of idle CPUs and would like to be able to shard iterable datasets with PyTorch's DataLoader when num_workers > 1. In my case the data is a single very large file. I am aware that split_dataset_by_node ensures that each node (in the distributed case) gets different shards, but this should be extended so that the same happens for multiple DataLoader workers (see the sketch below).
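For reference, a minimal sketch of what exists today, assuming a dataset streamed from the Hub (the dataset name is just an example): `split_dataset_by_node` distributes shards (or examples) across distributed nodes, but not across the workers of a single DataLoader.

```python
from datasets import load_dataset
from datasets.distributed import split_dataset_by_node

# Hypothetical streaming dataset; any IterableDataset works the same way.
ds = load_dataset("c4", "en", split="train", streaming=True)

# Each rank gets a disjoint subset: whole shards when n_shards is a multiple of
# world_size, otherwise examples are skipped with modulo logic.
ds_rank0 = split_dataset_by_node(ds, rank=0, world_size=2)
```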

Your contribution

If someone points me to what needs to change, I can create a PR.

mathephysicist added the enhancement label on Jun 23, 2023
@mariosasko
Collaborator

For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.

@lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?

PS: I don't expect significant speed-up for local, uncompressed Arrow files.
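A minimal sketch (not the datasets internals) of the difference, using pyarrow; the file names are hypothetical. The streaming format can only be read sequentially, while the file (random access) format stores the record batch locations in its footer, so arbitrary batches can be read directly.

```python
import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({"x": list(range(1_000))})

# "Streaming" format: batches can only be consumed in order, front to back.
with pa.OSFile("data_stream.arrow", "wb") as sink:
    with ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table, max_chunksize=100)

# "Random Access" (IPC file / Feather V2) format: the footer indexes every batch.
with pa.OSFile("data_file.arrow", "wb") as sink:
    with ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table, max_chunksize=100)

reader = ipc.open_file("data_file.arrow")
print(reader.num_record_batches)  # known from the footer, no scan needed
batch = reader.get_batch(3)       # jump straight to the 4th record batch
```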

@lhoestq
Member

lhoestq commented Jun 27, 2023

Alternatively, we could support a multiprocessing map for iterable datasets and let the user do the CPU-intensive task there?

This way it would work on Arrow data but also on any iterable dataset.

@imarquart

> For this to be possible, we would have to switch from the "Streaming" Arrow format to the "Random Access" (IPC/Feather) format, which allows reading arbitrary record batches (explained here). We could then use these batches to construct shards.
>
> @lhoestq @albertvillanova Do you think this use case is worth the switch? Also, we currently shard files, not inner row groups/chunks. Should we also support sharding row groups (e.g. if the number of input files is 1)?
>
> PS: I don't expect significant speed-up for local, uncompressed Arrow files.

Could you explain why you'd need to change the arrow format?

When we use streaming datasets, we simply determine the number of worker shards and then add some modulo logic at the appropriate place. In the worst case, you'd skip streaming entries according to the number of shards.

For PyTorch, I'd be happy to provide an implementation or a sketch thereof, if you point me toward what the testing requirements would be for such a PR.
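A minimal sketch of that modulo approach on the PyTorch side, assuming a plain Python iterable rather than the datasets internals; ModuloShardedIterable is a hypothetical name.

```python
from torch.utils.data import DataLoader, get_worker_info
from torch.utils.data import IterableDataset as TorchIterableDataset

class ModuloShardedIterable(TorchIterableDataset):
    def __init__(self, generator_fn):
        self.generator_fn = generator_fn  # callable returning a fresh iterator

    def __iter__(self):
        info = get_worker_info()
        worker_id = info.id if info is not None else 0
        num_workers = info.num_workers if info is not None else 1
        for i, example in enumerate(self.generator_fn()):
            # Each worker keeps every num_workers-th example, starting at its own id.
            if i % num_workers == worker_id:
                yield example

dl = DataLoader(ModuloShardedIterable(lambda: iter(range(10))), num_workers=2)
print(list(dl))  # every example appears exactly once across the two workers
```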

@lhoestq
Member

lhoestq commented Jul 4, 2023

> Could you explain why you'd need to change the arrow format?

This way, each worker has random access to the location in the file where its dataset subset starts. Currently we're using the Arrow streaming format, which doesn't include the metadata of the record batch offsets. This is needed here to efficiently split a dataset made of one single file.

@imarquart

imarquart commented Jul 4, 2023

> Could you explain why you'd need to change the arrow format?
>
> This way, each worker has random access to the location in the file where its dataset subset starts. Currently we're using the Arrow streaming format, which doesn't include the metadata of the record batch offsets. This is needed here to efficiently split a dataset made of one single file.

I guess I don't understand why you'd need to subset the dataset in the first place.
It seems sufficient to figure out how to offset or skip rows.

For instance, using pyarrow, you could use RecordBatchStreamReader to zero-copy iterate over record batches with read_next_batch and then only process the batches that fall to a given worker shard (modulo the number of shards).
That's one way to do it, where of course you'd need to account for GPU sharding as well.

Otherwise, how did you implement worker/node/GPU sharding for iterable/streaming data where you do not have index information or prior splits (e.g. files)?
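A rough sketch of that batch-level variant, assuming a file in the Arrow streaming format (the path is hypothetical): every worker still opens and scans the same stream, but only materialises the batches that belong to it.

```python
import pyarrow as pa
import pyarrow.ipc as ipc
from torch.utils.data import get_worker_info

def iter_stream_worker_examples(path):
    info = get_worker_info()
    worker_id = info.id if info is not None else 0
    num_workers = info.num_workers if info is not None else 1
    with pa.OSFile(path, "rb") as source:
        reader = ipc.open_stream(source)
        # Sequential iteration over record batches; only this worker's batches
        # are converted to Python objects.
        for i, batch in enumerate(reader):
            if i % num_workers == worker_id:
                yield from batch.to_pylist()
```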

@lhoestq
Member

lhoestq commented Jul 4, 2023

> For instance, using pyarrow, you could use RecordBatchStreamReader to zero-copy iterate over record batches with read_next_batch and then only process the batches that fall to a given worker shard (modulo the number of shards).

That works indeed! What we meant is that you can make instantiation even faster. With RecordBatchStreamReader, every worker has to iterate over all the record batches, whereas each worker could fetch only its own record batches if you use the record batch locations stored in the Arrow IPC file footer. This would be especially appreciated for fast instantiation when you have tens of thousands of Arrow files, for example.
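A rough sketch of that footer-based variant, again using pyarrow directly rather than the datasets internals, and assuming a hypothetical path to a file in the "Random Access" IPC format: each worker reads only the batches assigned to it, without scanning the rest of the file.

```python
import pyarrow.ipc as ipc
from torch.utils.data import get_worker_info

def iter_file_worker_examples(path):
    info = get_worker_info()
    worker_id = info.id if info is not None else 0
    num_workers = info.num_workers if info is not None else 1
    reader = ipc.open_file(path)
    # The footer lists the location of every record batch, so each worker can
    # jump straight to its own batches.
    for i in range(worker_id, reader.num_record_batches, num_workers):
        yield from reader.get_batch(i).to_pylist()
```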

@lololololoki

Any recent updates on this?

@pauli31

pauli31 commented Mar 22, 2024

I would also appreciate this feature
