Blockwise Metaestimator #190
Conversation
This also doesn't yet work with grid search. I'm doing that this week in another PR.
I agree with avoiding the terms Partial and Streamable. How about Incremental or Sequential? To me, Blockwise doesn't capture the sequential nature of the computation. We have other blockwise operations like map_blocks that operate quite differently.
I see the example uses … Maybe fuse them somehow, like …?
This would close #188, correct? Ditto for avoiding Partial and Streamable. How 'bout …?
dask_ml/wrappers.py (outdated)

    machine.

    Calling :meth:`Streamable.fit` with a Dask Array will pass each block of
    the Dask array to to ``estimator.partial_fit`` *sequentially*.
I think we should expand on "sequentially". Maybe something like

> More concretely, when :meth:`Blockwise.fit` is called, it calls `estimator.partial_fit` on each set of blocks in the data arrays `X` and `y`. It waits for `estimator.partial_fit` to complete before calling it again on the next block.
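For readers skimming the thread, a minimal sketch of that sequential loop (illustrative only; `fit_blockwise` is a hypothetical stand-in, and the actual PR builds a lazy dask graph rather than iterating eagerly like this):

```python
import dask.array as da
from sklearn.linear_model import SGDClassifier

def fit_blockwise(estimator, X, y, **fit_kwargs):
    """Call estimator.partial_fit on one block of X/y at a time."""
    x_blocks = X.to_delayed().ravel()  # one Delayed object per chunk
    y_blocks = y.to_delayed().ravel()
    for xb, yb in zip(x_blocks, y_blocks):
        # compute() materializes a single block, so only one block is in
        # memory at once, and each partial_fit call finishes before the
        # next block is touched; this is the "sequential" part.
        estimator.partial_fit(xb.compute(), yb.compute(), **fit_kwargs)
    return estimator

X = da.random.random((100, 5), chunks=(25, 5))
y = (da.random.random(100, chunks=25) > 0.5).astype(int)
est = fit_blockwise(SGDClassifier(max_iter=1000, tol=1e-3), X, y, classes=[0, 1])
```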
I like Incremental. I avoided that in the old class-based approach because it would lead to names like IncrementalIncrementalPCA :)
________________________________
From: Scott Sievert <notifications@github.com>
Sent: Saturday, June 2, 2018 6:52:15 PM
To: dask/dask-ml
Cc: Tom Augspurger; Author
Subject: Re: [dask/dask-ml] [WIP] Blockwise Metaestimator (#190)
@stsievert requested changes on this pull request.
________________________________
In dask_ml/wrappers.py<#190 (comment)>:
@@ -199,6 +219,76 @@ def _check_method(self, method):
return getattr(self.estimator, method)
+class Blockwise(ParallelPostFit):
+ """Metaestimator for feeding Dask Arrays to an estimator blockwise.
+
+ This wrapper provides a bridge between Dask objects and estimators
+ implementing the ``partial_fit`` API. These estimators can train on
+ batches of data, but simply passing a Dask array to their ``fit`` or
+ ``partial_fit`` methods would materialize the large Dask Array on a single
+ machine.
+
+ Calling :meth:`Streamable.fit` with a Dask Array will pass each block of
+ the Dask array to to ``estimator.partial_fit`` *sequentially*.
I think we need to expand upon "sequentially". Maybe something like

> More concretely, when :meth:`Blockwise.fit` is called, it calls `estimator.partial_fit` on each set of blocks in the data arrays `X` and `y`. It waits for `estimator.partial_fit` to complete before calling it again on the next block.
dask_ml/wrappers.py (outdated)

    ``partial_fit`` methods would materialize the large Dask Array on a single
    machine.

    Calling :meth:`Streamable.fit` with a Dask Array will pass each block of
Is `Streamable.fit` here intended?
dask_ml/wrappers.py (outdated)

        Blockwise
        """
        blockwise = Blockwise(estimator, **kwargs)
        return blockwise
It looks like this function is an alias for `Blockwise`. Why do we want it?
@ogrisel mentioned a `make_` helper. Do you have thoughts here?

Given the simplicity of the class API, I'm inclined to just remove the function helper.
docs/source/incremental.rst (outdated)

    Scikit-Learn estimators supporting the ``partial_fit`` API. Each individual
    chunk in a Dask Array can be passed to the estimator's ``partial_fit`` method.

    Dask-ML provides two ways to achieve this: :ref:`incremental.blockwise-metaestimator`, for wrapping any estimator with a `partial_fit` method, and some pre-daskified :ref:`incremental.dask-friendly` incremental estimators.
Should we consider deleting these pre-daskified versions? I wonder if we can expect users to build these themselves.
I've been mulling this over today. I think we should remove the pre-daskified versions. The meta-estimator approach is clearly more flexible. It works with estimators outside scikit-learn implementing the `partial_fit` API.
The downside of the meta-estimator is (the poor) discoverability. People scanning the API docs could miss the meta-estimator and think "oh, Dask-ML doesn't do dictionary learning". But let's expect the best of our users and guide them to the preferred solution with better documentation and examples.
> This will work with any class implementing `partial_fit`

Yep, updated.
Does this PR change …?
Could you clarify? For …
Perfect, that’s exactly what I meant. Do we want to implement a test for this? There is a test for `fit`, but not for `partial_fit`.
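A sketch of what such a test could look like (hedged: the test name is made up, and it assumes the final `Incremental` name, that `fit` forwards keyword arguments to each `partial_fit` call, and that the fitted estimator is exposed as `estimator_`):

```python
import dask.array as da
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

def test_fit_forwards_kwargs_to_partial_fit():
    X = da.random.random((100, 5), chunks=(25, 5))
    y = da.random.randint(0, 2, size=100, chunks=25)
    clf = Incremental(SGDClassifier(max_iter=1000, tol=1e-3))
    # `classes` must reach SGDClassifier.partial_fit; the first call
    # raises a ValueError if it is dropped along the way.
    clf.fit(X, y, classes=[0, 1])
    assert hasattr(clf.estimator_, "coef_")
```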
Removed the `make_blockwise` helper.
@stsievert do you mean …?
Removed the WIP status.
This looks good to me.
@@ -22,6 +23,13 @@ class _WritableDoc(ABCMeta):
     # TODO: Py2: remove all this


+_partial_deprecation = (
+    "'{cls.__name__}' is deprecated. Use "
+    "'dask_ml.wrappers.Incremental({base.__name__}(), **kwargs)' "
Should `Incremental` be top-level? This might be a broader discussion, though, about namespaces.
I dislike the `.wrappers` namespace, but I haven't put much thought into a replacement.
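For context on how that template might be used, here is a hypothetical completion (the string literal in the hunk above is truncated, so the ending and the surrounding code are assumptions):

```python
import warnings

_partial_deprecation = (
    "'{cls.__name__}' is deprecated. Use "
    "'dask_ml.wrappers.Incremental({base.__name__}(), **kwargs)' "
    "instead."  # assumed ending; the diff hunk cuts off here
)

class PartialPCA: pass      # stand-in for a deprecated Partial* class
class IncrementalPCA: pass  # stand-in for its incremental base class

# str.format supports attribute access, so the class names fill in:
warnings.warn(
    _partial_deprecation.format(cls=PartialPCA, base=IncrementalPCA),
    FutureWarning,
)
```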
The last CI failure came from Python 2. I've verified manually that the warning appears, and I'm not inclined to spend time debugging the test, so I've skipped it on Python 2.
Merging this, since I'll be working on hyperparameter optimization around it next.
I've thought more about the naming. EDIT: …and you mentioned the same goal (what, not how) in #194. Go figure.
How would we go about pinning an `Incremental` learner to a particular worker? Expect this will come up when dealing with matrix decomposition problems where the learner is more expensive to move than the blocks.
At the moment, we haven't thought much about designing an API for that. All else equal, if the data is indeed cheaper to move than the model, then the scheduler should choose that route.
Agreed. Do we know if the scheduler has enough information to make that decision? In particular, will it inspect the attributes of the model when estimating size? If the answer is no to these, what sort of workarounds (like manual pinning) are available to us?
This is proposed here: scikit-learn/scikit-learn#8642. The scikit-learn devs seemed amenable to it. If @stsievert has time and interest, this might be a good issue to start interacting with the scikit-learn community. I suspect that it will be important for proper distributed scheduling.
Thanks for linking that issue. Sounds like that will likely be solved by the next scikit-learn release, correct? In the interim, what should we be doing to keep the model from moving? Open even to hacky solutions in the short term. :)
I think my preferred way would be to implement it in scikit-learn and then use the master version of that short-term. If that approach concerns people, then we could also implement this in the `distributed.sizeof` function.
Maybe our concepts of short term in this context differ. :) I'm thinking, "what could one do today?" ;)
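One thing that could be done today, under the assumption that the scheduler's data-movement heuristics consult dask's `sizeof` dispatch: register a size estimate for the estimator class so the model looks expensive to move. Whether that assumption holds is exactly the open question above, so treat this as a sketch, not a recommendation:

```python
import numpy as np
from dask.sizeof import sizeof
from sklearn.linear_model import SGDClassifier

@sizeof.register(SGDClassifier)
def sizeof_sgd(model):
    # Sum the numpy arrays hanging off the estimator (coef_, etc.) and
    # report a large floor so even an unfit model looks costly to move.
    nbytes = sum(
        v.nbytes for v in vars(model).values() if isinstance(v, np.ndarray)
    )
    return max(nbytes, 100_000_000)
```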
Make sure you're on the latest version of dask-ml.
…On Thu, Feb 7, 2019 at 4:04 AM Soumyajaganathan ***@***.***> wrote:
Here, I am just trying to fit a large dataset with Incremental in Dask, but I am getting an error like this:

from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier
from dask.distributed import client
import dask

X, y = make_classification(chunks=25)
estimator = SGDClassifier(random_state=10)
clf = Incremental(estimator, shuffle_blocks=True, random_state=0)
clf.fit(X, y)

*Error*:

--> 197     new_dsk = dask.sharedict.merge((name, dsk), x.dask, getattr(y, "dask", {}))
    198     value = Delayed((name, nblocks - 1), new_dsk)
    199
AttributeError: module 'dask' has no attribute 'sharedict'
Then I imported sharedict from dask to fix this:

from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier
from dask.distributed import client
import dask.sharedict
import dask
import dask.delayed
from toolz import merge
from toolz import partial
from dask.delayed import Delayed

X, y = make_classification(chunks=25)
estimator = SGDClassifier(random_state=10)
clf = Incremental(estimator, shuffle_blocks=True, random_state=0)
clf.fit(X, y)
*Error*:

~/.local/lib/python3.6/site-packages/dask/base.py in _extract_graph_and_keys(vals)
    210         graph = HighLevelGraph.merge(*graphs)
    211     else:
--> 212         graph = merge(*graphs)
    213
    214     return graph, keys

~/.local/lib/python3.6/site-packages/toolz/dicttoolz.py in merge(*dicts, **kwargs)
     37     rv = factory()
     38     for d in dicts:
---> 39         rv.update(d)
     40     return rv
     41

ValueError: dictionary update sequence element #0 has length 36; 2 is required

Help me to fix this.
Yes, got it. Thanks.
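For anyone hitting the same traceback: the `AttributeError: module 'dask' has no attribute 'sharedict'` pattern usually means a newer dask is installed alongside an older dask-ml that still imports `dask.sharedict`, so checking the installed versions is the first diagnostic step (a suggestion, not part of the original exchange):

```python
import dask
import dask_ml

# Mismatched releases are the usual culprit; upgrading dask-ml so the
# two packages come from the same era resolves the missing-module error.
print(dask.__version__, dask_ml.__version__)
```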
Adds a meta-estimator for wrapping estimators that implement `partial_fit`. A few notes:

- It's currently named `Blockwise`. We could also do `Streamable`, but I worry people will avoid using it if they think "I don't have streaming data, so this isn't for me." Any thoughts on the name?
- This overlaps with the `Partial*` estimators scattered throughout dask-ml in terms of code and use. I can / will reduce the code duplication before merging. API-wise, there are benefits to both. The meta-estimator is nice since it can wrap any sklearn-compatible estimator that implements `partial_fit` (e.g. from modl). But the "pre-wrapped" versions are maybe nicer for discovery?
- `fit` takes `**kwargs` for the signature instead of a `fit_params` dict. I could see either working, but `**kwargs` felt a bit more natural.

cc @jakirkham @ogrisel

Closes #188
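As a closing illustration of the `**kwargs` choice, a usage sketch using the final `Incremental` name (the `classes` keyword rides along on `fit` and is forwarded to every `partial_fit` call; no separate `fit_params` dict is needed):

```python
from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier

X, y = make_classification(chunks=25)  # dask arrays, chunked in blocks of 25
clf = Incremental(SGDClassifier(max_iter=1000, tol=1e-3))
# Fit parameters are plain keyword arguments, forwarded blockwise:
clf.fit(X, y, classes=[0, 1])
```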