Joblib scatter #1022

Merged
merged 4 commits into dask:master from jcrist:joblib-scatter on Apr 27, 2017

Conversation

@jcrist
Member

jcrist commented Apr 17, 2017

Adds a `scatter` option that allows prescattering data to the cluster. For large arguments (big arrays, etc.) that are used in more than one task, this can be more efficient, as it avoids serializing the data for every task.

Also includes a fix to ensure the distributed backend is an instance of ParallelBackendBase for both the scikit-learn and joblib modules.
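
For reference, a minimal usage sketch of the new option (the scheduler address is a placeholder, and the argument names follow the existing 'dask.distributed' backend; treat this as illustrative rather than the final API):

import numpy as np
import distributed.joblib  # noqa: F401 -- registers the 'dask.distributed' backend
from joblib import Parallel, delayed, parallel_backend

X = np.random.normal(size=(50000, 100))  # large array reused by every task

with parallel_backend('dask.distributed', scheduler_host='127.0.0.1:8786',
                      scatter=[X]):
    # X is sent to the workers once up front; each task then reuses the
    # remote copy instead of reserializing the full array per call.
    results = Parallel()(delayed(np.mean)(X, axis) for axis in range(2))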

@jcrist

Member

jcrist commented Apr 17, 2017

My benchmarks show this helps, but only a little. I have yet to find a scikit-learn task where the distributed backend is noticeably more performant than the existing threading/multiprocessing backends (except for grid search, which is still beaten by dask-searchcv). This PR does help, but only for code where the serialization time is large compared to the compute time.

One issue is that internal scikit-learn code may transform the input data, so the objects that make it to joblib aren't the same as the input ones (e.g. the y that reaches the joblib step isn't the same object that was passed to fit). This seems to be mainly true for y, and less so for X (which is usually larger). I can't think of a simple workaround for this, and I'm not convinced that the possible speedup is worth the added code complexity.

@mrocklin

Member

mrocklin commented Apr 18, 2017

Are the y's different objects or are they changing in place?

@mrocklin

Member

mrocklin commented Apr 18, 2017

I wouldn't be surprised if joblib was near full speed on a single machine. This might have more of an impact on a cluster where communication costs are larger.

@jcrist

Member

jcrist commented Apr 18, 2017

Are the y's different objects or are they changing in place?

Different objects. Usually it's a small thing like a reshape or dtype change.
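
(Concretely, the substitution of prescattered arguments works by object identity (see the "keep the ids the same" comment in the diff below), so even a trivial transformation produces a new object that no longer matches. A small illustration:)

import numpy as np

y = np.array([0, 1, 1, 0], dtype='int32')
y2 = y.astype('int64')   # the kind of dtype change scikit-learn validation may do
print(y2 is y)           # False: the prescattered y would no longer be recognized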

@mrocklin

Member

mrocklin commented Apr 26, 2017

What is the status here? Is this code helpful?

@jcrist

Member

jcrist commented Apr 26, 2017

It's useful, but it still fails to beat single-machine performance for anything except grid_search (which is still beaten by dask_searchcv). It does improve performance though. Fix merge conflicts and merge?

@mrocklin

Member

mrocklin commented Apr 26, 2017

It's useful, but it still fails to beat single-machine performance for anything except grid_search (which is still beaten by dask_searchcv). It does improve performance though. Fix merge conflicts and merge?

Do you mean to say that if we have a cluster we should still prefer to use a single machine, or that on a single machine one should use the threaded/multiprocessing joblib backends? If the former, did you get a sense of what the limiting factor was?

@jcrist

Member

jcrist commented Apr 26, 2017

The latter. The former depends on the work-to-serialization-cost ratio and on how many tasks there are. Grid search is a good use case here, as fitting individual estimators is usually pretty expensive, and we're using the same data multiple times (which can be cached).
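
A hedged illustration of that use case (the estimator, parameter grid, and scheduler address are arbitrary placeholders; note the earlier caveat that internal validation may copy y, which can defeat the scatter for that argument):

import numpy as np
import distributed.joblib  # noqa: F401
from sklearn.externals.joblib import parallel_backend
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

X = np.random.normal(size=(10000, 20))
y = (np.random.uniform(size=10000) > 0.5).astype('int64')

grid = GridSearchCV(SVC(), {'C': [0.1, 1, 10], 'gamma': [0.01, 0.1, 1]}, n_jobs=-1)
with parallel_backend('dask.distributed', scheduler_host='127.0.0.1:8786',
                      scatter=[X, y]):
    # every candidate fit in the parameter sweep reuses the same scattered X, y
    grid.fit(X, y)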

@mrocklin

Member

mrocklin commented Apr 26, 2017

Are there joblib-accelerated algorithms within sklearn that have a high work-to-serialization-cost ratio other than grid search? cc @ogrisel

@jcrist

Member

jcrist commented Apr 26, 2017

OK, I ran a fitting benchmark using a modified version of this scikit-learn benchmark (see here for the code). It fits a RandomForestClassifier and an ExtraTreesClassifier on the same data using both the threading and dask.distributed backends.

The threading backend was run on a single m4.2xlarge instance with 8 cores. The distributed backend was run with three of these instances as workers, for a total of 24 cores. Here are the results:

ubuntu@ip-172-31-43-228:~$ python bench.py
Loading dataset...
Creating train-test split...
Dataset statistics:
===================
number of features:       54
number of classes:        2
data type:                float32
number of train samples:  522911 (pos=332178, neg=190733, size=112MB)
number of test samples:   58101 (pos=36994, neg=21107, size=12MB)

Training Classifiers
====================
Training ExtraTreesClassifier with threading backend... done
Training ExtraTreesClassifier with dask.distributed backend... done
Training RandomForest with threading backend... done
Training RandomForest with dask.distributed backend... done

Classification performance:
===========================
Classifier                               train-time  test-time  error-rate
---------------------------------------------------------------------------
RandomForest, threading                    35.2176s    0.5228s      0.0296
RandomForest, dask.distributed             11.4347s    3.0020s      0.0296
ExtraTreesClassifier, threading            33.4432s    0.7229s      0.0325
ExtraTreesClassifier, dask.distributed     16.7369s    6.5222s      0.0325

As you can see, the train time for RandomForestClassifier is roughly 1/3 of the threading time when run distributed (which makes sense given 3x the cores). For ExtraTreesClassifier it's roughly 1/2. Note that the predict (test) time is slower though, which also makes sense, as predict is a fast operation and the distributed run adds communication overhead.

I tried running without the scatter option and ran into some memory errors in pickle - not sure of the cause.

As such, I think this is a net win. I have a patch for scikit-learn that I haven't pushed yet to allow overriding the backend for fitting on these classes, which would avoid the need for the monkeypatch in the benchmark above.

(cc @amueller, who recommended this benchmark a few months ago :))

@mrocklin

Member

mrocklin commented Apr 26, 2017

To my naive understanding this seems pretty great?

Hijacking joblib like this seems like the least intrusive way to accelerate scikit-learn with dask.distributed. It would be interesting to see both how well this scales and also what other common computations in scikit-learn might benefit.

also cc'ing @GaelVaroquaux

From the perspective of this PR, I think it's clear that it provides an improvement. Working towards merging sounds good to me.

@amueller

amueller commented Apr 26, 2017

That looks pretty good indeed. Maybe get some error bars? It looks like the speedup for RF is slightly more than 3x, which is a bit strange ;)
But I believe that you can get nearly 3x. I'm a bit surprised by the difference between RF and ET (though, error bars again?).
I haven't looked at the code, but it sounds like you're monkey-patching? Surely joblib could add the proper interface?

@jcrist

Member

jcrist commented Apr 26, 2017

To my naive understanding this seems pretty great?

Yeah, not sure why previous numbers were less great. Might have grabbed the wrong commit when running them.

-MIN_IDEAL_BATCH_DURATION = 0.2
-MAX_IDEAL_BATCH_DURATION = 1.0
+MIN_IDEAL_BATCH_DURATION = 0.5
+MAX_IDEAL_BATCH_DURATION = 5.0

@mrocklin

mrocklin Apr 26, 2017

Member

How did you come by these values?

@jcrist

jcrist Apr 26, 2017

Member

Oop, leftover from debugging.

self.client = Client(scheduler_host, loop=loop)
if scatter is not None:
    # Keep a reference to the scattered data to keep the ids the same
    self._scatter = list(scatter)

@mrocklin

mrocklin Apr 26, 2017

Member

What happens if someone gives us a single numpy array?

@jcrist

jcrist Apr 26, 2017

Member

Then it's bad. We should only accept lists here, because technically we can pre-scatter any object, so it'd be hard to know whether someone wanted us to scatter the object itself or the elements inside it. Will fix it to error if it's not a list/tuple.
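
A minimal sketch of that validation (the parameter name comes from the snippet above; the exact error message is an assumption):

if scatter is not None and not isinstance(scatter, (list, tuple)):
    raise TypeError("scatter must be a list/tuple, got "
                    "`%s`" % type(scatter).__name__)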

    return Batch(tasks), args2

def apply_async(self, func, callback=None):
    key = '%s-%s' % (joblib_funcname(func), uuid4().hex)

@mrocklin

mrocklin Apr 26, 2017

Member

Should this add "%s-batch-%s" or something to signify that this is more than one function call?

@jcrist

jcrist Apr 26, 2017

Member

Sure.
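
(i.e. something along these lines, per the suggestion above; not necessarily the exact final code:)

key = '%s-batch-%s' % (joblib_funcname(func), uuid4().hex)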

@jcrist

Member

jcrist commented Apr 26, 2017

I haven't looked at the code but it sounds like you're monkey patching? Surely joblib could add the proper interface?

I was just monkey-patching in the benchmark. Scikit-learn hardcodes the backend in the Parallel calls for these classes. I have a patch for scikit-learn that checks for a global backend setting first and, if unset, uses the locally provided default. It looks like:

Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
         backend=get_joblib_backend(default="threading"))(...)

There might be a better way though.
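
A rough sketch of how such a helper could behave (hypothetical: get_joblib_backend and the global-setting mechanism shown here are not existing scikit-learn or joblib APIs):

_global_backend = None  # would be set via some public override hook

def set_joblib_backend(backend):
    """Globally override the backend used in estimators' Parallel calls."""
    global _global_backend
    _global_backend = backend

def get_joblib_backend(default="threading"):
    """Return the globally configured backend if one is set, else the local default."""
    return _global_backend if _global_backend is not None else default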

jcrist added some commits Apr 10, 2017

Add `scatter` option to joblib backend
Allows prescattering data to the cluster. For large arguments (big arrays, etc.) that are used in more than one task, this can be more efficient, as it avoids serializing the data for every task.

Also includes a fix to ensure the distributed backend is an instance of `ParallelBackendBase` for both the scikit-learn and joblib modules.
@jcrist

Member

jcrist commented Apr 27, 2017

I think this should be good to go.

sols = [func(*args, **kwargs) for func, args, kwargs in tasks]
results = Parallel()(tasks)
ba.terminate()

@mrocklin

mrocklin Apr 27, 2017

Member

Why is this call necessary? Should this be called as part of __exit__?

@jcrist

jcrist Apr 27, 2017

Member

Backends aren't context managers; there is no __exit__ on the backend. The call was already there for the other tests (e.g. https://github.com/dask/distributed/blob/master/distributed/tests/test_joblib.py#L42); I moved it to use terminate, which is more consistent with the other joblib backends.

@@ -45,42 +46,108 @@ def joblib_funcname(x):
    return funcname(x)

class Batch(object):

@mrocklin

mrocklin Apr 27, 2017

Member

Will users ever see this object? If so should it get a one-line docstring?

@jcrist

jcrist Apr 27, 2017

Member

No, this is completely internal.

@mrocklin

Member

mrocklin commented Apr 27, 2017

+1 from me

@jcrist jcrist merged commit 8140a74 into dask:master Apr 27, 2017

2 checks passed

continuous-integration/appveyor/pr: AppVeyor build succeeded
continuous-integration/travis-ci/pr: The Travis CI build passed

@jcrist jcrist deleted the jcrist:joblib-scatter branch Apr 27, 2017

@GaelVaroquaux

GaelVaroquaux commented Apr 27, 2017

@amueller

amueller commented May 17, 2017

@jcrist can you raise a PR with the patch to sklearn?

@jcrist

Member

jcrist commented May 17, 2017

I could, but I wanted to wait until a decision was reached on how best to override the backend in scikit-learn estimators (see the relevant issue: scikit-learn/scikit-learn#8804).

@jcrist

Member

jcrist commented May 23, 2017

@jcrist can you raise a PR with the patch to sklearn?

See the simple patch to scikit-learn here, or the more complicated but probably better proposal to joblib here.
