
Joblib, parallel_backend, and performance #114

Closed
metasyn opened this issue Jan 10, 2018 · 5 comments

metasyn commented Jan 10, 2018

hi y'all - Matthew suggested I ask some questions here.

I'm a little confused by some results I'm seeing, and I'm wondering if you can help me figure out why I seem to be executing grid searches serially.

I modified dask-docker and am using containers on a docker swarm with 64GB RAM and 48 cores each.

search = RandomizedSearchCV(self.estimator, self.estimator.param_space,
                            cv=3, n_iter=20, verbose=10)

if self.estimator.use_dask:
    address = 'tcp://dask-cluster:8786'
    c = Client(address)

    with parallel_backend('dask.distributed', scheduler_host=address,
                          scatter=[X.values, y.values]):
        search.fit(X.values, y.values)

where X is a pd.DataFrame and y is a pd.Series.
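
(Not part of the original post - just a quick sanity check one could run after creating the Client, to confirm the scheduler actually sees all the workers before kicking off the search:)

# Sanity check: how many workers does the scheduler report?
# `c` is the Client created in the snippet above.
info = c.scheduler_info()
print(info['address'])
print(len(info['workers']), 'workers connected')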

When I run the above with a small number of rows, I can see in the status page that lots of tasks get executed in parallel. The blocks of work on the task stream end up looking like a straight vertical line, since they all get dispatched nearly simultaneously.

Once I start increasing the number of rows, I seem to get more and more serial execution, where the status page shows essentially one task being added at a time.

In this screenshot, I started at the full dataset and subtracted 10k rows on each run to see the effect on execution time / parallelism. For some reason, on a few runs (e.g. 80k and 40k) the work gets distributed a little differently?

[screenshot of the task stream from the status page, 2018-01-10]

When the row count is higher, there is never more than one task active at a time. When it is lower, I see more (up to 4) being triggered simultaneously.

Anyhow, my question ultimately is:

  • am I doing something in particular wrong?
  • does this pattern look indicative of something I setup incorrectly?
TomAugspurger (Member) commented:

Thanks, I'll try to debug this later today or tomorrow. Can you say a bit more about

  1. The graph backing X and y. Do they involve disk IO? Do all the workers share a file system?
  2. The estimator you're fitting? This probably isn't the issue, but it may help with debugging.


metasyn commented Jan 12, 2018

1.) X and y are already in memory - no additional disk IO after they've been read from std.io to a dataframe earlier in the process. The workers in the above are 10 containers spread across three physical hosts, so each worker on average shares a filesystem with 2 other workers.

2.) The estimator here was RandomForestClassifier
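
(For completeness, a rough, self-contained sketch of the setup described in this thread - the param_space here is invented for illustration, the real one comes from self.estimator, and the scheduler address is the one from the earlier snippet:)

import numpy as np
import pandas as pd
import dask_ml.joblib  # registers the 'dask.distributed' joblib backend

from dask.distributed import Client
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.externals import joblib
from sklearn.model_selection import RandomizedSearchCV

# Synthetic stand-ins for the real X (pd.DataFrame) and y (pd.Series)
X = pd.DataFrame(np.random.randn(100_000, 10))
y = pd.Series(np.random.randint(0, 2, size=len(X)))

# Invented parameter space; the real one lives on self.estimator.param_space
param_space = {
    "n_estimators": stats.randint(50, 500),
    "max_depth": stats.randint(2, 20),
}

search = RandomizedSearchCV(RandomForestClassifier(), param_space,
                            cv=3, n_iter=20, verbose=10)

address = 'tcp://dask-cluster:8786'
client = Client(address)

with joblib.parallel_backend('dask.distributed', scheduler_host=address,
                             scatter=[X.values, y.values]):
    search.fit(X.values, y.values)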

TomAugspurger (Member) commented:

Sorry for the delay on this! Let's try to narrow this down to see if it's just the scheduler that's not working properly. Could you set up a cluster / client and try out the following:

import dask.distributed
import pandas as pd
import numpy as np
import dask_ml.joblib  # registers the 'dask.distributed' joblib backend

from sklearn.externals import joblib
from sklearn.model_selection import RandomizedSearchCV
from scipy import stats

from sklearn.base import BaseEstimator

class DummyEstimator(BaseEstimator):
    def __init__(self, parameter=None):
        self.parameter = parameter
        
    def fit(self, X, y=None):
        return self
    
    def predict(self, X):
        return np.zeros(len(X))
    
    def score(self, X, y=None):
        return 0

search = RandomizedSearchCV(DummyEstimator(), {"parameter": stats.uniform},
                            cv=3, n_iter=20, verbose=10)

%%time
N = 100_000
X = pd.DataFrame(np.random.randn(N, 10))
y = pd.Series(np.random.uniform(size=N))

# `client` here is a dask.distributed.Client already connected to the cluster
addr = client.scheduler_info()['address']
with joblib.parallel_backend('dask.distributed', addr,
                             scatter=[X.values, y.values]) as pb:
    search.fit(X, y)

For me, that finishes in ~4 seconds, just using a local cluster.
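
(One note for anyone copying the snippet above: `client` is assumed to be an already-connected dask.distributed.Client. A minimal local setup, roughly matching the local cluster mentioned here, would be something like:)

from dask.distributed import Client

client = Client()  # with no arguments this starts a local cluster on this machine
print(client)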

TomAugspurger (Member) commented:

FYI, if you're able to try out joblib master and dask.distributed master, things may have improved in the last couple of weeks. Nothing specific to this issue, but we were making changes to that code and it might have fixed things magically :)


metasyn commented Mar 25, 2018

Hey Tom - sorry, I sorta dropped off the face of the planet. I appreciate your responses - I might not get around to re-checking this for a bit, so I will close this issue for now :)

metasyn closed this as completed Mar 25, 2018