improve data loading docs (#4909)
* improve data loading docs

* document best practices, add 'get_batch_size' method to samplers

* try fix annoying unrelated test

* revert that

* clarify handling of 'max_instances_in_memory'
epwalsh committed Jan 12, 2021
1 parent 2f54570 commit effcc4e
Showing 4 changed files with 124 additions and 59 deletions.
167 changes: 109 additions & 58 deletions allennlp/data/data_loaders/multiprocess_data_loader.py
@@ -33,7 +33,7 @@ class MultiProcessDataLoader(DataLoader):
See
[Using your reader with multi-process or distributed data loading](/api/data/dataset_readers/dataset_reader/#datasetreader.using_your_reader_with_multi-process_or_distributed_data_loading)
for more information on how to optimize your `DatasetReader`.
for more information on how to optimize your `DatasetReader` for use with this `DataLoader`.
# Parameters
@@ -73,7 +73,7 @@ class MultiProcessDataLoader(DataLoader):
to `__iter__()`.
num_workers: `int`, optional (default = `0`)
The number of workers to use to read `Instance`s in parallel.
The number of workers to use to read `Instances` in parallel.
If `num_workers = 0`, everything is done in the main process. Otherwise `num_workers`
workers are forked or spawned (depending on the value of `start_method`), each of which
calls `read()` on their copy of the `reader`.
@@ -82,6 +82,11 @@ class MultiProcessDataLoader(DataLoader):
the `reader` needs to implement
[`manual_multiprocess_sharding`](/api/data/dataset_readers/dataset_reader/#datasetreader).
!!! Warning
Multi-processing code in Python is complicated! We highly recommend you read the short
[Best practices](#multiprocessdataloader.best_practices) and
[Common issues](#multiprocessdataloader.common_issues) sections below before using this option.
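    For illustration, here is a minimal sketch of turning on worker processes. The reader class,
    data path, and parameter values are placeholders, not part of this commit:

    ```python
    from allennlp.data import Vocabulary
    from allennlp.data.data_loaders import MultiProcessDataLoader
    from my_library.readers import MyDatasetReader  # hypothetical reader

    loader = MultiProcessDataLoader(
        reader=MyDatasetReader(),
        data_path="data/train.jsonl",   # placeholder path
        batch_size=32,
        shuffle=True,
        num_workers=2,        # read instances in 2 worker processes
        start_method="fork",  # try "spawn" if fork-unsafe libraries cause dead-locks
    )

    # Build a vocabulary from the instances, then index the loader with it
    # so that iterating yields tensor batches.
    vocab = Vocabulary.from_instances(loader.iter_instances())
    loader.index_with(vocab)

    for batch in loader:
        ...
    ```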
max_instances_in_memory: `int`, optional (default = `None`)
If not specified, all instances will be read and cached in memory for the duration
of the data loader's life. This is generally ideal when your data can fit in memory
@@ -91,14 +96,22 @@ class MultiProcessDataLoader(DataLoader):
!!! Note
This setting will affect how a `batch_sampler` is applied. If
`max_instances_in_memory` is `None`, the sampler will be applied to all `Instance`s.
Otherwise the sampler will be applied to only `max_instances_in_memory` `Instance`s
`max_instances_in_memory` is `None`, the sampler will be applied to all `Instances`.
Otherwise the sampler will be applied to only `max_instances_in_memory` `Instances`
at a time.
Therefore when using this option with a sampler, you should generally set it to a multiple of
the sampler's `batch_size` (if it has one).
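        To make that note concrete, here is a sketch pairing a `BucketBatchSampler` with a
        `max_instances_in_memory` that is a multiple of its batch size. The reader, data path,
        and `sorting_keys` value are placeholders:

        ```python
        from allennlp.data.data_loaders import MultiProcessDataLoader
        from allennlp.data.samplers import BucketBatchSampler

        sampler = BucketBatchSampler(batch_size=32, sorting_keys=["tokens"])

        loader = MultiProcessDataLoader(
            reader=MyDatasetReader(),       # hypothetical reader
            data_path="data/train.jsonl",   # placeholder path
            batch_sampler=sampler,
            # Give the sampler 20 batches' worth of instances to bucket at a time.
            max_instances_in_memory=32 * 20,
        )
        ```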
start_method: `str`, optional (default = `"fork"`)
The [start method](https://docs.python.org/3.7/library/multiprocessing.html#contexts-and-start-methods)
used to spin up workers.
On Linux or OS X, "fork" usually has the lowest overhead for starting workers
but could potentially lead to dead-locks if you're using lower-level libraries that are not fork-safe.
If you run into these issues, try using "spawn" instead.
cuda_device: `Optional[Union[int, str, torch.device]]`, optional (default = `None`)
If given, batches will automatically be put on this device.
@@ -107,9 +120,41 @@ class MultiProcessDataLoader(DataLoader):
will automatically call [`set_target_device()`](#set_target_device) before iterating
over batches.
!!! Warning
# Best practices
- **Large datasets**
If your dataset is too big to fit into memory (a common problem), you'll need to load it lazily.
This is done by simply setting the `max_instances_in_memory` parameter to a non-zero integer.
The optimal value depends on your use case.
If you're using a `batch_sampler`, you will generally get better samples by setting
`max_instances_in_memory` to a higher number - such as 10 to 100 times your batch size -
since this determines how many `Instances` your `batch_sampler` gets to sample from at a time.
If you're not using a `batch_sampler` then this number is much less important. Setting it to
2 to 10 times your batch size is a reasonable value.
Keep in mind that using `max_instances_in_memory` generally results in a slower
training loop unless you load data in worker processes by setting the `num_workers` option to a
non-zero integer (see below). That way data loading won't block the main process.
- **Performance**
The quickest way to increase the performance of data loading is to adjust the `num_workers` parameter.
`num_workers` determines how many workers are used to read `Instances` from your
`DatasetReader`. By default, this is set to `0`, which means everything is done in the main process.
Before trying to set `num_workers` to a non-zero number, you should make sure your `DatasetReader`
is [optimized for use with multi-process data loading](/api/data/dataset_readers/dataset_reader/#datasetreader.using_your_reader_with_multi-process_or_distributed_data_loading);
a minimal sketch of such a reader follows this list.
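    Here is that sketch. It assumes a one-JSON-object-per-line file, defers `token_indexers` to
    `apply_token_indexers()`, and uses `shard_iterable()` for manual sharding. The registered name
    and file format are illustrative, and the exact attribute used to attach the indexers may
    differ slightly between AllenNLP versions:

    ```python
    import json
    from typing import Iterable

    from allennlp.data import DatasetReader, Instance
    from allennlp.data.fields import TextField
    from allennlp.data.token_indexers import SingleIdTokenIndexer
    from allennlp.data.tokenizers import Token


    @DatasetReader.register("multiprocess-friendly")  # hypothetical name
    class MultiprocessFriendlyReader(DatasetReader):
        def __init__(self, **kwargs) -> None:
            # Opt in to handling sharding ourselves via shard_iterable().
            super().__init__(
                manual_distributed_sharding=True,
                manual_multiprocess_sharding=True,
                **kwargs,
            )
            self._token_indexers = {"tokens": SingleIdTokenIndexer()}

        def _read(self, file_path: str) -> Iterable[Instance]:
            with open(file_path) as data_file:
                # shard_iterable() splits the lines across workers / distributed ranks.
                for line in self.shard_iterable(data_file):
                    yield self.text_to_instance(json.loads(line)["text"])

        def text_to_instance(self, text: str) -> Instance:  # type: ignore[override]
            # Note: no token_indexers passed to the TextField here.
            tokens = [Token(t) for t in text.split()]
            return Instance({"tokens": TextField(tokens)})

        def apply_token_indexers(self, instance: Instance) -> None:
            # Indexers are attached here, after instances cross the process boundary.
            instance.fields["tokens"].token_indexers = self._token_indexers  # type: ignore[attr-defined]
    ```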
# Common issues
- **Dead-locks**
Multiprocessing code in Python is complicated! Especially code that involves lower-level libraries
which may be spawning their own threads / processes. If you run into dead-locks while
which may be spawning their own threads. If you run into dead-locks while
using `num_workers > 0`, luckily there are two simple work-arounds which usually fix the issue.
The first work-around is to disable parallelism for these low-level libraries (a hedged example is sketched at the end of this section).
@@ -121,14 +166,23 @@ class MultiProcessDataLoader(DataLoader):
See [issue #4848](https://github.com/allenai/allennlp/issues/4848) for more info.
!!! Warning
Another issue besides dead-locks that you could run into when using `num_workers > 0`
is running out of shared memory, since tensors are passed between processes
using shared memory, and some systems impose strict limits on the allowed size of shared
memory.
Dead-locks could also be caused by running out of shared memory (see below).
- **Shared memory restrictions**
Tensors are passed between processes using shared memory, and some systems impose strict
limits on the allowed size of shared memory.
Luckily this is simple to debug and simple to fix.
Luckily there is also a simple work-around for this. Either decrease `max_instances_in_memory`
or increase your system's `ulimit`.
First, to verify that this is your issue, just watch your shared memory as your data loader runs.
For example, run `watch -n 0.3 'df -h | grep shm'`.
If you're seeing your shared memory blow up until it maxes-out, then you either need to decrease
`max_instances_in_memory` or increase your system's `ulimit`.
If you're using Docker, you can increase the shared memory available on a container by running
it with the option `--ipc=host` or by setting `--shm-size`.
See [issue #4847](https://github.com/allenai/allennlp/issues/4847) for more info.
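    As a concrete but hedged example of the first dead-lock work-around: which environment
    variables matter depends on which libraries you depend on. `TOKENIZERS_PARALLELISM`
    (HuggingFace tokenizers) and `OMP_NUM_THREADS` (OpenMP / BLAS) are common ones, and they
    need to be set before those libraries start their own thread pools:

    ```python
    import os

    # Disable parallelism in lower-level libraries before they spin up their own
    # threads. Which variables apply depends on your dependencies; these are
    # just common examples.
    os.environ["TOKENIZERS_PARALLELISM"] = "false"  # HuggingFace tokenizers
    os.environ["OMP_NUM_THREADS"] = "1"             # OpenMP / BLAS thread pools

    # ...then construct your MultiProcessDataLoader with num_workers > 0 as usual.
    ```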
@@ -203,13 +257,18 @@ def __init__(
# They have to be big enough that it doesn't hurt performance, but small enough
# that they don't take up too many resources when there is a bottleneck on the
# consuming end of a queue.
effective_batch_size = (
self.batch_size if self.batch_sampler is None else self.batch_sampler.get_batch_size()
)
self._max_instance_queue_size = (
None if max_instances_in_memory is None else max_instances_in_memory * 4
None
if max_instances_in_memory is None
else 2 * self.num_workers * max_instances_in_memory
)
self._max_batch_queue_size = (
None
if max_instances_in_memory is None
else 4 * max_instances_in_memory // (batch_size or 1)
else 2 * self.num_workers * max_instances_in_memory // (effective_batch_size or 1)
)

# If max_instances_in_memory is not given, we'll keep a cache of all instances in this list.
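For reference, a quick worked example of the new queue bounds above (the numbers are made up): with `num_workers = 2`, `max_instances_in_memory = 1000`, and an effective batch size of 32, the instance queue holds at most 4000 instances and the batch queue at most 125 batches.

```python
# Worked example of the queue bounds (made-up values):
num_workers, max_instances_in_memory, effective_batch_size = 2, 1000, 32

max_instance_queue_size = 2 * num_workers * max_instances_in_memory                              # 4000
max_batch_queue_size = 2 * num_workers * max_instances_in_memory // (effective_batch_size or 1)  # 125
```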
@@ -406,8 +465,8 @@ def _instance_worker(self, worker_id: int, queue: mp.JoinableQueue) -> None:
f"Found a TextField ({field_name}) with token_indexers already "
"applied, but you're using num_workers > 0 in your data loader. "
"Make sure your dataset reader's text_to_instance() method doesn't "
"add any token_indexers to the TextFields it creates. The token_indexers "
"should be added to the instances in apply_token_indexers() method of your "
"add any token_indexers to the TextFields it creates. Instead, the token_indexers "
"should be added to the instances in the apply_token_indexers() method of your "
"dataset reader (which you'll have to implement if you haven't done "
"so already)."
)
@@ -466,56 +525,48 @@ def _instances_to_batches(
) -> Iterator[TensorDict]:
instance_iterator = (self._index_instance(instance) for instance in instance_iterator)

if self.max_instances_in_memory is not None:
max_instances_in_memory = max(
1, self.max_instances_in_memory // max(self.num_workers, 1)
if move_to_device and self.cuda_device is not None:
tensorize = lambda batch: nn_util.move_to_device( # noqa: E731
self.collate_fn(batch), self.cuda_device
)
else:
tensorize = self.collate_fn

if max_instances_in_memory > 1 and self.batch_size is not None and self.batch_size > 1:
# Make sure max_instances_in_memory is a multiple of `batch_size`.
max_instances_in_memory = (
(max_instances_in_memory + self.batch_size - 1) // self.batch_size
) * self.batch_size

if self.shuffle:
instance_iterator = shuffle_iterable(
instance_iterator,
max_instances_in_memory,
)
if self.batch_sampler is not None:
instance_chunks: Iterable[List[Instance]]

instance_chunks: Iterable[List[Instance]] = lazy_groups_of(
instance_iterator, max_instances_in_memory
)
else:
# At this point we've already loaded the instances in memory and indexed them,
# so this won't take long.
instance_chunks = [list(instance_iterator)]
if self.shuffle:
random.shuffle(instance_chunks[0])
if self.max_instances_in_memory is not None:
instance_chunks = lazy_groups_of(instance_iterator, self.max_instances_in_memory)
else:
instance_chunks = [list(instance_iterator)]

for instances in instance_chunks:
batches: Iterator[List[Instance]]
if self.batch_sampler:
for instances in instance_chunks:
batches = (
[instances[i] for i in batch_indices]
for batch_indices in self.batch_sampler.get_batch_indices(instances)
)
else:
# NOTE: it's safe to assume `batch_size` is not `None` when `batch_sampler` is `None`.
# Hence the `type: ignore` comment.
batches = lazy_groups_of(instances, self.batch_size) # type: ignore[arg-type]

for batch in batches:
if (
self.batch_sampler is None
and self.drop_last
and len(batch) < self.batch_size # type: ignore[operator]
):
for batch in batches:
yield tensorize(batch)
else:
# Safe to assume this is not `None` when `self.batch_sampler` is `None`.
assert self.batch_size is not None

if self.shuffle:
if self.max_instances_in_memory is not None:
instance_iterator = shuffle_iterable(
instance_iterator,
self.max_instances_in_memory,
)
else:
# At this point we've already loaded the instances in memory and indexed them,
# so this won't take long.
instance_iterator = list(instance_iterator)
random.shuffle(instance_iterator)

for batch in lazy_groups_of(instance_iterator, self.batch_size):
if self.drop_last and len(batch) < self.batch_size:
break
tensor_dict = self.collate_fn(batch)
if move_to_device and self.cuda_device is not None:
tensor_dict = nn_util.move_to_device(tensor_dict, self.cuda_device)
yield tensor_dict
yield tensorize(batch)


class WorkerError(Exception):
4 changes: 4 additions & 0 deletions allennlp/data/dataset_readers/dataset_reader.py
@@ -115,6 +115,10 @@ class DatasetReader(Registrable):
serialization_dir: `str`, optional (default=`None`)
The directory in which the training output is saved to, or the directory the model is loaded from.
!!! Note
This is typically not given an entry in a configuration file. It will be set automatically
when using the built-in `allennlp` commands.
# Using your reader with multi-process or distributed data loading
There are two things you may need to update in your `DatasetReader` in order for
9 changes: 8 additions & 1 deletion allennlp/data/samplers/batch_sampler.py
@@ -1,4 +1,4 @@
from typing import List, Iterable, Sequence
from typing import List, Iterable, Sequence, Optional

from allennlp.common.registrable import Registrable
from allennlp.data.instance import Instance
@@ -10,3 +10,10 @@ def get_batch_indices(self, instances: Sequence[Instance]) -> Iterable[List[int]

def get_num_batches(self, instances: Sequence[Instance]) -> int:
raise NotImplementedError

def get_batch_size(self) -> Optional[int]:
"""
Not all `BatchSamplers` define a consistent `batch_size`, but those that
do should override this method.
"""
return None
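For illustration, here is how a custom sampler might take advantage of the new hook (the class and registered name are hypothetical): a sampler with a fixed batch size overrides `get_batch_size()` so the `MultiProcessDataLoader` can use it when sizing its internal batch queue.

```python
import random
from typing import Iterable, List, Optional, Sequence

from allennlp.data.instance import Instance
from allennlp.data.samplers import BatchSampler


@BatchSampler.register("fixed-size-random")  # hypothetical name
class FixedSizeRandomSampler(BatchSampler):
    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size

    def get_batch_indices(self, instances: Sequence[Instance]) -> Iterable[List[int]]:
        # Shuffle indices, then slice them into consecutive batches.
        indices = list(range(len(instances)))
        random.shuffle(indices)
        for start in range(0, len(indices), self.batch_size):
            yield indices[start : start + self.batch_size]

    def get_num_batches(self, instances: Sequence[Instance]) -> int:
        return (len(instances) + self.batch_size - 1) // self.batch_size

    def get_batch_size(self) -> Optional[int]:
        # This sampler always produces batches of (at most) `batch_size`,
        # so it can report a consistent value here.
        return self.batch_size
```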
3 changes: 3 additions & 0 deletions allennlp/data/samplers/bucket_batch_sampler.py
@@ -158,3 +158,6 @@ def get_num_batches(self, instances: Sequence[Instance]) -> int:
return math.floor(batch_count_float)
else:
return math.ceil(batch_count_float)

def get_batch_size(self) -> Optional[int]:
return self.batch_size
