Adding support for raw python generator in addition to Dataset
The main goal is to ease the creation of streaming data into the pipeline.

`Dataset` is more involved and PyTorch-specific.

This PR provides a way to use a plain Python generator too.
This enables huggingface#14250, but can be proposed as a standalone PR.

```python
from transformers import pipeline

def read_data(filename):
    with open(filename, 'r') as f:
        for line in f:
            yield line

pipe = pipeline("text-classification")
for classified in pipe(read_data("large_file.txt")):
    print("Success ! ", classified)
```

The main caveat of this is the interaction with `DataLoader` when
`num_workers > 1`. When you have multiple workers, each one receives a copy
of the generator (as with `IterableDataset`). That means a naive iterator
will fail, since all workers iterate over all items of the generator, as the
sketch below demonstrates.
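
A minimal sketch of that duplication, assuming a plain `IterableDataset` wrapper (the `GeneratorDataset` name is illustrative, not part of this PR):

```python
from torch.utils.data import DataLoader, IterableDataset

class GeneratorDataset(IterableDataset):
    # Each DataLoader worker calls __iter__ on its own copy of the dataset,
    # so every item is produced once per worker.
    def __iter__(self):
        yield from range(4)

if __name__ == "__main__":
    items = sorted(int(x) for x in DataLoader(GeneratorDataset(), num_workers=2, batch_size=None))
    print(items)  # [0, 0, 1, 1, 2, 2, 3, 3] rather than [0, 1, 2, 3]
```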

There are ways to do clever "skipping", but it could still be bad: all
workers still have to pass through every item of the generator (they just
ignore the items they don't handle), so depending on the case the overhead
may outweigh the benefit. A sketch of such skipping follows.
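
For illustration, a minimal sketch of per-worker skipping built on `torch.utils.data.get_worker_info()` (the `ShardedIterable` wrapper is hypothetical, not part of this PR):

```python
from torch.utils.data import IterableDataset, get_worker_info

class ShardedIterable(IterableDataset):
    # Hypothetical sketch: worker k keeps item i when i % num_workers == k.
    def __init__(self, generator_fn):
        self.generator_fn = generator_fn

    def __iter__(self):
        info = get_worker_info()  # None when iterated in the main process
        worker_id = info.id if info is not None else 0
        num_workers = info.num_workers if info is not None else 1
        for i, item in enumerate(self.generator_fn()):
            # Every worker still consumes the whole generator; it merely
            # discards the items assigned to other workers.
            if i % num_workers == worker_id:
                yield item
```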

Using `num_workers=1` is the simplest fix, and if the cost of loading
your data is small enough it should be good enough. In the example above,
for instance, trying smart tricks to skip some lines is unlikely to be a
net positive.

If there are better ways to "jump" to specific data, then using
`Dataset` is more advisable (since different workers can then jump
directly to their own items), along the lines of the sketch below.
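
For example, a map-style `Dataset` can pre-index byte offsets so `__getitem__` seeks straight to any line (the `LineDataset` helper is illustrative, not part of this PR):

```python
from torch.utils.data import Dataset

class LineDataset(Dataset):
    # Illustrative sketch: random access to lines via precomputed byte offsets.
    def __init__(self, filename):
        self.filename = filename
        self.offsets = []
        offset = 0
        with open(filename, "rb") as f:
            for line in f:
                self.offsets.append(offset)
                offset += len(line)

    def __len__(self):
        return len(self.offsets)

    def __getitem__(self, i):
        # Workers jump directly to their own lines instead of reading
        # (and discarding) everything before them.
        with open(self.filename, "rb") as f:
            f.seek(self.offsets[i])
            return f.readline().decode("utf-8").rstrip("\n")
```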
Narsil committed Nov 10, 2021
1 parent 5c15307 commit dfccc12
Showing 2 changed files with 43 additions and 2 deletions.
src/transformers/pipelines/base.py (21 additions, 2 deletions)
```diff
@@ -18,6 +18,7 @@
 import os
 import pickle
 import sys
+import types
 import warnings
 from abc import ABC, abstractmethod
 from collections import UserDict
```
```diff
@@ -1035,10 +1036,23 @@ def forward(self, model_inputs, **forward_params):
     def get_iterator(
         self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params
     ):
+        try:
+            n = len(inputs)
+        except TypeError:
+            # Iterator
+            n = None
+        if n is not None:
+            dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
+        else:
+            if num_workers > 1:
+                logger.warning(
+                    "For iterable dataset using num_workers>1 is likely to result in errors since everything is iterable, setting `num_workers=1` to guarantee correctness."
+                )
+                num_workers = 1
+            dataset = PipelineIterator(inputs, self.preprocess, preprocess_params)
         if "TOKENIZERS_PARALLELISM" not in os.environ:
             logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already")
             os.environ["TOKENIZERS_PARALLELISM"] = "false"
-        dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
         collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, self.feature_extractor)
         dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn)
         model_iterator = PipelineIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size)
```
```diff
@@ -1070,7 +1084,12 @@ def __call__(self, inputs, *args, num_workers=0, batch_size=1, **kwargs):
                 return outputs
             else:
                 return self.run_multi(inputs, preprocess_params, forward_params, postprocess_params)
-        elif Dataset is not None and isinstance(inputs, Dataset):
+        elif (
+            Dataset is not None
+            and isinstance(inputs, Dataset)
+            or isinstance(inputs, types.GeneratorType)
+            and self.framework == "pt"
+        ):
             return self.get_iterator(
                 inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params
             )
```
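
Note that `and` binds tighter than `or` in Python, so the new condition above is equivalent to this parenthesization (shown for clarity, not part of the diff):

```python
(Dataset is not None and isinstance(inputs, Dataset)) or (
    isinstance(inputs, types.GeneratorType) and self.framework == "pt"
)
```

In other words, a `Dataset` is routed to `get_iterator` on any framework, while a raw generator only takes this path with the PyTorch framework.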
tests/test_pipelines_common.py (22 additions, 0 deletions)
```diff
@@ -286,6 +286,28 @@ def test_check_task(self):
         # Wrong framework
         get_task("espnet/siddhana_slurp_entity_asr_train_asr_conformer_raw_en_word_valid.acc.ave_10best")
 
+    @require_torch
+    def test_iterator_data(self):
+        def data(n: int):
+            for _ in range(n):
+                yield "This is a test"
+
+        pipe = pipeline(model="Narsil/tiny-distilbert-sequence-classification")
+
+        results = []
+        for out in pipe(data(10)):
+            self.assertEqual(out, {"label": "LABEL_1", "score": 0.5023466348648071})
+            results.append(out)
+        self.assertEqual(len(results), 10)
+
+        # When using multiple workers on streamable data it should still work
+        # This will force using `num_workers=1` with a warning for now.
+        results = []
+        for out in pipe(data(10), num_workers=2):
+            self.assertEqual(out, {"label": "LABEL_1", "score": 0.5023466348648071})
+            results.append(out)
+        self.assertEqual(len(results), 10)
+
+
 @is_pipeline_test
 class PipelinePadTest(unittest.TestCase):
```
