Groups are acceptable input to ParallelProcessing

This commit makes it possible to pass a group instance directly to
a ParallelProcessing manager. It also dictates that the ``run``
function requires learners with preprocessing dependencies to be
estimated via a Group.

flennerhag committed Oct 31, 2017
1 parent 64e45dc commit 2000cb6
Showing 7 changed files with 104 additions and 78 deletions.
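
In practice the new calling convention looks like the sketch below. It is a minimal example assembled from the commit message and the updated tests; ``FoldIndex`` and the dummy ``OLS``/``Scale`` estimators are assumptions pulled from ``mlens.index`` and ``mlens.utils.dummy``, so adapt the names to your own setup:

    import numpy as np

    from mlens.index import FoldIndex
    from mlens.parallel import Group, Learner, Transformer, Pipeline, run
    from mlens.utils.dummy import OLS, Scale

    X, y = np.random.rand(25, 4), np.random.rand(25)
    indexer = FoldIndex(2)

    # A preprocessing transformer and a learner that depends on it ('pr')
    tr = Transformer(Pipeline(Scale(), return_y=True), name='pr',
                     indexer=indexer)
    lr = Learner(OLS(), name='lr', preprocess='pr', indexer=indexer)

    # New in this commit: wrap both in a Group and hand it straight to run.
    # Learners with preprocessing dependencies must now be estimated this way.
    group = Group(learners=lr, transformers=tr, dtype=np.float64)
    preds = run(group, 'fit', X, y, return_preds=True)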
2 changes: 1 addition & 1 deletion mlens/parallel/_base_functions.py
@@ -79,7 +79,7 @@ def replace(source_files):
 
 
 def mold_objects(learners, transformers):
-    """Utility for enforcing list"""
+    """Utility for enforcing compatible setup"""
     out = []
     for objects in [learners, transformers]:
         if objects:
24 changes: 14 additions & 10 deletions mlens/parallel/backend.py
@@ -34,6 +34,11 @@
 
 
 ###############################################################################
+def _dtype(a, b=None):
+    """Utility for getting a dtype"""
+    return getattr(a, 'dtype', getattr(b, 'dtype', None))
+
+
 def dump_array(array, name, path):
     """Dump array for memmapping."""
     # First check if the array is on file
@@ -538,12 +543,12 @@ def process(self, caller, out, **kwargs):
                 self._partial_process(task, parallel, **kwargs)
 
                 if task.name in return_names:
-                    out.append(self.get_preds(getattr(task, 'dtype', None)))
+                    out.append(self.get_preds(dtype=_dtype(task)))
 
                 self.job.update()
 
         if return_final:
-            out = self.get_preds(dtype=getattr(task, 'dtype', None))
+            out = self.get_preds(dtype=_dtype(task))
         return out
 
     def _partial_process(self, task, parallel, **kwargs):
@@ -582,19 +587,18 @@ def _gen_prediction_array(self, task, job, threading):
         """Generate prediction array either in-memory or persist to disk."""
         shape = task.shape(job)
         if threading:
-            self.job.predict_out = np.empty(
-                shape, dtype=getattr(task, 'dtype', config.get_dtype()))
+            self.job.predict_out = np.empty(shape, dtype=_dtype(task))
         else:
             f = os.path.join(self.job.dir, '%s_out_array.mmap' % task.name)
             try:
                 self.job.predict_out = np.memmap(
-                    filename=f, dtype=task.dtype, mode='w+', shape=shape)
+                    filename=f, dtype=_dtype(task), mode='w+', shape=shape)
             except Exception as exc:
-                raise OSError("Cannot create prediction matrix of shape ("
-                              "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
-                              (shape[0], shape[1],
-                               8 * shape[0] * shape[1] / (1024 ** 2),
-                               task.name, exc))
+                raise OSError(
+                    "Cannot create prediction matrix of shape ("
+                    "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
+                    (shape[0], shape[1], 8 * shape[0] * shape[1] / (1024 ** 2),
+                     task.name, exc))
 
     def get_preds(self, dtype=None, order='C'):
         """Return prediction matrix.
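
The new ``_dtype`` helper centralizes the attribute lookup that was previously inlined at each call site: it returns ``a.dtype`` if present, else ``b.dtype``, else ``None``, in which case NumPy's default dtype applies downstream. A minimal sketch with hypothetical stand-in classes; importing a private helper like this is for illustration only:

    import numpy as np
    from mlens.parallel.backend import _dtype

    class WithDtype:       # stands in for a task that carries a dtype
        dtype = np.float32

    class WithoutDtype:    # stands in for a task without one
        pass

    assert _dtype(WithDtype()) is np.float32
    assert _dtype(WithoutDtype(), WithDtype()) is np.float32  # falls back to b
    assert _dtype(WithoutDtype()) is None                     # nothing left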
7 changes: 3 additions & 4 deletions mlens/parallel/base.py
@@ -355,8 +355,7 @@ def get_params(self, deep=True):
     @property
     def __fitted__(self):
         """Fitted status"""
-        if not self.stack:
-            # all on an empty list yields True
+        if not self.stack or not self._check_static_params():
             return False
         return all([g.__fitted__ for g in self.stack])
 
@@ -376,6 +375,6 @@ def verbose(self):
     @verbose.setter
     def verbose(self, verbose):
         """Set verbosity"""
+        self._verbose = verbose
         for g in self.stack:
-            for obj in g:
-                obj.verbose = verbose
+            g.verbose = verbose
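
One consequence of tying ``__fitted__`` to ``_check_static_params`` is that re-parametrizing a fitted stack invalidates its fitted status. A sketch of the intended semantics, continuing the example above; the ``set_params`` call and the exact invalidation trigger are assumptions, not verified API:

    run(group, 'fit', X, y)
    assert group.__fitted__                  # params match the fitted state

    group.set_params(indexer=FoldIndex(4))   # hypothetical re-parametrization
    assert not group.__fitted__              # stale fit is now detected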
49 changes: 38 additions & 11 deletions mlens/parallel/handles.py
@@ -6,17 +6,18 @@
 Handles for mlens.parallel objects.
 """
+from .base import BaseEstimator
 from .learner import Learner, Transformer
 from ._base_functions import mold_objects, transform
 from ..utils import format_name, check_instances
 from ..utils.formatting import _check_instances
-from ..externals.sklearn.base import BaseEstimator, clone
+from ..externals.sklearn.base import clone, BaseEstimator as _BaseEstimator
 
 GLOBAL_GROUP_NAMES = list()
 GLOBAL_PIPELINE_NAMES = list()
 
 
-class Pipeline(BaseEstimator):
+class Pipeline(_BaseEstimator):
 
     """Transformer pipeline
@@ -36,7 +37,7 @@ class Pipeline(BaseEstimator):
         name of pipeline.
 
     return_y: bool (default=False)
-        If True, both X and y will be returned in a transform call.
+        If True, both X and y will be returned in a :meth:`transform` call.
     """
 
     def __init__(self, pipeline, name=None, return_y=False):
@@ -106,8 +107,8 @@ def transform(self, X, y=None):
         """Transform pipeline.
 
         Note that the :class:`Pipeline` accepts both X and y arguments, and
-        can return both X and y, depending on the transformers. The
-        pipeline itself does no checks on the input.
+        can return both X and y, depending on the transformers. The
+        pipeline itself does not check the input.
 
         Parameters
         ----------
@@ -123,7 +124,7 @@ def transform(self, X, y=None):
             Preprocessed input data
 
         y: array-like of shape [n_samples, ], optional
-            Preprocessed targets
+            Original or preprocessed targets, depending on the transformers.
         """
         return self._run(False, True, X, y)

@@ -172,25 +173,38 @@ class Group(BaseEstimator):
     a set of transformers that all share the same cross-validation strategy.
     All instances will share *the same* indexer. Allows cloning.
 
+    Acceptable caller to :class:`ParallelProcessing`.
 
     Parameters
     ----------
-    indexer: inst
+    indexer: inst, optional
         A :mod:`mlens.index` indexer to build learner and transformers on.
+        If not passed, the first indexer of the learners will be enforced
+        on all instances.
 
     learners: list, inst, optional
-        :class:`Learner` instance(s) to build on indexer.
+        :class:`Learner` instance(s) attached to indexer. Note that
+        :class:`Group` overrides previous ``indexer`` parameter settings.
 
     transformers: list, inst, optional
-        :class:`Transformer` instance(s) to build on indexer.
+        :class:`Transformer` instance(s) attached to indexer. Note that
+        :class:`Group` overrides previous ``indexer`` parameter settings.
 
     name: str, optional
        name of group
 
+    **kwargs: optional
+        Optional keyword arguments to :class:`BaseParallel` backend.
     """
 
-    def __init__(self, indexer, learners=None, transformers=None, name=None):
+    def __init__(self, indexer=None, learners=None, transformers=None,
+                 name=None, **kwargs):
+        name = format_name(name, 'group', GLOBAL_GROUP_NAMES)
+        super(Group, self).__init__(name=name, **kwargs)
 
-        self.name = format_name(name, 'group', GLOBAL_GROUP_NAMES)
         learners, transformers = mold_objects(learners, transformers)
+        if not indexer:
+            indexer = learners[0].indexer
 
         # Enforce common indexer
         self.indexer = indexer
@@ -200,15 +214,28 @@ def __init__(self, indexer, learners=None, transformers=None, name=None):
         self.learners = learners
         self.transformers = transformers
 
+        self.__static__.extend(['indexer', 'learners', 'transformers'])
+
     def __iter__(self):
+        # We update optional backend kwargs that might have been passed
+        # to ensure these are passed to the instances
+        backend_kwargs = {
+            param: getattr(self, param)
+            for param in ['dtype', 'verbose', 'raise_on_exception']
+            if hasattr(self, param)
+        }
         for tr in self.transformers:
+            tr.set_params(**backend_kwargs)
             yield tr
         for lr in self.learners:
+            lr.set_params(**backend_kwargs)
             yield lr
 
+    @property
+    def __fitted__(self):
+        """Fitted status"""
+        if not self._check_static_params():
+            return False
+        return all([o.__fitted__ for o in self.learners + self.transformers])
+
     def get_params(self, deep=True):
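
With the indexer argument now optional, a Group infers it from its first learner and enforces it across all members, while backend kwargs such as ``verbose`` are pushed to each instance on iteration. A short sketch, reusing the assumed ``FoldIndex`` and dummy ``OLS`` from the example above:

    lr1 = Learner(OLS(), name='ols-1', indexer=FoldIndex(2))
    lr2 = Learner(OLS(), name='ols-2')

    group = Group(learners=[lr1, lr2], verbose=True)  # no indexer passed
    assert group.indexer is lr1.indexer               # adopted, then enforced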
30 changes: 20 additions & 10 deletions mlens/parallel/learner.py
@@ -412,12 +412,15 @@ class _BaseEstimator(OutputMixin, IndexMixin, BaseEstimator):
 
     """Base estimator class
 
-    Common API for estimation objects.
+    Common API for job generators. A class that inherits the base
+    needs to set a ``__subtype__`` in the constructor. The sub-type should be
+    the class that runs estimations and must implement a ``__call__``,
+    ``fit``, ``transform`` and ``predict`` method.
     """
 
     __meta_class__ = ABCMeta
 
-    # Reset subtype class attribute in any class that inherits
+    # Reset subtype class attribute in any class that inherits the base
     __subtype__ = None
 
     def __init__(self, name, estimator, indexer=None, verbose=False, **kwargs):
@@ -438,12 +441,14 @@ def __init__(self, name, estimator, indexer=None, verbose=False, **kwargs):
         if self.indexer:
             self.set_indexer(self.indexer)
 
-        self.cache_name = self.name
         self.estimator = estimator
         self.verbose = verbose
+        self.cache_name = None
         self.output_columns = None
         self.feature_span = None
 
+        self.__static__.extend(['estimator', 'name', 'indexer'])
+
     def __iter__(self):
         yield self
 
@@ -529,6 +534,15 @@ def gen_fit(self, X, y, P=None):
             output array to populate. Must be writeable. Only pass if
             predictions are desired.
         """
+        # We use a derived cache_name during estimation: if the name of the
+        # instance or the name of the preprocessing dependency changes, this
+        # allows us to pick up on that.
+        if hasattr(self, 'preprocess'):
+            self.cache_name = '%s.%s' % (
+                self.preprocess, self.name) if self.preprocess else self.name
+        else:
+            self.cache_name = self.name
+
         if self.__subtype__ is None:
             raise ParallelProcessingError(
                 "Class incorrectly constructed. Need to set class attribute "
@@ -826,11 +840,6 @@ def __init__(self, estimator, indexer=None, name=None, preprocess=None,
         # Protect preprocess against later changes
         self.__static__.append('preprocess')
 
-    def gen_fit(self, X, y, P=None):
-        self.cache_name = '%s.%s' % (
-            self.preprocess, self.name) if self.preprocess else self.name
-        return super(Learner, self).gen_fit(X, y, P)
-
     @property
     def scorer(self):
         """Copy of scorer"""
@@ -903,8 +912,9 @@ class EvalTransformer(Transformer):
     See :class:`Transformer` for more details.
     """
 
-    def __init__(self, *args, **kwargs):
-        super(EvalTransformer, self).__init__(*args, **kwargs)
+    def __init__(self, estimator, indexer=None, name=None, **kwargs):
+        super(EvalTransformer, self).__init__(
+            estimator, indexer=indexer, name=name, **kwargs)
         self.output_columns = {0: 0}  # For compatibility with SubTransformer
         self.__only_all__ = False
         self.__only_sub__ = True
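
The derived cache name now lives in the shared base class rather than in ``Learner.gen_fit``, so any job generator with a ``preprocess`` attribute gets the same treatment. The naming rule itself, extracted as a standalone sketch (not a library API):

    def derived_cache_name(name, preprocess=None):
        # '<preprocess>.<name>' when a dependency exists, else '<name>'
        return '%s.%s' % (preprocess, name) if preprocess else name

    assert derived_cache_name('lr', preprocess='pr') == 'pr.lr'
    assert derived_cache_name('lr') == 'lr'

Deriving the name at fit time, rather than fixing it at construction, means a renamed instance or dependency is picked up on the next fit, which is exactly what the inline comment in the hunk above says.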
47 changes: 17 additions & 30 deletions mlens/parallel/tests/test_d_wrapper.py
@@ -4,7 +4,7 @@
 """
 import numpy as np
 from mlens.testing import Data, EstimatorContainer
-from mlens.parallel import Learner, Transformer, Pipeline, run as _run
+from mlens.parallel import Group, Learner, Transformer, Pipeline, run as _run
 from mlens.utils.dummy import OLS, Scale
 from mlens.externals.sklearn.base import clone
 
@@ -23,8 +23,9 @@
 def scorer(p, y): return np.mean(p - y)
 
 
-data = Data('stack', True, False)
+data = Data('stack', False, True, True)
 X, y = data.get_data((25, 4), 3)
+(F, wf), (P, wp) = data.ground_truth(X, y,)
 
 
 if SKLEARN:
@@ -165,42 +166,28 @@ def test_tr_subsemble():
 
 def test_run_fit():
     """[Parallel | Wrapper] test fit with auxiliary"""
-    tr = Transformer(Pipeline(Scale(), return_y=True),
-                     name='pr', indexer=data.indexer)
-    lr = Learner(OLS(),
-                 indexer=data.indexer, name='lr', preprocess='pr',
-                 scorer=scorer)
+    lr, tr = EstimatorContainer().get_learner('stack', False, True)
+    group = Group(learners=lr, transformers=tr, dtype=np.float64)
 
-    out = _run([tr, lr], 'fit', X, y)
-    assert out is None
+    A = _run(group, 'fit', X, y, return_preds=True)
+    np.testing.assert_array_equal(A, F)
 
 
 def test_run_transform():
     """[Parallel | Wrapper] test transform with auxiliary"""
-    tr = Transformer(Pipeline(Scale(), return_y=True), indexer=data.indexer)
-    lr = Learner(OLS(), indexer=data.indexer, scorer=scorer)
+    lr, tr = EstimatorContainer().get_learner('stack', False, True)
+    group = Group(learners=lr, transformers=tr, dtype=np.float64)
 
-    # Indirect stack
-    X_ = _run(tr, 'fit', X, y, return_preds=True)
-    A = _run(lr, 'fit', X_, y, return_preds=True)
-
-    # Direct stack
-    B = _run([tr, lr], 'transform', X, y, map=False)
-
-    np.testing.assert_array_equal(A, B)
+    _run(group, 'fit', X, y)
+    A = _run(group, 'transform', X)
+    np.testing.assert_array_equal(A, F)
 
 
 def test_run_predict():
     """[Parallel | Wrapper] test predict with auxiliary"""
-    tr = Transformer(Pipeline(Scale(), return_y=True),
-                     name='pr', indexer=data.indexer)
-    lr = Learner(OLS(), indexer=data.indexer, name='lr', scorer=scorer)
-
-    _run([tr, lr], 'fit', X, y, map=False)
-    B = _run([tr, lr], 'predict', X, y, map=False)
-
-    X1 = _run(tr, 'transform', X)
-    X2 = _run(tr, 'predict', X)
-    A = OLS().fit(X1, y).predict(X2)
+    lr, tr = EstimatorContainer().get_learner('stack', False, True)
+    group = Group(learners=lr, transformers=tr, dtype=np.float64)
 
-    np.testing.assert_array_equal(A.astype(np.float32), B.ravel())
+    _run(group, 'fit', X, y)
+    A = _run(group, 'predict', X)
+    np.testing.assert_array_equal(A, P)
