Skip to content

Commit

Permalink
Allow parallel parsing of corpus files
Browse files Browse the repository at this point in the history
  • Loading branch information
thvitt committed Jun 13, 2021
1 parent e9da8ce commit 1a6856e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
31 changes: 27 additions & 4 deletions delta/corpus.py
Expand Up @@ -9,6 +9,7 @@ class which represents the feature matrix. Also contained are default

import os
import glob
from collections import Mapping
from fnmatch import fnmatch
from inspect import signature
from typing import Type
Expand All @@ -20,6 +21,8 @@ class which represents the feature matrix. Also contained are default
from math import ceil
from delta.util import Metadata, DocumentDescriber, DefaultDocumentDescriber, ngrams

from joblib import Parallel, delayed

import logging


Expand Down Expand Up @@ -62,7 +65,9 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
skip=None,
token_pattern=LETTERS_PATTERN,
max_tokens=None,
ngrams=None):
ngrams=None,
parallel=False
):
"""
Creates a customized default feature generator.
Expand All @@ -80,6 +85,10 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
regular expressions*) that contains at least one letter.
max_tokens (int): If set, stop reading each file after that many words.
ngrams (int): Count token ngrams instead of single tokens
            parallel (bool|int|Mapping|Parallel): If truthy, read and parse the files in parallel. The actual argument may be
                - None or False for no parallel processing,
                - True to use all available cores,
                - an int for the required number of parallel jobs,
                - a dictionary with joblib.Parallel arguments for finer control.
"""
self.lower_case = lower_case
self.encoding = encoding
Expand All @@ -89,6 +98,7 @@ def __init__(self, lower_case=False, encoding="utf-8", glob='*.txt',
self.max_tokens = max_tokens
self.ngrams = ngrams
self.logger = logging.getLogger(__name__)
self.parallel = parallel

def __repr__(self):
return type(self).__name__ + '(' + \
Expand Down Expand Up @@ -217,11 +227,24 @@ def process_directory(self, directory):
len(filenames),
self.glob,
directory)
data = (self.process_file(filename)
for filename in filenames
if self.skip is None or not(fnmatch(filename, self.skip)))
used_filenames = [filename for filename in filenames if self.skip is None or not(fnmatch(filename, self.skip))]
parallel = self._get_parallel_executor()
if parallel:
data = parallel(delayed(self.process_file)(filename) for filename in used_filenames)
else:
data = (self.process_file(filename) for filename in used_filenames)
return {series.name: series for series in data}

def _get_parallel_executor(self) -> Parallel:
if self.parallel:
if isinstance(self.parallel, Mapping):
return Parallel(**self.parallel)
elif self.parallel == True:
return Parallel(-1)
elif isinstance(self.parallel, int):
return Parallel(self.parallel)
return None

def __call__(self, directory):
"""
Runs the feature extraction using :meth:`process_directory` for the
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -24,6 +24,7 @@
'pandas>=0.13.1', # https://github.com/cophi-wue/pydelta/issues/6
# 'profig>=0.2.8',
'scikit-learn>=0.16.0',
'joblib>=1.0.1',
'regex'
],
test_suite = 'nose.collector'
Expand Down

0 comments on commit 1a6856e

Please sign in to comment.