diff --git a/delta/corpus.py b/delta/corpus.py
index 5bc00fb..8492105 100644
--- a/delta/corpus.py
+++ b/delta/corpus.py
@@ -9,6 +9,7 @@ class which represents the feature matrix. Also contained are default
 import os
 import glob
+from collections.abc import Mapping
 from fnmatch import fnmatch
 from inspect import signature
 from typing import Type
@@ -20,6 +21,8 @@ class which represents the feature matrix. Also contained are default
 from math import ceil
 from delta.util import Metadata, DocumentDescriber, DefaultDocumentDescriber, ngrams
+from joblib import Parallel, delayed
+
 import logging
@@ -62,7 +65,9 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
                  skip=None, token_pattern=LETTERS_PATTERN, max_tokens=None,
-                 ngrams=None):
+                 ngrams=None,
+                 parallel=False
+                 ):
         """
         Creates a customized default feature generator.
@@ -80,6 +85,10 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
                 regular expressions*) that contains at least one letter.
             max_tokens (int): If set, stop reading each file after that many words.
             ngrams (int): Count token ngrams instead of single tokens
+            parallel (bool|int|dict): If truish, read and parse files in parallel. The actual argument may be
+                - None or False for no special processing
+                - True to use all available cores, or an int for the required number of jobs
+                - a dictionary with Parallel arguments for finer control
         """
         self.lower_case = lower_case
         self.encoding = encoding
@@ -89,6 +98,7 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
         self.max_tokens = max_tokens
         self.ngrams = ngrams
         self.logger = logging.getLogger(__name__)
+        self.parallel = parallel
 
     def __repr__(self):
         return type(self).__name__ + '(' + \
@@ -217,11 +227,24 @@ def process_directory(self, directory):
                          len(filenames), self.glob, directory)
-        data = (self.process_file(filename)
-                for filename in filenames
-                if self.skip is None or not(fnmatch(filename, self.skip)))
+        used_filenames = [filename for filename in filenames if self.skip is None or not fnmatch(filename, self.skip)]
+        parallel = self._get_parallel_executor()
+        if parallel:
+            data = parallel(delayed(self.process_file)(filename) for filename in used_filenames)
+        else:
+            data = (self.process_file(filename) for filename in used_filenames)
         return {series.name: series for series in data}
 
+    def _get_parallel_executor(self) -> Parallel:
+        if self.parallel:
+            if isinstance(self.parallel, Mapping):
+                return Parallel(**self.parallel)
+            elif self.parallel is True:
+                return Parallel(n_jobs=-1)
+            elif isinstance(self.parallel, int):
+                return Parallel(n_jobs=self.parallel)
+        return None
+
     def __call__(self, directory):
         """
         Runs the feature extraction using :meth:`process_directory` for the
diff --git a/setup.py b/setup.py
index 59aac4d..e7d0f31 100755
--- a/setup.py
+++ b/setup.py
@@ -24,6 +24,7 @@
         'pandas>=0.13.1', # https://github.com/cophi-wue/pydelta/issues/6
         # 'profig>=0.2.8',
         'scikit-learn>=0.16.0',
+        'joblib>=1.0.1',
         'regex'
     ],
     test_suite = 'nose.collector'
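Usage note (not part of the diff): a minimal sketch of how the new `parallel` argument could be passed. Only the constructor signature and the accepted value types are taken from the diff; the class name `FeatureGenerator` and the corpus path are assumptions for illustration.

    from delta.corpus import FeatureGenerator  # class name assumed, not shown in this diff

    serial    = FeatureGenerator()                                      # default: no parallelism
    all_cores = FeatureGenerator(parallel=True)                         # joblib Parallel(n_jobs=-1)
    four_jobs = FeatureGenerator(parallel=4)                            # exactly four worker jobs
    tuned     = FeatureGenerator(parallel={'n_jobs': 4, 'verbose': 5})  # arbitrary Parallel kwargs

    features = all_cores('corpus/')   # hypothetical directory of *.txt files, read and parsed in parallel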