This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Experimentally use threaded_generator to read instances from readers simultaneously
dirkgr committed Jan 6, 2021
1 parent 8b7668e commit a633e67
Showing 2 changed files with 34 additions and 3 deletions.
29 changes: 29 additions & 0 deletions allennlp/common/util.py
@@ -15,6 +15,8 @@
 from contextlib import contextmanager
 from itertools import islice, zip_longest
 from pathlib import Path
+from queue import Queue
+from threading import Thread
 from typing import (
     Any,
     Callable,
@@ -708,3 +710,30 @@ def hash_object(o: Any) -> str:
         pickle.dump(o, buffer)
         m.update(buffer.getbuffer())
     return m.hexdigest()
+
+
+def threaded_generator(g, maxsize=16):
+    q = Queue(maxsize=maxsize)
+
+    sentinel = object()
+
+    def fill_queue():
+        try:
+            for value in g:
+                q.put((value, None))
+            q.put(sentinel)
+        except Exception as e:
+            q.put((None, e))
+
+    thread = Thread(name=repr(g), target=fill_queue, daemon=True)
+    thread.start()
+
+    for value, error in iter(q.get, sentinel):
+        if error is not None:
+            raise error
+        yield value
+
+
+def fake_threaded_generator(g, maxsize=16):
+    """Just for debugging"""
+    return g
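
As a rough illustration of how the new helper is meant to be consumed (a minimal sketch; `slow_reader` below is a made-up stand-in for a dataset reader's instance generator, not an AllenNLP API), wrapping a generator in `threaded_generator` moves iteration of the wrapped generator onto a daemon thread, buffering up to `maxsize` items in a queue while the caller consumes them:

import time

from allennlp.common.util import threaded_generator  # the helper added in this commit


def slow_reader(n):
    # Hypothetical producer that simulates per-item I/O latency.
    for i in range(n):
        time.sleep(0.01)
        yield i


# The background thread keeps filling the queue while the main thread works,
# so reading and per-item processing overlap instead of alternating.
for item in threaded_generator(slow_reader(100), maxsize=16):
    pass  # do per-item work here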
8 changes: 5 additions & 3 deletions allennlp/data/data_loaders/multitask_data_loader.py
@@ -168,21 +168,22 @@ def __init__(
         # from here each epoch, and refresh it when it runs out. We only use this in the case that
         # instances_per_epoch is not None, but these iterators are lazy, so always creating them
         # doesn't hurt anything.
+        from allennlp.common.util import threaded_generator
         self._iterators: Dict[str, Iterator[Instance]] = {
             # NOTE: The order in which we're calling these iterator functions is important. We want
             # an infinite iterator over the data, but we want the order in which we iterate over the
             # data to be different at every epoch. The cycle function will give us an infinite
             # iterator, and it will call the lambda function each time it runs out of instances,
             # which will produce a new shuffling of the dataset.
-            key: util.cycle_iterator_function(
+            key: threaded_generator(util.cycle_iterator_function(
                 # This default argument to the lambda function is necessary to create a new scope
                 # for the loader variable, so a _different_ loader gets saved for every iterator.
                 # Dictionary comprehensions don't create new scopes in python. If you don't have
                 # this loader, you end up with `loader` always referring to the last loader in the
                 # iteration... mypy also doesn't know what to do with this, for some reason I can't
                 # figure out.
                 lambda l=loader: maybe_shuffle_instances(l, self._shuffle)  # type: ignore
-            )
+            ), maxsize=1024)
             for key, loader in self._loaders.items()
         }

@@ -254,8 +255,9 @@ def index_with(self, vocab: Vocabulary) -> None:

     def _get_instances_for_epoch(self) -> Dict[str, Iterable[Instance]]:
         if self._instances_per_epoch is None:
+            from allennlp.common.util import threaded_generator
             return {
-                key: maybe_shuffle_instances(loader, self._shuffle)
+                key: threaded_generator(maybe_shuffle_instances(loader, self._shuffle), maxsize=1024)
                 for key, loader in self._loaders.items()
             }
         if self.sampler is None:
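
The `lambda l=loader:` default argument called out in the comment above works around Python's late-binding closures; a minimal standalone sketch of the difference, with a toy `loaders` dict used purely for illustration:

loaders = {"a": 1, "b": 2, "c": 3}

# Late binding: every lambda closes over the same `loader` variable, so by
# the time any of them is called, `loader` holds the last value of the loop.
late = {key: (lambda: loader) for key, loader in loaders.items()}
assert [f() for f in late.values()] == [3, 3, 3]

# Default argument: `l` is evaluated when each lambda is created, so every
# entry keeps the loader it was built with.
bound = {key: (lambda l=loader: l) for key, loader in loaders.items()}
assert [f() for f in bound.values()] == [1, 2, 3]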
