
Add Joblib frontend #211

Merged
merged 3 commits on Apr 7, 2016
Conversation

mrocklin
Member

Fixes #124 cc @ogrisel

It looks like there are issues with cleanly shutting down the executor. I'll look at these in a bit. This also needs docs.

Test cases to ensure proper Joblib operation are welcome.

@mrocklin
Member Author

@ogrisel it appears that terminate gets called after a Parallel() call, rather than when we leave the context manager. Where should I tear down the Executor?

This appears when I do something like the following:

from joblib import Parallel, delayed, parallel_backend

with parallel_backend('distributed', ...):
    result = Parallel()(delayed(func)(arg) for arg in seq)
    result2 = Parallel()(delayed(func)(arg) for arg in seq2)
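The lifecycle being asked for can be sketched with a stand-in executor (`FakeExecutor` and `fake_parallel_backend` are illustrative names, not part of joblib or distributed): one executor shared by every `Parallel()` call inside the block, torn down once when the block exits rather than after each call.

```python
from contextlib import contextmanager

class FakeExecutor:
    """Stand-in for distributed's Executor, just to show the lifecycle."""
    def __init__(self):
        self.shut_down = False

    def shutdown(self):
        self.shut_down = True

@contextmanager
def fake_parallel_backend():
    # Desired behaviour: a single executor lives for the whole block
    # and is torn down exactly once, at context exit.
    executor = FakeExecutor()
    try:
        yield executor
    finally:
        executor.shutdown()

with fake_parallel_backend() as ex:
    # Between successive Parallel() calls the executor should stay alive.
    assert not ex.shut_down
assert ex.shut_down  # torn down only when we leave the context manager
```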

@mrocklin mrocklin force-pushed the joblib branch 2 times, most recently from 5c7091a to 3926d15 on April 5, 2016
@mrocklin
Member Author

mrocklin commented Apr 5, 2016

OK, I realize now that terminate is called after every call to Parallel. I guess I'm looking for something to be called here:

@contextmanager
def parallel_backend(backend, n_jobs=-1, **backend_params):
    if isinstance(backend, _basestring):
        backend = BACKENDS[backend](**backend_params)
    old_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
    try:
        _backend.backend_and_jobs = (backend, n_jobs)
        yield
    finally:
        # --WANT TO CALL CLEANUP CODE HERE--
        if old_backend_and_jobs is None:
            if getattr(_backend, 'backend_and_jobs', None) is not None:
                del _backend.backend_and_jobs
        else:
            _backend.backend_and_jobs = old_backend_and_jobs

However for the time being I don't particularly need this. The executor should clean itself up on garbage collection in the common case.

@mrocklin mrocklin changed the title from [WIP] Add Joblib frontend to Add Joblib frontend on Apr 5, 2016
@mrocklin
Member Author

mrocklin commented Apr 5, 2016

OK, this seems fine to me as a first step. @ogrisel I've stolen your example from https://github.com/ogrisel/docker-distributed/blob/fec913938a67902a6477bb367dd76433babc3b47/examples/sklearn_parameter_search_joblib.ipynb and put it into the documentation. I've also cleaned up a couple things in the web UI to improve diagnostic visualization (I see from your commit history that you may have been playing with this.)

Some thoughts for the future: It would be good to benchmark real applications using this. I suspect that we're doing some things inefficiently. In particular I'm curious about the following:

  1. The cost of sending the data each time, rather than referring to data already on the cluster. This might be easier to support if joblib used apply_async(func, *args, **kwargs) rather than apply_async(closure).
  2. If AutoBatching is worth it for the distributed scheduler and sklearn applications. How long are typical tasks?
  3. Are there nice examples of nesting joblib calls? What is the right way to handle this?
  4. Are there cases where we want to keep data on the cluster and refer to it repeatedly, rather than scatter-compute-gather each time?
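Point 1 can be illustrated with plain `pickle` (a toy sketch; the sizes and the string key are illustrative, not distributed's actual serialization):

```python
import pickle

data = list(range(100_000))  # stand-in for a large dataset

# Closure-style submission: the argument data is baked into the payload,
# so it travels over the wire with every single task.
per_task_payload = pickle.dumps((sum, (data,)))

# (func, *args)-style submission: the scheduler could recognise an argument
# it already holds on the cluster and substitute a short key for it.
keyed_payload = pickle.dumps((sum, ("key-for-data-already-on-cluster",)))

assert len(per_task_payload) > 100 * len(keyed_payload)
```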

@mrocklin
Member Author

mrocklin commented Apr 5, 2016

@ogrisel is there a reason why this might fail on Python 2.7?

    from joblib._parallel_backends import ParallelBackendBase, AutoBatchingMixin
E   ImportError: No module named _parallel_backends

@mrocklin
Member Author

mrocklin commented Apr 5, 2016

Ah, nevermind. I suspect an absolute_import issue.


def apply_async(self, func, *args, **kwargs):
    callback = kwargs.pop('callback', None)
    kwargs['pure'] = False
Contributor
Maybe this could be passed as an additional pure=False argument to DistributedBackend.__init__ to make it possible to enable pure function caching explicitly?

Member Author

This will only become useful once we have a way to intelligently hash the batched-function-call objects. A few more things need to happen first:

  1. We should probably remove the AutoBatchingMixin here
  2. Joblib should send straight functions and args, rather than a wrapped object

Contributor

Indeed, let's keep pure=False for now.

@mrocklin
Member Author

mrocklin commented Apr 7, 2016

OK, merging for now.

@ogrisel
Contributor

ogrisel commented Apr 8, 2016

BTW, I think your comment in #211 (comment) highlights a real bug in how joblib currently manages the backend. I opened: joblib/joblib#335 to track this (not a big deal for distributed as the GC should do the right thing anyway).

@ogrisel
Contributor

ogrisel commented Apr 8, 2016

@mrocklin sorry for not having replied to your previous comments.

The cost of sending the data each time, rather than referring to data already on the cluster. This might be easier to support if joblib used apply_async(func, *args, **kwargs) rather than apply_async(closure).

Yes I agree the closure-based syntax is making things complex. We could introduce a map and map_combine alternative syntax at some point. Probably not in the short term though.

If AutoBatching is worth it for the distributed scheduler and sklearn applications. How long are typical tasks?

It's hard to tell in general. AutoBatching in joblib was primarily implemented as a protection for naive users who enable (multiprocessing-based) parallelism on a very large number of very short tasks and see their program run much, much slower than in sequential mode. For multiprocessing, auto-batching in practice keeps joblib's parallel mode from ever being slower than sequential mode. But it's not a magic bullet and won't guarantee linear scalability either. I don't know how useful it will be with the distributed backend.
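The idea behind auto-batching can be sketched in a few lines (fixed batch size here for simplicity; joblib's AutoBatchingMixin instead adapts the size to the measured task duration):

```python
def make_batches(tasks, batch_size):
    # Group many tiny tasks so the per-task dispatch overhead is paid
    # once per batch instead of once per task.
    return [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

def run_batch(batch):
    # A batch is shipped to a worker and executed as one unit of work.
    return [task() for task in batch]

tasks = [(lambda i=i: i * i) for i in range(100)]
batches = make_batches(tasks, 25)
results = [r for batch in batches for r in run_batch(batch)]
assert len(batches) == 4
assert results == [i * i for i in range(100)]
```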

Are there nice examples of nesting joblib calls? What is the right way to handle this?

You can run cross-validation on a RandomizedSearchCV object, for instance (nested cross-validation); here is an untested snippet to give you the gist of it:

with parallel_backend('distributed', scheduler='localhost:8786'):
    model = RandomizedSearchCV(model, params, n_iter=30, cv=5)
    scores = cross_val_score(model, X, y, cv=5)

print("CV score: %0.3f +/- %0.3f" % (np.mean(scores), np.std(scores)))

Are there cases where we want to keep data on the cluster and refer to it repeatedly, rather than scatter-compute-gather each time?

Yes, ideally this is what we want, but the scikit-learn API really does not help us there. Note that joblib has a call_and_shelve feature that could be made pluggable at some point, but scikit-learn does not use it at this point.
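The call_and_shelve idea amounts to returning a handle instead of the value, so the result can stay where it was computed until it is actually needed. A minimal sketch of the pattern (`ShelvedResult` and the in-memory store are illustrative, not joblib's implementation):

```python
class ShelvedResult:
    """Handle to a result that stays in the store (or, in a distributed
    setting, on the cluster) until .get() is called."""
    def __init__(self, store, key):
        self._store = store
        self._key = key

    def get(self):
        return self._store[self._key]

store = {}

def call_and_shelve(func, *args):
    key = (func.__name__, args)   # naive cache key, for illustration only
    if key not in store:          # compute once, hand out cheap handles after
        store[key] = func(*args)
    return ShelvedResult(store, key)

ref = call_and_shelve(sum, (1, 2, 3))
assert ref.get() == 6
```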
