From ebb2a0665f25c26d1c730b87bf255a6a30f2db98 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 1 Jun 2018 08:51:34 -0700 Subject: [PATCH 1/8] ENH: Blockwise Metaestimator --- dask_ml/wrappers.py | 63 ++++++++++++++++++++ docs/source/changelog.rst | 5 ++ docs/source/incremental.rst | 112 +++++++++++++++++++++++------------- docs/source/modules/api.rst | 11 +++- tests/test_blockwise.py | 40 +++++++++++++ 5 files changed, 190 insertions(+), 41 deletions(-) create mode 100644 tests/test_blockwise.py diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 28bb0f52f..654bf37a4 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -199,6 +199,69 @@ def _check_method(self, method): return getattr(self.estimator, method) +class Blockwise(ParallelPostFit): + """Metaestimator for feeding Dask Arrays to an estimator blockwise. + + This wrapper is provides a bridge between Dask objects and estimators + implementing the ``partial_fit`` API. These estimators can train on + batches of data, but simply passing a Dask array to their ``fit`` or + ``partial_fit`` methods would materialize the large Dask Array on a single + machine. + + Calling :meth:`Streamable.fit` with a Dask Array will cause each block of + the Dask array to be passed to ``estimator.partial_fit`` *sequentially*. + + Like :class:`ParallelPostFit`, the methods available after fitting (e.g. + `score`, `predcit`, etc.) are all parallel and delayed. + + Parameters + ---------- + estimator : Estimator + Any object supporting the scikit-learn `parital_fit` API. + **kwargs + Additional keyword arguments passed through the the underlying + estimator's `partial_fit` method. + + Examples + -------- + """ + + def __init__(self, estimator, **kwargs): + self.estimator = estimator + self.fit_kwargs = kwargs + + def fit(self, X, y=None): + from ._partial import fit + + fit_kwargs = self.fit_kwargs or {} + result = fit(self.estimator, X, y, **fit_kwargs) + + # Copy the learned attributes over to self + attrs = {k: v for k, v in vars(result).items() if k.endswith('_')} + for k, v in attrs.items(): + setattr(self, k, v) + return self + + +def make_blockwise(estimator, **kwargs): + """Helper function for creating a :class:`Stremable` estimator. + + Parameters + ---------- + estimator : Estimator + Any object supporting the scikit-learn `parital_fit` API. + **kwargs + Additional keyword arguments passed through the the underlying + estimator's `partial_fit` method. + + Returns + ------- + Blockwise + """ + blockwise = Blockwise(estimator, **kwargs) + return blockwise + + def _first_block(dask_object): """Extract the first block / partition from a dask object """ diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 4c16c9653..f5d327b60 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -9,6 +9,11 @@ API Breaking Changes - Removed the `get` keyword from the incremental learner ``fit`` methods (:pr:`187`) +Enhancements +------------ + +- Added a new meta-estimator :class:`dask_ml.wrappers.Blockwise` for wrapping any estimator with a `partial_fit` method (). See :ref:`incremental.blockwise-metaestimator` for more. + Version 0.5.0 ~~~~~~~~~~~~~ diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index b8af9513f..580d14c66 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -3,55 +3,87 @@ Incremental Learning ==================== +Some estimators can be trained incrementally -- without seeing the entire dataset at once. 
+Scikit-Learn provides the ``partial_fit`` API to let you stream batches of data
+to an estimator that can be fit in batches.
+
+Normally, if you pass a Dask Array to an estimator expecting a NumPy array,
+the Dask Array will be converted to a single, large NumPy array. On a single
+machine, you'll likely run out of RAM and crash the program. On a distributed
+cluster, all the workers will send their data to a single machine and crash it.
+
+The incremental learning tools in Dask-ML provide a bridge between Dask and
+Scikit-Learn estimators supporting the ``partial_fit`` API. Each individual
+chunk in a Dask Array can be passed to the estimator's ``partial_fit`` method.
+
+Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestimator`, for wrapping any estimator with a `partial_fit` method, and :ref:`incremental.pre-wrapped` for estimators that have already been wrapped in the meta-estimator.
+
+.. _incremental.blockwise-metaestimator:
+
+Blockwise Metaestimator
+-----------------------
+
 .. currentmodule:: dask_ml

 .. autosummary::

-   naive_bayes.PartialBernoulliNB
-   naive_bayes.PartialMultinomialNB
-   linear_model.PartialSGDRegressor
-   linear_model.PartialSGDClassifier
-   linear_model.PartialPerceptron
-   linear_model.PartialPassiveAggressiveClassifier
-   linear_model.PartialPassiveAggressiveRegressor
-   cluster.PartialMiniBatchKMeans
-   base._BigPartialFitMixin
-
-Scikit-Learn's Partial Fit
---------------------------
-
-Some scikit-learn models support `incremental learning`_ with the
-``.partial_fit`` API. These models can see small batches of dataset and update
-their parameters as new data arrives.
-
-.. code-block:: python
-
-   for X_block, y_block in iterator_of_numpy_arrays:
-       est.partial_fit(X_block, y_block)
-
-This block-wise learning fits nicely with Dask's block-wise nature: Dask
-arrays are composed of many smaller NumPy arrays. Dask dataframes and arrays
-provides an intuitive way to preprocess your data and then intuitively send
-that data to an incremental model piece by piece. Dask-ML will hide the
-``.partial_fit`` mechanics from you, so that the usual ``.fit`` API will work
-on larger-than-memory datasets. These wrappers can be dropped into a
-:class:`sklearn.pipeline.Pipeline` just like normal. In Dask-ml, all of these
-estimators are prefixed with ``Partial``, e.g. :class:`PartialSGDClassifier`.
-
-.. note::
-
-   While these wrappers are useful for fitting on larger than memory datasets
-   they do not offer any kind of parallelism while training. Calls to
-   ``.fit()`` will be entirely sequential.
-
-Example
--------
+   wrappers.Blockwise
+
+:class:`dask_ml.wrappers.Blockwise` is a meta-estimator (an estimator that
+takes another estimator) that bridges scikit-learn estimators expecting
+NumPy arrays, and users with large Dask Arrays.
+
+Each *block* of a Dask Array is fed to the underlying estimator's
+``partial_fit`` method. The training is entirely sequential, so you won't
+notice massive training time speedups from parallelism. In a distributed
+environment, you should notice some speeds from avoiding extra IO, and the
+fact that models are typically much smaller than data, and so faster to move
+between machines.
+
+
+.. ipython:: python
+
+    from dask_ml.datasets import make_classification
+    from dask_ml.wrappers import Blockwise
+    from sklearn.linear_model import SGDClassifier
+
+    X, y = make_classification(chunks=25)
+    X
+
+    estimator = SGDClassifier(random_state=10)
+    clf = Blockwise(estimator, classes=[0, 1])
+    clf.fit(X, y)
+
+In this example, we make a (small) random Dask Array. It has 100 samples,
+broken into 4 blocks of 25 samples each. The chunking is only along the
+first axis (the samples). There is no chunking along the features.
+
+You instantiate the underlying estimator as usual. It really is just a
+scikit-learn compatible estimator, and will be trained normally via its
+``partial_fit``.
+
+When wrapping the estimator in :class:`Blockwise`, you need to pass any
+keyword arguments that are expected by the underlying ``partial_fit`` method.
+With :class:`sklearn.linear_model.SGDClassifier`, we're required to provide
+the list of unique ``classes`` in ``y``.
+
+Notice that we call the regular ``.fit`` method for training. Dask-ML takes
+care of passing each block to the underlying estimator for you.
+
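+Like :class:`ParallelPostFit`, the methods available after fitting are
+parallel and delayed. A small sketch of what that looks like (illustrative
+only, continuing the example above):
+
+.. code-block:: python
+
+   predictions = clf.predict(X)  # a lazy dask array, evaluated block-wise
+   predictions.compute()         # materialize the values when you need them
+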
+.. _incremental.pre-wrapped:
+
+Pre-Wrapped Incremental Learners
+================================
+
+Dask-ML provides pre-wrapped versions of some common estimators. They'll be
+found in the same namespace as their scikit-learn counterparts they wrap.

 .. ipython:: python

    from dask_ml.linear_model import PartialSGDRegressor
    from dask_ml.datasets import make_classification
-   X, y = make_classification(n_samples=1000, chunks=500)
    est = PartialSGDRegressor()
    est.fit(X, y)

+See :ref:`api.incremental` for a full list of the wrapped estimators.
+
 .. _incremental learning: http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning
diff --git a/docs/source/modules/api.rst b/docs/source/modules/api.rst
index c778e850f..36cfa7994 100644
--- a/docs/source/modules/api.rst
+++ b/docs/source/modules/api.rst
@@ -63,7 +63,8 @@ Dask-ML provides drop-in replacements for grid and randomized search.
 Meta-estimators for scikit-learn
 ================================

-dask-ml provides some meta-estimators parallelize certain components.
+dask-ml provides some meta-estimators that help use regular scikit-learn
+compatible estimators with Dask arrays.

 .. currentmodule:: dask_ml

 .. autosummary::
    :toctree: generated/
    :template: class.rst

    wrappers.ParallelPostFit
+   wrappers.Blockwise

+.. autosummary::
+   :toctree: generated/
+
+   wrappers.make_blockwise
+
+
+.. _api.incremental-learning:

 Incremental Learning
 ====================
diff --git a/tests/test_blockwise.py b/tests/test_blockwise.py
new file mode 100644
index 000000000..833c63041
--- /dev/null
+++ b/tests/test_blockwise.py
@@ -0,0 +1,40 @@
+import dask.array as da
+from dask.array.utils import assert_eq
+import numpy as np
+import pytest
+from sklearn.base import clone
+from sklearn.linear_model import SGDClassifier
+
+from dask_ml.wrappers import Blockwise, make_blockwise
+from dask_ml.utils import assert_estimator_equal
+
+
+@pytest.mark.parametrize('maker', [Blockwise, make_blockwise])
+def test_blockwise_basic(xy_classification, maker):
+    X, y = xy_classification
+    est1 = SGDClassifier(random_state=0)
+    est2 = clone(est1)
+
+    clf = maker(est1, classes=[0, 1])
+    result = clf.fit(X, y)
+    for slice_ in da.core.slices_from_chunks(X.chunks):
+        est2.partial_fit(X[slice_], y[slice_[0]], classes=[0, 1])
+
+    assert result is clf
+
+    assert isinstance(result.estimator.coef_, np.ndarray)
+    np.testing.assert_array_almost_equal(result.estimator.coef_, est2.coef_)
+
+    assert_estimator_equal(clf.estimator, est2, exclude=['loss_function_'])
+
+    # Predict
+    result = clf.predict(X)
+    expected = est2.predict(X)
+    assert isinstance(result, da.Array)
+    assert_eq(result, expected)
+
+    # score
+    result = clf.score(X, y)
+    expected = est2.score(X, y)
+    # assert isinstance(result, da.Array)
+    assert_eq(result, expected)

From d98c1a8db6bddcd866a1e8a371b381ed9ef60c19 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Sat, 2 Jun 2018 13:51:06 -0500
Subject: [PATCH 2/8] Update docs

---
 dask_ml/wrappers.py         | 39 +++++++++++++++++++++++++++++------
 docs/source/incremental.rst | 22 ++++++++++++++-------
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py
index 654bf37a4..b6a4ffc71 100644
--- a/dask_ml/wrappers.py
+++ b/dask_ml/wrappers.py
@@ -125,7 +125,27 @@ def transform(self, X):
         return transform(X)

     def score(self, X, y):
-        # TODO: re-implement some scoring functions.
+        """Returns the score on the given data.
+
+        This uses the scoring defined by ``estimator.score``. This is
+        currently immediate and sequential. In the future, this will be
+        delayed and parallel.
+
+        Parameters
+        ----------
+        X : array-like, shape = [n_samples, n_features]
+            Input data, where n_samples is the number of samples and
+            n_features is the number of features.
+
+        y : array-like, shape = [n_samples] or [n_samples, n_output], optional
+            Target relative to X for classification or regression;
+            None for unsupervised learning.
+
+        Returns
+        -------
+        score : float
+            The score as computed by ``self.estimator.score(X, y)``.
+        """
         return self.estimator.score(X, y)

     def predict(self, X):
@@ -202,17 +222,18 @@ def _check_method(self, method):
 class Blockwise(ParallelPostFit):
     """Metaestimator for feeding Dask Arrays to an estimator blockwise.

-    This wrapper is provides a bridge between Dask objects and estimators
+    This wrapper provides a bridge between Dask objects and estimators
     implementing the ``partial_fit`` API. These estimators can train on
     batches of data, but simply passing a Dask array to their ``fit`` or
     ``partial_fit`` methods would materialize the large Dask Array on a single
     machine.

-    Calling :meth:`Streamable.fit` with a Dask Array will cause each block of
-    the Dask array to be passed to ``estimator.partial_fit`` *sequentially*.
+    Calling :meth:`Streamable.fit` with a Dask Array will pass each block of
+    the Dask array to to ``estimator.partial_fit`` *sequentially*.
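+
+    Conceptually, fitting is roughly the following loop (an illustrative
+    sketch only; the real implementation is in ``dask_ml._partial.fit``)::
+
+        import dask.array as da
+
+        for slice_ in da.core.slices_from_chunks(X.chunks):
+            # one chunk at a time, in order
+            estimator.partial_fit(X[slice_], y[slice_[0]], **fit_kwargs)
+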
Like :class:`ParallelPostFit`, the methods available after fitting (e.g. - `score`, `predcit`, etc.) are all parallel and delayed. + :meth:`Blockwise.score`, :meth:`Blockwise.predcit`, etc.) are all parallel + and delayed. Parameters ---------- @@ -224,8 +245,14 @@ class Blockwise(ParallelPostFit): Examples -------- + >>> from dask_ml.wrappers import Blockwise + >>> from dask_ml.datasets import make_classification + >>> import sklearn.linear_model + >>> X, y = make_classification(chunks=25) + >>> est = sklearn.linear_model.SGDClassifier() + >>> clf = Blockwise(est, classes=[0, 1]) + >>> clf.fit(X, y) """ - def __init__(self, estimator, **kwargs): self.estimator = estimator self.fit_kwargs = kwargs diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index 580d14c66..19d6970be 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -16,7 +16,7 @@ The incremental learning tools in Dask-ML provide a bridge between Dask and Scikit-Learn estimators supporting the ``partial_fit`` API. Each individual chunk in a Dask Array can be passed to the estimator's ``partial_fit`` method. -Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestimator`, for wrapping any estimator with a `partial_fit` method, and :ref:`incremental.pre-wrapped` for estimators that have already been wrapped in the meta-estimator. +Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestimator`, for wrapping any estimator with a `partial_fit` method, and some pre-daskified :ref:`incremental.dask-friendly` incremental. .. _incremental.blockwise-metaestimator: @@ -71,17 +71,25 @@ care of passing each block to the underlying estimator for you. .. _incremental.pre-wrapped: -Pre-Wrapped Incremental Learners -================================ +Daskified Incremental Learners +=============================== -Dask-ML provides pre-wrapped versions of some common estimators. They'll be -found in the same namespace as their scikit-learn counterparts they wrap. +Dask-ML provides a few estimators that effectively do the wrapping for you. +The main differences from :class:`dask_ml.wrappers.Blockwise` is that + +1. They're found in the corresponding Dask-ML namespace (e.g. + :class:`dask_ml.linear_model.SGDClassifier`). +2. They're regular estimators (not meta-estimators). + +Calling them is just like calling the scikit-learn estimator, except that you +use Dask Arrays instead of NumPy arrays, and you pass all the ``fit_kwargs`` to +the estimator itself. .. ipython:: python - from dask_ml.linear_model import PartialSGDRegressor + from dask_ml.linear_model import PartialSGDClassifier from dask_ml.datasets import make_classification - est = PartialSGDRegressor() + est = PartialSGDClassifier(classes=[0, 1]) est.fit(X, y) See :ref:`api.incremental` for a full list of the wrapped estimators. From eb40dc071f9de05b2dc0b971287b8f315e291af6 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 09:09:02 -0500 Subject: [PATCH 3/8] Rename to incremental. Removed make_blockwise helper. 
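
Construct the wrapper directly now that the helper is gone. A minimal
before/after sketch (illustrative only, using names from this series):

    # before (patch 1 API, removed here)
    clf = make_blockwise(SGDClassifier(), classes=[0, 1])

    # after
    clf = Incremental(SGDClassifier(), classes=[0, 1])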
--- dask_ml/wrappers.py | 47 +++++++------------ docs/source/changelog.rst | 2 +- docs/source/incremental.rst | 39 +++------------ docs/source/modules/api.rst | 7 +-- ...{test_blockwise.py => test_incremental.py} | 8 ++-- 5 files changed, 29 insertions(+), 74 deletions(-) rename tests/{test_blockwise.py => test_incremental.py} (81%) diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index b6a4ffc71..22ee4adb1 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -219,21 +219,27 @@ def _check_method(self, method): return getattr(self.estimator, method) -class Blockwise(ParallelPostFit): +class Incremental(ParallelPostFit): """Metaestimator for feeding Dask Arrays to an estimator blockwise. This wrapper provides a bridge between Dask objects and estimators - implementing the ``partial_fit`` API. These estimators can train on - batches of data, but simply passing a Dask array to their ``fit`` or - ``partial_fit`` methods would materialize the large Dask Array on a single - machine. + implementing the ``partial_fit`` API. These *incremental learners* can + train on batches of data. This fits well with Dask's blocked data + structures. - Calling :meth:`Streamable.fit` with a Dask Array will pass each block of - the Dask array to to ``estimator.partial_fit`` *sequentially*. + See the `list of incremental learners`_ in the scikit-learn documentation + for a list of estimators that implement the ``partial_fit`` API. Note that + `Incremental` is not limited to just these classes, it will work on any + estimator implementing ``partial_fit``, including those defined outside of + scikit-learn itself. + + Calling :meth:`Incremental.fit` with a Dask Array will pass each block of + the Dask array or arrays to ``estimator.partial_fit`` *sequentially*. Like :class:`ParallelPostFit`, the methods available after fitting (e.g. - :meth:`Blockwise.score`, :meth:`Blockwise.predcit`, etc.) are all parallel - and delayed. + :meth:`Incremental.predict`, etc.) are all parallel and delayed. + + .. _list of incremental learners: http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning # noqa Parameters ---------- @@ -245,12 +251,12 @@ class Blockwise(ParallelPostFit): Examples -------- - >>> from dask_ml.wrappers import Blockwise + >>> from dask_ml.wrappers import Incremental >>> from dask_ml.datasets import make_classification >>> import sklearn.linear_model >>> X, y = make_classification(chunks=25) >>> est = sklearn.linear_model.SGDClassifier() - >>> clf = Blockwise(est, classes=[0, 1]) + >>> clf = Incremental(est, classes=[0, 1]) >>> clf.fit(X, y) """ def __init__(self, estimator, **kwargs): @@ -270,25 +276,6 @@ def fit(self, X, y=None): return self -def make_blockwise(estimator, **kwargs): - """Helper function for creating a :class:`Stremable` estimator. - - Parameters - ---------- - estimator : Estimator - Any object supporting the scikit-learn `parital_fit` API. - **kwargs - Additional keyword arguments passed through the the underlying - estimator's `partial_fit` method. 
- - Returns - ------- - Blockwise - """ - blockwise = Blockwise(estimator, **kwargs) - return blockwise - - def _first_block(dask_object): """Extract the first block / partition from a dask object """ diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index f5d327b60..b04e3775f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -12,7 +12,7 @@ API Breaking Changes Enhancements ------------ -- Added a new meta-estimator :class:`dask_ml.wrappers.Blockwise` for wrapping any estimator with a `partial_fit` method (). See :ref:`incremental.blockwise-metaestimator` for more. +- Added a new meta-estimator :class:`dask_ml.wrappers.Incremental` for wrapping any estimator with a `partial_fit` method (). See :ref:`incremental.blockwise-metaestimator` for more. Version 0.5.0 ~~~~~~~~~~~~~ diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index 19d6970be..c5e28cc31 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -20,15 +20,15 @@ Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestim .. _incremental.blockwise-metaestimator: -Blockwise Metaestimator ------------------------ +Incremental Metaestimator +------------------------- .. currentmodule:: dask_ml .. autosummary:: - wrappers.Blockwise + wrappers.Incremental -:class:`dask_ml.wrappers.Blockwise` is a meta-estimator (an estimator that +:class:`dask_ml.wrappers.Incremental` is a meta-estimator (an estimator that takes another estimator) that bridges scikit-learn estimators expecting NumPy arrays, and users with large Dask Arrays. @@ -43,14 +43,14 @@ between machines. .. ipython:: python from dask_ml.datasets import make_classification - from dask_ml.wrappers import Blockwise + from dask_ml.wrappers import Incremental from sklearn.linear_model import SGDClassifier X, y = make_classification(chunks=25) X estimator = SGDClassifier(random_state=10) - clf = Blockwise(estimator, classes=[0, 1]) + clf = Incremental(estimator, classes=[0, 1]) clf.fit(X, y) In this example, we make a (small) random Dask Array. It has 100 samples, @@ -61,7 +61,7 @@ You instantite the underlying estimator as usual. It really is just a scikit-learn compatible estimator, and will be trained normally via its ``partial_fit``. -When wrapping the estimator in :class:`Blockwise`, you need to pass any +When wrapping the estimator in :class:`Incremental`, you need to pass any keyword arguments that are expected by the underlying ``partial_fit`` method. With :class:`sklearn.linear_model.SGDClassifier`, we're required to provide the list of unique ``classes`` in ``y``. @@ -69,29 +69,4 @@ the list of unique ``classes`` in ``y``. Notice that we call the regular ``.fit`` method for training. Dask-ML takes care of passing each block to the underlying estimator for you. -.. _incremental.pre-wrapped: - -Daskified Incremental Learners -=============================== - -Dask-ML provides a few estimators that effectively do the wrapping for you. -The main differences from :class:`dask_ml.wrappers.Blockwise` is that - -1. They're found in the corresponding Dask-ML namespace (e.g. - :class:`dask_ml.linear_model.SGDClassifier`). -2. They're regular estimators (not meta-estimators). - -Calling them is just like calling the scikit-learn estimator, except that you -use Dask Arrays instead of NumPy arrays, and you pass all the ``fit_kwargs`` to -the estimator itself. - -.. 
ipython:: python - - from dask_ml.linear_model import PartialSGDClassifier - from dask_ml.datasets import make_classification - est = PartialSGDClassifier(classes=[0, 1]) - est.fit(X, y) - -See :ref:`api.incremental` for a full list of the wrapped estimators. - .. _incremental learning: http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning diff --git a/docs/source/modules/api.rst b/docs/source/modules/api.rst index 36cfa7994..82f4852e1 100644 --- a/docs/source/modules/api.rst +++ b/docs/source/modules/api.rst @@ -73,12 +73,7 @@ compatible estimators with Dask arrays. :template: class.rst wrappers.ParallelPostFit - wrappers.Blockwise - -.. autosummary:: - :toctree: generated/ - - wrappers.make_blockwise + wrappers.Incremental .. _api.incremental-learning: diff --git a/tests/test_blockwise.py b/tests/test_incremental.py similarity index 81% rename from tests/test_blockwise.py rename to tests/test_incremental.py index 833c63041..22df59b1e 100644 --- a/tests/test_blockwise.py +++ b/tests/test_incremental.py @@ -1,21 +1,19 @@ import dask.array as da from dask.array.utils import assert_eq import numpy as np -import pytest from sklearn.base import clone from sklearn.linear_model import SGDClassifier -from dask_ml.wrappers import Blockwise, make_blockwise +from dask_ml.wrappers import Incremental from dask_ml.utils import assert_estimator_equal -@pytest.mark.parametrize('maker', [Blockwise, make_blockwise]) -def test_blockwise_basic(xy_classification, maker): +def test_incremental_basic(xy_classification): X, y = xy_classification est1 = SGDClassifier(random_state=0) est2 = clone(est1) - clf = maker(est1, classes=[0, 1]) + clf = Incremental(est1, classes=[0, 1]) result = clf.fit(X, y) for slice_ in da.core.slices_from_chunks(X.chunks): est2.partial_fit(X[slice_], y[slice_[0]], classes=[0, 1]) From 0f3dab715c215b37b484a0f08774ded49a621f8b Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 10:47:43 -0500 Subject: [PATCH 4/8] Deprecate the old --- dask_ml/_partial.py | 17 +++++++++++++++ dask_ml/wrappers.py | 17 ++++++++++++++- docs/source/changelog.rst | 7 ++++--- docs/source/incremental.rst | 21 +++++++++---------- tests/linear_model/test_neural_network.py | 2 ++ tests/linear_model/test_passive_aggressive.py | 4 ++++ tests/linear_model/test_perceptron.py | 3 +++ .../linear_model/test_stochastic_gradient.py | 15 +++++++++++++ tests/test_minibatch.py | 3 +++ tests/test_naive_bayes.py | 3 +++ 10 files changed, 77 insertions(+), 15 deletions(-) diff --git a/dask_ml/_partial.py b/dask_ml/_partial.py index 402aeb505..1edc7fcbd 100644 --- a/dask_ml/_partial.py +++ b/dask_ml/_partial.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, division, print_function import os +import warnings from abc import ABCMeta import numpy as np @@ -22,6 +23,12 @@ class _WritableDoc(ABCMeta): # TODO: Py2: remove all this +_partial_deprecation = ( + "'{cls.__name__}' is deprecated. Use " + "'dask_ml.wrappers.Incremental({base.__name__}(), **kwargs)' " + "instead." 
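+    # Example of the rendered message (hypothetical, for PartialSGDClassifier,
+    # whose closest scikit-learn base is SGDClassifier):
+    #   "'PartialSGDClassifier' is deprecated. Use
+    #    'dask_ml.wrappers.Incremental(SGDClassifier(), **kwargs)' instead."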
+) + @six.add_metaclass(_WritableDoc) class _BigPartialFitMixin(object): """ Wraps a partial_fit enabled estimator for use with Dask arrays """ @@ -30,6 +37,7 @@ class _BigPartialFitMixin(object): _fit_kwargs = [] def __init__(self, **kwargs): + self._deprecated() missing = set(self._init_kwargs) - set(kwargs) if missing: @@ -40,6 +48,15 @@ def __init__(self, **kwargs): setattr(self, kwarg, kwargs.pop(kwarg)) super(_BigPartialFitMixin, self).__init__(**kwargs) + @classmethod + def _deprecated(cls): + for base in cls.mro(): + if base.__module__.startswith('sklearn'): + break + + warnings.warn(_partial_deprecation.format(cls=cls, base=base), + FutureWarning) + @classmethod def _get_param_names(cls): # Evil hack to make sure repr, get_params work diff --git a/dask_ml/wrappers.py b/dask_ml/wrappers.py index 22ee4adb1..77c40dd0d 100644 --- a/dask_ml/wrappers.py +++ b/dask_ml/wrappers.py @@ -244,7 +244,7 @@ class Incremental(ParallelPostFit): Parameters ---------- estimator : Estimator - Any object supporting the scikit-learn `parital_fit` API. + Any object supporting the scikit-learn ``parital_fit`` API. **kwargs Additional keyword arguments passed through the the underlying estimator's `partial_fit` method. @@ -275,6 +275,21 @@ def fit(self, X, y=None): setattr(self, k, v) return self + def partial_fit(self, X, y=None): + """Fit the underlying estimator. + + This is identical to ``fit``. + + Parameters + ---------- + X, y : array-like + + Returns + ------- + self : object + """ + return self.fit(X, y) + def _first_block(dask_object): """Extract the first block / partition from a dask object diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index b04e3775f..e58f7f857 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,12 +7,13 @@ Version 0.6.0 API Breaking Changes -------------------- -- Removed the `get` keyword from the incremental learner ``fit`` methods (:pr:`187`) - +- Removed the `get` keyword from the incremental learner ``fit`` methods. (:pr:`187`) +- Deprecated the various ``Partial*`` estimators in favor of the :class:`dask_ml.wrappers.Incremental` meta-estimator (:pr:`190`) + Enhancements ------------ -- Added a new meta-estimator :class:`dask_ml.wrappers.Incremental` for wrapping any estimator with a `partial_fit` method (). See :ref:`incremental.blockwise-metaestimator` for more. +- Added a new meta-estimator :class:`dask_ml.wrappers.Incremental` for wrapping any estimator with a `partial_fit` method. See :ref:`incremental.blockwise-metaestimator` for more. (:pr:`190`) Version 0.5.0 ~~~~~~~~~~~~~ diff --git a/docs/source/incremental.rst b/docs/source/incremental.rst index c5e28cc31..1ac8c0e82 100644 --- a/docs/source/incremental.rst +++ b/docs/source/incremental.rst @@ -3,25 +3,24 @@ Incremental Learning ==================== -Some estimators can be trained incrementally -- without seeing the entire dataset at once. -Scikit-Learn provdes the ``partial_fit`` API to let you stream batches of data -to an estimator that can be fit in batches. +Some estimators can be trained incrementally -- without seeing the entire +dataset at once. Scikit-Learn provdes the ``partial_fit`` API to stream batches +of data to an estimator that can be fit in batches. Normally, if you pass a Dask Array to an estimator expecting a NumPy array, the Dask Array will be converted to a single, large NumPy array. On a single machine, you'll likely run out of RAM and crash the program. 
On a distributed cluster, all the workers will send their data to a single machine and crash it. -The incremental learning tools in Dask-ML provide a bridge between Dask and -Scikit-Learn estimators supporting the ``partial_fit`` API. Each individual -chunk in a Dask Array can be passed to the estimator's ``partial_fit`` method. - -Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestimator`, for wrapping any estimator with a `partial_fit` method, and some pre-daskified :ref:`incremental.dask-friendly` incremental. +:class:`dask_ml.wrappers.Incremental` provides a bridge between Dask and +Scikit-Learn estimators supporting the ``partial_fit`` API. You wrap the +underlying estimator in ``Incremental``. Dask-ML will sequentially pass each +block of a Dask Array to the underlying estimator's ``partial_fit`` method. .. _incremental.blockwise-metaestimator: -Incremental Metaestimator -------------------------- +Incremental Meta-estimator +-------------------------- .. currentmodule:: dask_ml @@ -35,7 +34,7 @@ NumPy arrays, and users with large Dask Arrays. Each *block* of a Dask Array is fed to the underlying estiamtor's ``partial_fit`` method. The training is entirely sequential, so you won't notice massive training time speedups from parallelism. In a distributed -environment, you should notice some speeds from avoiding extra IO, and the +environment, you should notice some speedup from avoiding extra IO, and the fact that models are typically much smaller than data, and so faster to move between machines. diff --git a/tests/linear_model/test_neural_network.py b/tests/linear_model/test_neural_network.py index 63a20c614..336023699 100644 --- a/tests/linear_model/test_neural_network.py +++ b/tests/linear_model/test_neural_network.py @@ -4,6 +4,7 @@ from dask_ml.utils import assert_estimator_equal +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestMLPClassifier(object): def test_basic(self, single_chunk_classification): @@ -15,6 +16,7 @@ def test_basic(self, single_chunk_classification): assert_estimator_equal(a, b) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestMLPRegressor(object): def test_basic(self, single_chunk_classification): diff --git a/tests/linear_model/test_passive_aggressive.py b/tests/linear_model/test_passive_aggressive.py index 823d15aa3..6ad994dce 100644 --- a/tests/linear_model/test_passive_aggressive.py +++ b/tests/linear_model/test_passive_aggressive.py @@ -1,9 +1,12 @@ +import pytest + from sklearn import linear_model as lm_ from dask_ml import linear_model as lm from dask_ml.utils import assert_estimator_equal +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestPassiveAggressiveClassifier(object): def test_basic(self, single_chunk_classification): @@ -18,6 +21,7 @@ def test_basic(self, single_chunk_classification): assert_estimator_equal(a, b, exclude=['loss_function_']) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestPassiveAggressiveRegressor(object): def test_basic(self, single_chunk_regression): diff --git a/tests/linear_model/test_perceptron.py b/tests/linear_model/test_perceptron.py index 7083c84dc..bf050198c 100644 --- a/tests/linear_model/test_perceptron.py +++ b/tests/linear_model/test_perceptron.py @@ -1,9 +1,12 @@ +import pytest + from sklearn.linear_model import Perceptron from dask_ml.linear_model import PartialPerceptron from dask_ml.utils import assert_estimator_equal +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class 
TestPerceptron(object): def test_basic(self, single_chunk_classification): diff --git a/tests/linear_model/test_stochastic_gradient.py b/tests/linear_model/test_stochastic_gradient.py index 66724f49c..f732b7a64 100644 --- a/tests/linear_model/test_stochastic_gradient.py +++ b/tests/linear_model/test_stochastic_gradient.py @@ -1,3 +1,4 @@ +import pytest from dask.delayed import Delayed from sklearn import linear_model as lm_ from dask_ml import linear_model as lm @@ -5,6 +6,7 @@ from dask_ml.utils import assert_estimator_equal +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestStochasticGradientClassifier(object): def test_basic(self, single_chunk_classification): @@ -32,6 +34,7 @@ def test_numpy_arrays(self, single_chunk_classification): a.score(X, y) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestStochasticGradientRegressor(object): def test_basic(self, single_chunk_regression): @@ -56,6 +59,7 @@ def test_numpy_arrays(self, single_chunk_regression): a.score(X, y) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") def test_lazy(xy_classification): X, y = xy_classification sgd = lm.PartialSGDClassifier(classes=[0, 1]) @@ -63,3 +67,14 @@ def test_lazy(xy_classification): assert isinstance(r, Delayed) result = r.compute() assert isinstance(result, lm_.SGDClassifier) + + +def test_deprecated(): + expected = ( + r"'PartialSGDClassifier' is deprecated. Use " + r"'dask_ml.wrappers.Incremental.*SGDClassifier.*" + r"instead." + ) + + with pytest.warns(FutureWarning, match=expected): + lm.PartialSGDClassifier(classes=[0, 1]) diff --git a/tests/test_minibatch.py b/tests/test_minibatch.py index b4478ba02..a3601e76d 100644 --- a/tests/test_minibatch.py +++ b/tests/test_minibatch.py @@ -1,9 +1,12 @@ +import pytest + from sklearn import cluster as cluster_ from dask_ml import cluster from dask_ml.utils import assert_estimator_equal +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestMiniBatchKMeans(object): def test_basic(self, single_chunk_blobs): diff --git a/tests/test_naive_bayes.py b/tests/test_naive_bayes.py index 00d2b7239..6dc4fef84 100644 --- a/tests/test_naive_bayes.py +++ b/tests/test_naive_bayes.py @@ -1,3 +1,4 @@ +import pytest from dask.array.utils import assert_eq from dask_ml.datasets import make_classification from dask_ml import naive_bayes as nb @@ -24,6 +25,7 @@ def test_smoke(): assert_eq(a.predict_log_proba(X).compute(), b.predict_log_proba(X_)) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestPartialMultinomialNB(object): def test_basic(self, single_chunk_count_classification): X, y = single_chunk_count_classification @@ -34,6 +36,7 @@ def test_basic(self, single_chunk_count_classification): assert_eq(a.coef_, b.coef_) +@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") class TestPartialBernoulliNB(object): def test_basic(self, single_chunk_binary_classification): X, y = single_chunk_binary_classification From c264dbb7c836609172fa2074189b5b8f74c5a0df Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 10:50:14 -0500 Subject: [PATCH 5/8] document deprecation --- dask_ml/_partial.py | 3 +++ docs/source/modules/api.rst | 28 ---------------------------- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/dask_ml/_partial.py b/dask_ml/_partial.py index 1edc7fcbd..bddc61483 100644 --- a/dask_ml/_partial.py +++ b/dask_ml/_partial.py @@ -211,6 +211,9 @@ def _copy_partial_doc(cls): insert = """ + .. 
deprecated:: 0.6.0 + Use the :class:`dask_ml.wrappers.Incremental` meta-estimator instead. + This class wraps scikit-learn's {classname}. When a dask-array is passed to our ``fit`` method, the array is passed block-wise to the scikit-learn class' ``partial_fit`` method. This will allow you to fit the estimator diff --git a/docs/source/modules/api.rst b/docs/source/modules/api.rst index 82f4852e1..13dd2637e 100644 --- a/docs/source/modules/api.rst +++ b/docs/source/modules/api.rst @@ -75,34 +75,6 @@ compatible estimators with Dask arrays. wrappers.ParallelPostFit wrappers.Incremental - -.. _api.incremental-learning: - -Incremental Learning -==================== - -.. currentmodule:: dask_ml - -Some scikit-learn estimators support out-of-core training through the -``partial_fit`` method. The following estimators wrap those scikit-learn -estimators, allowing them to be used in Pipelines and on Dask arrays and -dataframes. Training will still be serial, so these will not benefit from -a parallel or distributed training any more than the underlying estimator. - -.. autosummary:: - :toctree: generated/ - :template: class.rst - - cluster.PartialMiniBatchKMeans - linear_model.PartialPassiveAggressiveClassifier - linear_model.PartialPassiveAggressiveRegressor - linear_model.PartialPerceptron - linear_model.PartialSGDClassifier - linear_model.PartialSGDRegressor - naive_bayes.PartialBernoulliNB - naive_bayes.PartialMultinomialNB - - :mod:`dask_ml.cluster`: Clustering ================================== From aaba0504ff222108517df2008b530d709f5d7e23 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 10:55:58 -0500 Subject: [PATCH 6/8] partial fit test --- tests/test_incremental.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_incremental.py b/tests/test_incremental.py index 22df59b1e..6b32761bc 100644 --- a/tests/test_incremental.py +++ b/tests/test_incremental.py @@ -36,3 +36,7 @@ def test_incremental_basic(xy_classification): expected = est2.score(X, y) # assert isinstance(result, da.Array) assert_eq(result, expected) + + clf = Incremental(SGDClassifier(random_state=0), classes=[0, 1]) + clf.partial_fit(X, y) + assert_estimator_equal(clf.estimator, est2, exclude=['loss_function_']) From ccc858c038b4fbd4b0f42efd417558c1a17030b1 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 11:04:32 -0500 Subject: [PATCH 7/8] linting --- dask_ml/_partial.py | 1 + tests/linear_model/test_neural_network.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/dask_ml/_partial.py b/dask_ml/_partial.py index bddc61483..65949cdac 100644 --- a/dask_ml/_partial.py +++ b/dask_ml/_partial.py @@ -29,6 +29,7 @@ class _WritableDoc(ABCMeta): "instead." 
) + @six.add_metaclass(_WritableDoc) class _BigPartialFitMixin(object): """ Wraps a partial_fit enabled estimator for use with Dask arrays """ diff --git a/tests/linear_model/test_neural_network.py b/tests/linear_model/test_neural_network.py index 336023699..aaa895b6c 100644 --- a/tests/linear_model/test_neural_network.py +++ b/tests/linear_model/test_neural_network.py @@ -1,3 +1,5 @@ +import pytest + from sklearn import neural_network as nn_ from dask_ml import neural_network as nn From 0f368c8abae49760b1710b989cc46382db7e886c Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 4 Jun 2018 12:19:16 -0500 Subject: [PATCH 8/8] CI fixup --- tests/linear_model/test_neural_network.py | 4 ++-- tests/linear_model/test_stochastic_gradient.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/linear_model/test_neural_network.py b/tests/linear_model/test_neural_network.py index aaa895b6c..79b0efa56 100644 --- a/tests/linear_model/test_neural_network.py +++ b/tests/linear_model/test_neural_network.py @@ -6,7 +6,7 @@ from dask_ml.utils import assert_estimator_equal -@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") +@pytest.mark.filterwarnings("ignore::FutureWarning") class TestMLPClassifier(object): def test_basic(self, single_chunk_classification): @@ -18,7 +18,7 @@ def test_basic(self, single_chunk_classification): assert_estimator_equal(a, b) -@pytest.mark.filterwarnings("ignore:'Partial:FutureWarning") +@pytest.mark.filterwarnings("ignore::FutureWarning") class TestMLPRegressor(object): def test_basic(self, single_chunk_classification): diff --git a/tests/linear_model/test_stochastic_gradient.py b/tests/linear_model/test_stochastic_gradient.py index f732b7a64..6f8634d0b 100644 --- a/tests/linear_model/test_stochastic_gradient.py +++ b/tests/linear_model/test_stochastic_gradient.py @@ -1,4 +1,5 @@ import pytest +import six from dask.delayed import Delayed from sklearn import linear_model as lm_ from dask_ml import linear_model as lm @@ -69,6 +70,7 @@ def test_lazy(xy_classification): assert isinstance(result, lm_.SGDClassifier) +@pytest.mark.skipif(six.PY2, reason="Python 2 failure.") def test_deprecated(): expected = ( r"'PartialSGDClassifier' is deprecated. Use "