Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for raw python generator in addition to Dataset for pipelines #14352

Merged
merged 2 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/transformers/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import csv
import importlib
import json
import os
import pickle
import sys
import types
import warnings
from abc import ABC, abstractmethod
from collections import UserDict
Expand Down Expand Up @@ -1035,10 +1037,20 @@ def forward(self, model_inputs, **forward_params):
def get_iterator(
self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params
):
if isinstance(inputs, collections.abc.Sized):
dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
else:
if num_workers > 1:
logger.warning(
"For iterable dataset using num_workers>1 is likely to result"
" in errors since everything is iterable, setting `num_workers=1`"
" to guarantee correctness."
)
num_workers = 1
dataset = PipelineIterator(inputs, self.preprocess, preprocess_params)
if "TOKENIZERS_PARALLELISM" not in os.environ:
logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, self.feature_extractor)
dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn)
model_iterator = PipelineIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size)
Expand Down Expand Up @@ -1074,6 +1086,14 @@ def __call__(self, inputs, *args, num_workers=0, batch_size=1, **kwargs):
return self.get_iterator(
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params
)
elif isinstance(inputs, types.GeneratorType):
if self.framework == "pt":
return self.get_iterator(
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params
)
else:
# TODO make the get_iterator work also for `tf` (and `flax`).
return self.iterate(inputs, preprocess_params, forward_params, postprocess_params)
else:
return self.run_single(inputs, preprocess_params, forward_params, postprocess_params)

Expand All @@ -1085,3 +1105,9 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para
model_outputs = self.forward(model_inputs, **forward_params)
outputs = self.postprocess(model_outputs, **postprocess_params)
return outputs

def iterate(self, inputs, preprocess_params, forward_params, postprocess_params):
    """Lazily run the full pipeline on each element of *inputs*, yielding one result at a time."""
    # This function should become `get_iterator` again; it is a temporary
    # easy solution for frameworks without the DataLoader-based path.
    yield from (
        self.run_single(item, preprocess_params, forward_params, postprocess_params) for item in inputs
    )
14 changes: 6 additions & 8 deletions src/transformers/pipelines/text_classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ def __call__(self, *args, **kwargs):

If ``self.return_all_scores=True``, one such dictionary is returned per label.
"""
return super().__call__(*args, **kwargs)
result = super().__call__(*args, **kwargs)
if isinstance(args[0], str):
# This pipeline is odd, and returns a list when a single item is run
return [result]
else:
return result

def preprocess(self, inputs, **tokenizer_kwargs) -> Dict[str, GenericTensor]:
return_tensors = self.framework
Expand Down Expand Up @@ -160,10 +165,3 @@ def postprocess(self, model_outputs, function_to_apply=None, return_all_scores=F
return [{"label": self.model.config.id2label[i], "score": score.item()} for i, score in enumerate(scores)]
else:
return {"label": self.model.config.id2label[scores.argmax().item()], "score": scores.max().item()}

def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params):
    # `run_single` on this pipeline wraps each result in a one-element list,
    # so unwrap with [0] to hand back a flat list of results.
    results = []
    for item in inputs:
        single = self.run_single(item, preprocess_params, forward_params, postprocess_params)
        results.append(single[0])
    return results

def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
    """This pipeline is odd, and returns a list when a single item is run."""
    output = super().run_single(inputs, preprocess_params, forward_params, postprocess_params)
    return [output]
38 changes: 37 additions & 1 deletion tests/test_pipelines_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)
from transformers.pipelines import get_task
from transformers.pipelines.base import _pad
from transformers.testing_utils import is_pipeline_test, require_torch
from transformers.testing_utils import is_pipeline_test, nested_simplify, require_tf, require_torch


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -286,6 +286,42 @@ def test_check_task(self):
# Wrong framework
get_task("espnet/siddhana_slurp_entity_asr_train_asr_conformer_raw_en_word_valid.acc.ave_10best")

@require_torch
def test_iterator_data(self):
    """Pipelines should accept a raw Python generator and stream results (PyTorch)."""

    def data(n: int):
        for _ in range(n):
            yield "This is a test"

    pipe = pipeline(model="Narsil/tiny-distilbert-sequence-classification")

    def consume(outputs):
        # Assert every streamed prediction and collect it so we can count them.
        collected = []
        for item in outputs:
            self.assertEqual(nested_simplify(item), {"label": "LABEL_1", "score": 0.502})
            collected.append(item)
        return collected

    self.assertEqual(len(consume(pipe(data(10)))), 10)

    # When using multiple workers on streamable data it should still work
    # This will force using `num_workers=1` with a warning for now.
    self.assertEqual(len(consume(pipe(data(10), num_workers=2))), 10)

@require_tf
def test_iterator_data_tf(self):
    """Pipelines should accept a raw Python generator and stream results (TensorFlow path)."""

    def data(n: int):
        for _ in range(n):
            yield "This is a test"

    pipe = pipeline(model="Narsil/tiny-distilbert-sequence-classification", framework="tf")
    # Sanity-check the single-item path first. Previously this call's result
    # was discarded (and `out` immediately shadowed by the loop variable), so
    # it asserted nothing. A single-string call returns a one-element list.
    out = pipe("This is a test")
    self.assertEqual(nested_simplify(out), [{"label": "LABEL_1", "score": 0.502}])
    results = []
    for out in pipe(data(10)):
        self.assertEqual(nested_simplify(out), {"label": "LABEL_1", "score": 0.502})
        results.append(out)
    self.assertEqual(len(results), 10)


@is_pipeline_test
class PipelinePadTest(unittest.TestCase):
Expand Down