diff --git a/mlens/parallel/_base_functions.py b/mlens/parallel/_base_functions.py
index b0f77725..3f23e5c1 100644
--- a/mlens/parallel/_base_functions.py
+++ b/mlens/parallel/_base_functions.py
@@ -79,7 +79,7 @@ def replace(source_files):
 
 
 def mold_objects(learners, transformers):
-    """Utility for enforcing list"""
+    """Utility for enforcing a compatible learner/transformer setup"""
     out = []
     for objects in [learners, transformers]:
         if objects:
diff --git a/mlens/parallel/backend.py b/mlens/parallel/backend.py
index 01731709..c19b4696 100644
--- a/mlens/parallel/backend.py
+++ b/mlens/parallel/backend.py
@@ -34,6 +34,11 @@
 ###############################################################################
 
 
+def _dtype(a, b=None):
+    """Utility for getting a dtype, falling back to the configured default"""
+    return getattr(a, 'dtype', getattr(b, 'dtype', config.get_dtype()))
+
+
 def dump_array(array, name, path):
     """Dump array for memmapping."""
     # First check if the array is on file
@@ -538,12 +543,12 @@ def process(self, caller, out, **kwargs):
                 self._partial_process(task, parallel, **kwargs)
 
                 if task.name in return_names:
-                    out.append(self.get_preds(getattr(task, 'dtype', None)))
+                    out.append(self.get_preds(dtype=_dtype(task)))
 
                 self.job.update()
 
         if return_final:
-            out = self.get_preds(dtype=getattr(task, 'dtype', None))
+            out = self.get_preds(dtype=_dtype(task))
 
         return out
 
     def _partial_process(self, task, parallel, **kwargs):
@@ -582,19 +587,18 @@ def _gen_prediction_array(self, task, job, threading):
         """Generate prediction array either in-memory or persist to disk."""
         shape = task.shape(job)
         if threading:
-            self.job.predict_out = np.empty(
-                shape, dtype=getattr(task, 'dtype', config.get_dtype()))
+            self.job.predict_out = np.empty(shape, dtype=_dtype(task))
         else:
             f = os.path.join(self.job.dir, '%s_out_array.mmap' % task.name)
             try:
                 self.job.predict_out = np.memmap(
-                    filename=f, dtype=task.dtype, mode='w+', shape=shape)
+                    filename=f, dtype=_dtype(task), mode='w+', shape=shape)
             except Exception as exc:
-                raise OSError("Cannot create prediction matrix of shape ("
-                              "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
-                              (shape[0], shape[1],
-                               8 * shape[0] * shape[1] / (1024 ** 2),
-                               task.name, exc))
+                raise OSError(
+                    "Cannot create prediction matrix of shape ("
+                    "%i, %i), size %i MBs, for %s.\n Details:\n%r" %
+                    (shape[0], shape[1], 8 * shape[0] * shape[1] / (1024 ** 2),
+                     task.name, exc))
 
     def get_preds(self, dtype=None, order='C'):
         """Return prediction matrix.
diff --git a/mlens/parallel/base.py b/mlens/parallel/base.py
index 549d8d92..cf97020c 100644
--- a/mlens/parallel/base.py
+++ b/mlens/parallel/base.py
@@ -355,8 +355,7 @@ def get_params(self, deep=True):
     @property
     def __fitted__(self):
         """Fitted status"""
-        if not self.stack:
-            # all on an empty list yields True
+        if not self.stack or not self._check_static_params():
             return False
         return all([g.__fitted__ for g in self.stack])
 
@@ -376,6 +375,6 @@ def verbose(self):
     @verbose.setter
     def verbose(self, verbose):
         """Set verbosity"""
+        self._verbose = verbose
         for g in self.stack:
-            for obj in g:
-                obj.verbose = verbose
+            g.verbose = verbose
diff --git a/mlens/parallel/handles.py b/mlens/parallel/handles.py
index 16fb6cc0..403b244c 100644
--- a/mlens/parallel/handles.py
+++ b/mlens/parallel/handles.py
@@ -6,17 +6,18 @@
 Handles for mlens.parallel objects.
 
""" +from .base import BaseEstimator from .learner import Learner, Transformer from ._base_functions import mold_objects, transform from ..utils import format_name, check_instances from ..utils.formatting import _check_instances -from ..externals.sklearn.base import BaseEstimator, clone +from ..externals.sklearn.base import clone, BaseEstimator as _BaseEstimator GLOBAL_GROUP_NAMES = list() GLOBAL_PIPELINE_NAMES = list() -class Pipeline(BaseEstimator): +class Pipeline(_BaseEstimator): """Transformer pipeline @@ -36,7 +37,7 @@ class Pipeline(BaseEstimator): name of pipeline. return_y: bool (default=False) - If True, both X and y will be returned in a transform call. + If True, both X and y will be returned in a :method:`transform` call. """ def __init__(self, pipeline, name=None, return_y=False): @@ -106,8 +107,8 @@ def transform(self, X, y=None): """Transform pipeline. Note that the :class:`Pipeline` accepts both X and y arguments, and - can return both X and y, depending on the transformers. The - pipeline itself does no checks on the input. + can return both X and y, depending on the transformers. + Pipeline itself does not checks the input. Parameters ---------- @@ -123,7 +124,7 @@ def transform(self, X, y=None): Preprocessed input data y: array-like of shape [n_samples, ], optional - Preprocessed targets + Original or preprocessed targets, depending on the transformers. """ return self._run(False, True, X, y) @@ -172,25 +173,38 @@ class Group(BaseEstimator): a set of transformers that all share the same cross-validation strategy. All instances will share *the same* indexer. Allows cloning. + Acceptable caller to :class:`ParallelProcessing`. + Parameters ---------- - indexer: inst + indexer: inst, optional A :mod:`mlens.index` indexer to build learner and transformers on. + If not passed, the first indexer of the learners will be enforced + on all instances. learners: list, inst, optional - :class:`Learner` instance(s) to build on indexer. + :class:`Learner` instance(s) attached to indexer. Note that + :class:`Group` overrides previous ``indexer`` parameter settings. transformers: list, inst, optional - :class:`Transformer` instance(s) to build on indexer. + :class:`Transformer` instance(s) attached to indexer. Note that + :class:`Group` overrides previous ``indexer`` parameter settings. name: str, optional name of group + + **kwargs: optional + Optional keyword arguments to :class:`BaseParallel` backend. 
""" - def __init__(self, indexer, learners=None, transformers=None, name=None): + def __init__(self, indexer=None, learners=None, transformers=None, + name=None, **kwargs): + name = format_name(name, 'group', GLOBAL_GROUP_NAMES) + super(Group, self).__init__(name=name, **kwargs) - self.name = format_name(name, 'group', GLOBAL_GROUP_NAMES) learners, transformers = mold_objects(learners, transformers) + if not indexer: + indexer = learners[0].indexer # Enforce common indexer self.indexer = indexer @@ -200,15 +214,28 @@ def __init__(self, indexer, learners=None, transformers=None, name=None): self.learners = learners self.transformers = transformers + self.__static__.extend(['indexer', 'learners', 'transformers']) + def __iter__(self): + # We update optional backend kwargs that might have been passed + # to ensure these are passed to the instances + backend_kwargs = { + param: getattr(self, param) + for param in ['dtype', 'verbose', 'raise_on_exception'] + if hasattr(self, param) + } for tr in self.transformers: + tr.set_params(**backend_kwargs) yield tr for lr in self.learners: + lr.set_params(**backend_kwargs) yield lr @property def __fitted__(self): """Fitted status""" + if not self._check_static_params(): + return False return all([o.__fitted__ for o in self.learners + self.transformers]) def get_params(self, deep=True): diff --git a/mlens/parallel/learner.py b/mlens/parallel/learner.py index 79c687be..be18cd59 100644 --- a/mlens/parallel/learner.py +++ b/mlens/parallel/learner.py @@ -412,12 +412,15 @@ class _BaseEstimator(OutputMixin, IndexMixin, BaseEstimator): """Base estimator class - Common API for estimation objects. + Common API for job generators. A class that inherits the base + need to set a ``__subtype__`` in the constructor. The sub-type should be + the class that runs estimations and must implement a ``__call__``, + ``fit``, ``transform`` and ``predict`` method. """ __meta_class__ = ABCMeta - # Reset subtype class attribute in any class that inherits + # Reset subtype class attribute in any class that inherits the base __subtype__ = None def __init__(self, name, estimator, indexer=None, verbose=False, **kwargs): @@ -438,12 +441,14 @@ def __init__(self, name, estimator, indexer=None, verbose=False, **kwargs): if self.indexer: self.set_indexer(self.indexer) - self.cache_name = self.name self.estimator = estimator self.verbose = verbose + self.cache_name = None self.output_columns = None self.feature_span = None + self.__static__.extend(['estimator', 'name', 'indexer']) + def __iter__(self): yield self @@ -529,6 +534,15 @@ def gen_fit(self, X, y, P=None): output array to populate. Must be writeable. Only pass if predictions are desired. """ + # We use a derived cache_name during estimation: if the name of the + # instance or the name of the preprocessing dependency changes, this + # allows us to pick up on that. + if hasattr(self, 'preprocess'): + self.cache_name = '%s.%s' % ( + self.preprocess, self.name) if self.preprocess else self.name + else: + self.cache_name = self.name + if self.__subtype__ is None: raise ParallelProcessingError( "Class incorrectly constructed. 
Need to set class attribute " @@ -826,11 +840,6 @@ def __init__(self, estimator, indexer=None, name=None, preprocess=None, # Protect preprocess against later changes self.__static__.append('preprocess') - def gen_fit(self, X, y, P=None): - self.cache_name = '%s.%s' % ( - self.preprocess, self.name) if self.preprocess else self.name - return super(Learner, self).gen_fit(X, y, P) - @property def scorer(self): """Copy of scorer""" @@ -903,8 +912,9 @@ class EvalTransformer(Transformer): See :class:`Transformer` for more details. """ - def __init__(self, *args, **kwargs): - super(EvalTransformer, self).__init__(*args, **kwargs) + def __init__(self, estimator, indexer=None, name=None, **kwargs): + super(EvalTransformer, self).__init__( + estimator, indexer=indexer, name=name, **kwargs) self.output_columns = {0: 0} # For compatibility with SubTransformer self.__only_all__ = False self.__only_sub__ = True diff --git a/mlens/parallel/tests/test_d_wrapper.py b/mlens/parallel/tests/test_d_wrapper.py index 01865bea..c517dfef 100644 --- a/mlens/parallel/tests/test_d_wrapper.py +++ b/mlens/parallel/tests/test_d_wrapper.py @@ -4,7 +4,7 @@ """ import numpy as np from mlens.testing import Data, EstimatorContainer -from mlens.parallel import Learner, Transformer, Pipeline, run as _run +from mlens.parallel import Group, Learner, Transformer, Pipeline, run as _run from mlens.utils.dummy import OLS, Scale from mlens.externals.sklearn.base import clone @@ -23,8 +23,9 @@ def scorer(p, y): return np.mean(p - y) -data = Data('stack', True, False) +data = Data('stack', False, True, True) X, y = data.get_data((25, 4), 3) +(F, wf), (P, wp) = data.ground_truth(X, y,) if SKLEARN: @@ -165,42 +166,28 @@ def test_tr_subsemble(): def test_run_fit(): """[Parallel | Wrapper] test fit with auxiliary""" - tr = Transformer(Pipeline(Scale(), return_y=True), - name='pr', indexer=data.indexer) - lr = Learner(OLS(), - indexer=data.indexer, name='lr', preprocess='pr', - scorer=scorer) + lr, tr = EstimatorContainer().get_learner('stack', False, True) + group = Group(learners=lr, transformers=tr, dtype=np.float64) - out = _run([tr, lr], 'fit', X, y) - assert out is None + A = _run(group, 'fit', X, y, return_preds=True) + np.testing.assert_array_equal(A, F) def test_run_transform(): """[Parallel | Wrapper] test transform with auxiliary""" - tr = Transformer(Pipeline(Scale(), return_y=True), indexer=data.indexer) - lr = Learner(OLS(), indexer=data.indexer, scorer=scorer) + lr, tr = EstimatorContainer().get_learner('stack', False, True) + group = Group(learners=lr, transformers=tr, dtype=np.float64) - # Indirect stack - X_ = _run(tr, 'fit', X, y, return_preds=True) - A = _run(lr, 'fit', X_, y, return_preds=True) - - # Direct stack - B = _run([tr, lr], 'transform', X, y, map=False) - - np.testing.assert_array_equal(A, B) + _run(group, 'fit', X, y) + A = _run(group, 'transform', X) + np.testing.assert_array_equal(A, F) def test_run_predict(): """[Parallel | Wrapper] test predict with auxiliary""" - tr = Transformer(Pipeline(Scale(), return_y=True), - name='pr', indexer=data.indexer) - lr = Learner(OLS(), indexer=data.indexer, name='lr', scorer=scorer) - - _run([tr, lr], 'fit', X, y, map=False) - B = _run([tr, lr], 'predict', X, y, map=False) - - X1 = _run(tr, 'transform', X) - X2 = _run(tr, 'predict', X) - A = OLS().fit(X1, y).predict(X2) + lr, tr = EstimatorContainer().get_learner('stack', False, True) + group = Group(learners=lr, transformers=tr, dtype=np.float64) - np.testing.assert_array_equal(A.astype(np.float32), B.ravel()) + 
_run(group, 'fit', X, y) + A = _run(group, 'predict', X) + np.testing.assert_array_equal(A, P) diff --git a/mlens/parallel/wrapper.py b/mlens/parallel/wrapper.py index 4e67cc9b..15e3b7ac 100644 --- a/mlens/parallel/wrapper.py +++ b/mlens/parallel/wrapper.py @@ -165,9 +165,7 @@ def get_backend(instance): if _backend: instance = _backend - runnable = all([issubclass(instance.__class__, BaseParallel), - issubclass(instance.__class__, OutputMixin)]) - if runnable: + if issubclass(instance.__class__, BaseParallel): return instance raise ParallelProcessingError( @@ -179,6 +177,8 @@ def set_flags(backend, flags): resets = list() if 'layer' in backend.__class__.__name__.lower(): updates = [backend] + backend.learners + elif 'group' in backend.__class__.__name__.lower(): + updates = backend.learners elif not isinstance(backend, list): updates = [backend] else: @@ -237,16 +237,15 @@ def run(caller, job, X, y=None, map=True, **kwargs): out = mgr.map(caller, job, X, y, **kwargs) - Run allows dynamic parameter changing and resets, for instance running - a learner with ``proba=True`` on that previously had ``proba=False``. + :func:`run` handles temporary parameter changes, for instance running + a learner with ``proba=True`` that has ``proba=False`` as default. Similarly, instances destined to not produce output can be forced to yield predictions by passing ``return_preds=True`` as a keyword argument. - .. note:: Currently, :fun:`run` does not allow a :class:`Learner` to have a - preprocessing dependency. To achieve the same output, do:: - - caller([transformer, learner], map=False) + .. note:: To run a learner with a ``preprocessing`` dependency, the + instances need to be wrapped in a :class:`Group` :: + run(Group(learner, transformer), 'predict', X, y) Parameters ---------- @@ -282,9 +281,9 @@ def run(caller, job, X, y=None, map=True, **kwargs): try: verbose = max(getattr(caller, 'verbose', 0) - 4, 0) - backend = getattr(caller, 'backend', config.get_backend()) - n_jobs = getattr(backend, 'n_jobs', -1) - with ParallelProcessing(backend, n_jobs, verbose) as mgr: + _backend = getattr(caller, 'backend', config.get_backend()) + n_jobs = getattr(caller, 'n_jobs', -1) + with ParallelProcessing(_backend, n_jobs, verbose) as mgr: if map: out = mgr.map(caller, job, X, y, **kwargs) else:
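
For reference, a minimal usage sketch of the :class:`Group`-based ``run`` API
exercised by the rewritten tests above. The ``FoldIndex`` indexer and the
random toy data are illustrative assumptions; the estimators, names and
signatures follow this diff::

    import numpy as np

    from mlens.index import FoldIndex
    from mlens.parallel import Group, Learner, Transformer, Pipeline, run
    from mlens.utils.dummy import OLS, Scale

    X = np.random.rand(25, 4)  # assumed toy data
    y = np.random.rand(25)

    indexer = FoldIndex(folds=2)

    # A transformer and a learner that depends on it via ``preprocess``
    tr = Transformer(Pipeline(Scale(), return_y=True),
                     name='pr', indexer=indexer)
    lr = Learner(OLS(), indexer=indexer, name='lr', preprocess='pr')

    # The group enforces a common indexer and is a valid caller for run
    group = Group(learners=lr, transformers=tr, dtype=np.float64)

    run(group, 'fit', X, y)
    preds = run(group, 'predict', X)

Passing ``return_preds=True`` to the ``fit`` call would instead return the
cross-validated fit predictions directly, as in the rewritten ``test_run_fit``.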