This repository has been archived by the owner on Oct 14, 2018. It is now read-only.

Multi-threading or -processing doesn't work for simple sklearn Pipeline #70

Closed
mattvan83 opened this issue Mar 15, 2018 · 12 comments

Comments

@mattvan83

Hello,

I am having trouble using this nice tool, dask-searchcv, on a simple Pipeline.

I tried it on a simple sklearn Pipeline (StandardScaler + SVC with an RBF kernel):

Pipeline(memory=None,
     steps=[('scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('svc', SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='auto', kernel='rbf',
  max_iter=-1, probability=False, random_state=42, shrinking=True,
  tol=0.001, verbose=False))])

With n_jobs=-1, scheduler="threading" or scheduler="multiprocessing", and a grid search over the C and gamma parameters, only one process is ever used (out of the 16 I have available).

By contrast, when I used dask-searchcv on a longer Pipeline that also includes PCA, I got the expected behavior: one process running at 1600% CPU, or 16 processes launched.

I don't understand why multi-threading or multi-processing in dask-searchcv doesn't work in the first case.

Any explanation?

Matt

@TomAugspurger
Member

Could you post a full example?

With

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
import dask_ml.model_selection

param_grid = {
    'svc__C': [0.1, 1, 10],
    'svc__gamma': [0.01, 0.1, 0.5],
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    SVC()
)
gs = dask_ml.model_selection.GridSearchCV(pipe, param_grid, scheduler='multiprocessing')

With

%time gs.fit(X, y)
# 20.3 s

I observe parallelism. Compare with

%time gs.set_params(n_jobs=1).fit(X, y)
# 1min 5s

@mattvan83
Author

I ran the same experiment with the code below:

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
import dask_searchcv as dcv
from time import time

param_grid = {
    'svc__C': [0.1, 1, 10],
    'svc__gamma': [0.01, 0.1, 0.5],
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    SVC()
)
gs = dcv.GridSearchCV(pipe, param_grid, scheduler='multiprocessing')

start = time()
gs.fit(X, y)
print("Time:", (time() - start))

I got: Time: 10.385440349578857

and with:

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
import dask_searchcv as dcv
from time import time

param_grid = {
    'svc__C': [0.1, 1, 10],
    'svc__gamma': [0.01, 0.1, 0.5],
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    SVC()
)
gs = dcv.GridSearchCV(pipe, param_grid, scheduler='multiprocessing')

start = time()
gs.set_params(n_jobs=1).fit(X, y)
print("Time:", (time() - start))

I got: Time: 65.83787727355957

So this example works, but my own code and data do not. I adapted the example to my case and reproduced the behavior I see with my own code and data:

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import LeaveOneOut
import dask_searchcv as dcv
from time import time
import numpy as np

C_range = np.logspace(-25, 15, 41, base=2)
Gamma_range = np.logspace(-15, 5, 21, base=2)

param_grid = {
    'svc__C': C_range,
    'svc__gamma': Gamma_range,
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    SVC()
)
gs = dcv.GridSearchCV(pipe, param_grid, cv=LeaveOneOut(), scoring="accuracy",
                        n_jobs=-1, scheduler="threading")

start = time()
gs.fit(X, y)
print("Time:", (time() - start))

When I run this code, the top command shows only one process, with steadily increasing %MEM.
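
For a rough sense of scale in the script above: an exhaustive grid search evaluates every parameter combination on every cross-validation split, and leave-one-out produces one split per sample. The sketch below only does that arithmetic. The helper name count_fits is hypothetical, the count ignores the final refit and any work that dask-searchcv shares between candidates, and it does not by itself explain why only one process was busy.

```python
from math import prod


def count_fits(grid_sizes, n_splits):
    """Number of (candidate, split) combinations in an exhaustive grid search."""
    return prod(grid_sizes) * n_splits


n_samples = 10_000        # make_classification(n_samples=10_000)
loo_splits = n_samples    # leave-one-out: one split per sample

small_grid_loo = count_fits([3, 3], loo_splits)      # 3 C values x 3 gamma values
large_grid_loo = count_fits([41, 21], loo_splits)    # logspace grids of 41 and 21 values
large_grid_10fold = count_fits([41, 21], 10)         # same large grid with cv=10

print(small_grid_loo, large_grid_loo, large_grid_10fold)
```

Under these assumptions the 41 x 21 grid with leave-one-out is several orders of magnitude more work than the 3 x 3 grid with the default number of folds, so the two setups are not directly comparable on timing alone.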

@mattvan83
Author

I think I have identified the source of the problem by taking your first example and running it with leave-one-out cross-validation in the grid search:


from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import LeaveOneOut
import dask_searchcv as dcv
from time import time
import numpy as np

param_grid = {
    'svc__C': [0.1, 1, 10],
    'svc__gamma': [0.01, 0.1, 0.5],
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    SVC()
)
gs = dcv.GridSearchCV(pipe, param_grid, cv=LeaveOneOut(), scheduler='threading')

start = time()
gs.fit(X, y)
print("Time:", (time() - start))

LOOCV seems to cause this effect.

@mattvan83
Author

Another thing that causes this effect is using numpy logspace to define the C and gamma ranges:

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.datasets import make_classification
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import LeaveOneOut
import dask_searchcv as dcv
from time import time
import numpy as np

C_range = np.logspace(-25, 15, 41, base=2)
Gamma_range = np.logspace(-15, 5, 21, base=2)

param_grid = {
    'svc__C': C_range,
    'svc__gamma': Gamma_range,
}

X, y = make_classification(n_samples=10_000)

pipe = make_pipeline(
    StandardScaler(),
    PCA(),
    SVC()
)
gs = dcv.GridSearchCV(pipe, param_grid, cv=10, scheduler="threading")

start = time()
gs.fit(X, y)
print("Time:", (time() - start))

Only one process runs, at up to 100% CPU. If you replace the param_grid values with plain lists, as in your example, it works!
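
Note that the two runs compared here differ in both the type of the grid values (numpy arrays versus lists) and the grid size (41 x 21 versus 3 x 3). A minimal sketch, assuming one wants to test the type alone: build the same powers of two as plain Python lists and pass this param_grid to the script above unchanged. Whether the array type is actually the trigger is not established in this thread.

```python
# Powers of two covering the same exponents as
# np.logspace(-25, 15, 41, base=2) and np.logspace(-15, 5, 21, base=2),
# but stored as plain Python floats in plain lists.
C_range = [2.0 ** k for k in range(-25, 16)]      # 41 values, 2**-25 .. 2**15
gamma_range = [2.0 ** k for k in range(-15, 6)]   # 21 values, 2**-15 .. 2**5

param_grid = {
    'svc__C': C_range,
    'svc__gamma': gamma_range,
}

print(len(param_grid['svc__C']), len(param_grid['svc__gamma']))
```

If this version parallelizes while the logspace version does not, the value type is the likely difference; if both behave the same, the grid size is the more likely factor.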

@TomAugspurger
Member

That's awfully strange. If you're comfortable trying to debug this, let me know.

Otherwise, I'm busy through the end of this week and most of next week on other projects. Will look afterwards.

@mattvan83
Author

I am a novice in Python programming, so unfortunately I am not comfortable debugging this.

I'll leave it to you to look at this whenever you can, or to anyone else who feels comfortable with it.

@mattvan83
Author

It seems to be due to an incompatibility between the latest versions of distributed (1.21.3 or 1.21.4) and the latest version of sklearn (0.19.1). After downgrading to distributed 1.20.2, it seems to work.
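
Since the behavior appears to depend on package versions, it helps to report exactly what is installed when reproducing it. A small sketch using only the standard library (Python 3.8+); the helper name installed_versions is hypothetical, and the distribution names listed are assumptions about how the packages are published.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Distribution names assumed here; adjust to match your environment.
report = installed_versions(["dask", "distributed", "scikit-learn", "dask-searchcv"])
for name, ver in report.items():
    print(f"{name}: {ver or 'not installed'}")
```

Pasting this output alongside a reproduction makes it easier to compare a working environment with a failing one.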

@TomAugspurger
Member

TomAugspurger commented Mar 22, 2018 via email

@lionfish0

I wonder if this is linked to the problem I've got.
I've found that GridSearchCV seems to run only locally, ignoring the Client scheduler. I tested with delayed (immediately below), and that does use the remote scheduler successfully.

from sklearn.datasets import load_digits
from sklearn.svm import SVC

# Fit with dask-searchcv
from dask.distributed import Client
# Renamed just to make sure I wasn't using sklearn!
from dask_searchcv import GridSearchCV as DaskGridSearchCV

url = '34.242.217.81'
client = Client(url+':8786')

param_space = {'C': [1e-4, 1, 1e4],
               'gamma': [1e-3, 1, 1e3],
               'class_weight': [None, 'balanced']}

model = SVC(kernel='rbf')

digits = load_digits()

search = DaskGridSearchCV(model, param_space, cv=3)
search.fit(digits.data, digits.target)

# This bit works:
from dask.distributed import Client
from dask import compute, delayed
import numpy as np
import GPy

def test(X, Y):
    k = GPy.kern.RBF(1)
    m = GPy.models.GPRegression(X, Y, k)
    return m.predict(X)

X = np.arange(0, 10)[:, None]
Y = np.sin(X)
values = [delayed(test)(X, Y)]
results = compute(*values)

@lionfish0

Also, I found I couldn't revert to an older distributed, as it causes an error (you'd probably also have to revert dask, etc.).

@TomAugspurger
Member

I think this is the same as dask/dask-ml#249. Tracking it there.

@TomAugspurger
Member

This was fixed over in dask/dask-ml#260

It'll be included in the next release of dask-ml which is probably sometime in the next week or two.
