Use dask_ml GridSearchCV on HPC cluster client #3463

Open
ArthurVINCENT opened this issue Feb 10, 2020 · 14 comments

ArthurVINCENT commented Feb 10, 2020

Hi,
I'm working on a large dataset and I'm trying to tune my model's hyperparameters with GridSearchCV from dask_ml, as presented in the dask_ml tutorial.

Here is my Python code:

import sqlite3

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GroupKFold
from dask_ml.model_selection import GridSearchCV
from dask.distributed import Client, progress
from dask_jobqueue import PBSCluster
from sklearn.externals import joblib

dataset_path = "/data_test.sqlite"
data_field = "code"
layer_name = "data_test"
cv_folds = 5
cv_parameters = {'n_estimators': [50, 100, 150]}

features_labels = [
    'sentinel2_ndvi_20180101', 'sentinel2_ndvi_20180111',
    'sentinel2_ndvi_20180121', 'sentinel2_ndvi_20180131',
    'sentinel2_ndvi_20180210', 'sentinel2_ndvi_20180220',
    'sentinel2_ndvi_20180302', 'sentinel2_ndvi_20180312',
    'sentinel2_ndvi_20180322', 'sentinel2_ndvi_20180401',
    'sentinel2_ndvi_20180411', 'sentinel2_ndvi_20180421',
    'sentinel2_ndvi_20180501', 'sentinel2_ndvi_20180511',
    'sentinel2_ndvi_20180521', 'sentinel2_ndvi_20180531',
    'sentinel2_ndvi_20180610', 'sentinel2_ndvi_20180620',
    'sentinel2_ndvi_20180630', 'sentinel2_ndvi_20180710',
    'sentinel2_ndvi_20180720', 'sentinel2_ndvi_20180730',
    'sentinel2_ndvi_20180809', 'sentinel2_ndvi_20180819',
    'sentinel2_ndvi_20180829', 'sentinel2_ndvi_20180908',
    'sentinel2_ndvi_20180918', 'sentinel2_ndvi_20180928',
    'sentinel2_ndvi_20181008', 'sentinel2_ndvi_20181018',
    'sentinel2_ndvi_20181028', 'sentinel2_ndvi_20181107',
    'sentinel2_ndvi_20181117', 'sentinel2_ndvi_20181127',
    'sentinel2_ndvi_20181207', 'sentinel2_ndvi_20181217',
    'sentinel2_ndvi_20181227'
]

conn = sqlite3.connect(dataset_path)

df_features = pd.read_sql_query("select {} from {}".format(",".join(features_labels), layer_name), conn)
df_labels = pd.read_sql_query("select {} from {}".format(data_field, layer_name), conn)
df_groups = pd.read_sql_query("select {} from {}".format("originfid", layer_name), conn)
splitter = list(GroupKFold(n_splits=cv_folds).split(df_features,
                                                    df_labels,
                                                    df_groups))
clf = RandomForestClassifier()
clf = GridSearchCV(clf,
                   cv_parameters,
                   cv=splitter,
                   return_train_score=True)

cluster = PBSCluster(cores=12,
                     memory="60GB")
cluster.adapt(minimum_jobs=5, maximum_jobs=10)
client = Client(cluster)
with joblib.parallel_backend('dask'):
    clf.fit(df_features, df_labels)

Then Dask asked me to use client.scatter to deploy the data on the workers, with the following warning:

(_CVIterableWrapper(cv=[(array([     3,      4, .. ... e, False, True)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)

But if I use the backend like in the tutorial (with scatter):

with joblib.parallel_backend('dask', scatter=[df_features, df_labels]):
    clf.fit(df_features, df_labels)

then no workers can be found:

distributed.core - ERROR - No workers found
Traceback (most recent call last):
  File "/.../lib/python3.6/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/.../lib/python3.6/site-packages/distributed/scheduler.py", line 2703, in scatter
    raise gen.TimeoutError("No workers found")
tornado.util.TimeoutError: No workers found

Any suggestions would be welcome.

Note: if I use a smaller dataset, everything works.

@TomAugspurger
Member

Thanks for the report. I'd need to double check, but I don't think you need to use both joblib.parallel_backend and dask_ml.model_selection.GridSearchCV. Dask-ML's GridSearchCV doesn't use joblib.

Do you have a minimal example that produces the warning about scattering large objects?

I notice that you're using Adaptive. If you switch to a fixed set of workers and wait for them before starting the joblib.parallel_backend, do you see the exception about "No workers found"?
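
To illustrate the first point, here's a rough sketch (untested against PBS from my side) of using Dask-ML's GridSearchCV on its own with a fixed worker pool, reusing the splitter, df_features, and df_labels from your snippet:

# Sketch only: Dask-ML's GridSearchCV just needs an active Client, so the
# joblib.parallel_backend block can be dropped entirely.
from dask.distributed import Client
from dask_jobqueue import PBSCluster
from dask_ml.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

cluster = PBSCluster(cores=12, memory="60GB")
cluster.scale(5)                 # fixed number of jobs instead of cluster.adapt(...)
client = Client(cluster)
client.wait_for_workers(5)       # block until the workers have connected

clf = GridSearchCV(RandomForestClassifier(),
                   {'n_estimators': [50, 100, 150]},
                   cv=splitter,
                   return_train_score=True)
clf.fit(df_features, df_labels)  # runs on the cluster through the active Client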

@ArthurVINCENT
Author

Thank you for your quick reaction.

Do you have a minimal example that produces the warning about scattering large objects?

You can download a minimal example here (130 MB).

Also, the issue occurs even if I set the number of workers by hand with cluster.scale(5).

I updated the code snippet to fit the provided dataset.

@TomAugspurger
Member

http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports gives some recommendations on writing minimal bug reports. Ideally, it would be a small snippet with extraneous details removed. Does the data need to be read from sqlite to reproduce the issue, or can you use in-memory dataframes?

Also, the issue occurs even if I set the number of workers by hand cluster.scale(5).

Even if you wait for the workers to arrive with client.wait_for_workers(5)?

@ArthurVINCENT
Author

Thank you for sharing these recommendations.

Here is the minimal code to reproduce the issue:

import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from sklearn.ensemble import RandomForestClassifier
from dask_ml.model_selection import GridSearchCV
from dask.distributed import Client
from dask_jobqueue import PBSCluster

columns = [str(col) for col in range(40)]

cv_folds = 5
cv_parameters = {'n_estimators': [50, 100, 150]}

df = pd.DataFrame(index=range(600000), columns=columns)
df = df.fillna(0)
df[columns[0]] = np.random.randint(1, 6, df.shape[0])
df[columns[1]] = np.random.randint(1, 6, df.shape[0])

labels_values = df[columns[0]]
groups = df[columns[1]]
features_values = df[columns[2:-1]]

splitter = list(
    GroupKFold(n_splits=cv_folds).split(features_values, labels_values,
                                        groups))
clf = RandomForestClassifier()
clf = GridSearchCV(clf, cv_parameters, cv=splitter, return_train_score=True)

cluster = PBSCluster(cores=12, memory="60GB")

cluster.scale(5)
client = Client(cluster)
client.wait_for_workers(5)

with joblib.parallel_backend('dask', scatter=[features_values, labels_values]):
    clf.fit(features_values, labels_values)

Indeed, thanks to client.wait_for_workers(5), workers are found.

@TomAugspurger
Member

Indeed, thanks to client.wait_for_workers(5), workers are found.

Good, so we're just dealing with the warning about scattering large bits of data.

I don't have access to an HPC cluster. Are you able to swap out the PBSCluster for a LocalCluster and still reproduce the problem?

This is starting to sound like dask/dask-ml#516. One problem with pre-scattering is that the large pieces of data in the graph are generated by dask-ml itself, not the user. So we need to make sure that the scattered pieces are just keys to slices of the data, rather than concrete ndarrays.
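
For concreteness, the pattern the warning is pointing at looks like this in isolation (a toy sketch with a made-up mean_of function, just to show the Future-versus-ndarray difference; it doesn't reflect what Dask-ML builds internally):

import numpy as np
from dask.distributed import Client, LocalCluster

def mean_of(X):
    # stand-in for real work
    return float(X.mean())

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2))
    X_big = np.random.rand(100_000, 40)

    # bad: the array is embedded in the task and routed through the scheduler
    bad = client.submit(mean_of, X_big)

    # good: ship the data to the workers once, then pass the Future;
    # the task graph only carries a small key
    X_future = client.scatter(X_big, broadcast=True)
    good = client.submit(mean_of, X_future)
    print(bad.result(), good.result())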

@ArthurVINCENT
Author

Replacing the PBSCluster with a LocalCluster, the following error occurs:

File ".../lib/python3.6/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File ".../lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File ".../lib/python3.6/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File ".../lib/python3.6/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File ".../lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File ".../lib/python3.6/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File ".../lib/python3.6/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File ".../lib/python3.6/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

and the program is still stuck.
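
For reference, the idiom the traceback refers to would look like this here (a sketch only; df, splitter and clf are built exactly as in the minimal example above):

import joblib
from dask.distributed import Client, LocalCluster

# Wrap the cluster setup behind the __main__ guard so that the worker
# processes can re-import this module safely.
if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    with joblib.parallel_backend('dask', scatter=[features_values, labels_values]):
        clf.fit(features_values, labels_values)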

@mrocklin
Member

mrocklin commented Feb 14, 2020 via email

@mrocklin
Member

mrocklin commented Feb 14, 2020 via email

@ArthurVINCENT
Author

Thank you, @mrocklin. The issue can be reproduced with LocalCluster(processes=False, ...).
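
i.e. roughly (a sketch of the threads-only variant, with the rest of the minimal example unchanged):

from dask.distributed import Client, LocalCluster

# Threads-only cluster: no worker processes are forked, so the __main__
# guard problem goes away, and the scatter warning still appears.
cluster = LocalCluster(processes=False)
client = Client(cluster)

with joblib.parallel_backend('dask', scatter=[features_values, labels_values]):
    clf.fit(features_values, labels_values)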

@mrocklin
Member

That's very surprising. I encourage you to reduce that down to a minimal reproducer and share it here if you have the time.

@TomAugspurger
Member

@ArthurVINCENT have you been able to look at this anymore? I'm not seeing any issues with the following

import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from distributed import LocalCluster, Client

if __name__ == "__main__":
    columns = [str(col) for col in range(40)]

    cv_folds = 5
    cv_parameters = {'n_estimators': [50, 100, 150]}

    df = pd.DataFrame(index=range(60000), columns=columns)
    df = df.fillna(0)
    df[columns[0]] = np.random.randint(1, 6, df.shape[0])
    df[columns[1]] = np.random.randint(1, 6, df.shape[0])

    labels_values = df[columns[0]]
    groups = df[columns[1]]
    features_values = df[columns[2:-1]]

    splitter = list(
        GroupKFold(n_splits=cv_folds).split(features_values, labels_values,
                                            groups))
    clf = RandomForestClassifier()
    clf = GridSearchCV(clf, cv_parameters, cv=splitter, return_train_score=True)

    cluster = LocalCluster()
    client = Client(cluster)

    with joblib.parallel_backend('dask', scatter=[features_values, labels_values]):
        clf.fit(features_values, labels_values)

@ArthurVINCENT
Author

Actually, the issue/warning appears if you replace the import

from sklearn.model_selection import GridSearchCV

by

from dask_ml.model_selection import GridSearchCV

Is there any reason to use sklearn instead of dask_ml?

I will run it on my large dataset and keep you informed whether everything works.

@TomAugspurger
Member

Is there any reason to use sklearn instead of dask_ml?

That's documented in https://ml.dask.org/hyper-parameter-search.html#hyperparameter-drop-in.
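
Roughly, as I understand those docs (a sketch, not verbatim from them): scikit-learn's GridSearchCV parallelises through joblib, which you point at the cluster with the dask backend, while Dask-ML's GridSearchCV builds the search as a Dask graph itself and only needs an active Client:

# Sketch of the two variants side by side (cv_parameters, splitter,
# features_values and labels_values as defined in the earlier snippets).
import joblib
from sklearn.ensemble import RandomForestClassifier

estimator = RandomForestClassifier()

# 1) scikit-learn's GridSearchCV, parallelised via the dask joblib backend
from sklearn.model_selection import GridSearchCV as SkGridSearchCV

search = SkGridSearchCV(estimator, cv_parameters, cv=splitter, return_train_score=True)
with joblib.parallel_backend('dask', scatter=[features_values, labels_values]):
    search.fit(features_values, labels_values)

# 2) Dask-ML's GridSearchCV, which schedules the search on the cluster directly
from dask_ml.model_selection import GridSearchCV as DaskGridSearchCV

search = DaskGridSearchCV(estimator, cv_parameters, cv=splitter, return_train_score=True)
search.fit(features_values, labels_values)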

@ArthurVINCENT
Author

It seems to work as expected on my large dataset, so I will use GridSearchCV from sklearn instead of dask_ml.

Thanks.
