
[WIP] Use dask.delayed within fit #730

Merged (34 commits, Aug 29, 2018)
Commits
8cb1fae
[WIP] Use dask.delayed within fit
mrocklin Jul 15, 2018
2e1b373
add dask[delayed] to ci configurations
mrocklin Jul 16, 2018
51cf0cd
Fixup fail side
TomAugspurger Jul 25, 2018
d1d10a0
Fixed return shape
TomAugspurger Aug 6, 2018
7e7e68f
Reuse dask-ml
TomAugspurger Aug 7, 2018
d06edd6
configurable delayed
TomAugspurger Aug 7, 2018
6d370ec
typo
TomAugspurger Aug 7, 2018
8808c77
Merge remote-tracking branch 'upstream/development' into mrocklin-dask
TomAugspurger Aug 7, 2018
5d21024
toggle approach
TomAugspurger Aug 8, 2018
439ae5c
wip
TomAugspurger Aug 8, 2018
d9aca85
chunking
TomAugspurger Aug 8, 2018
b919bed
assign
TomAugspurger Aug 8, 2018
54011ff
push debugging code
TomAugspurger Aug 9, 2018
2050b2d
some cleanup, tests
TomAugspurger Aug 15, 2018
2577d08
some cleanup, tests
TomAugspurger Aug 15, 2018
36b2d23
Docs
TomAugspurger Aug 15, 2018
3ba6082
dependencies
TomAugspurger Aug 15, 2018
c378519
Bump dask-ml version
TomAugspurger Aug 15, 2018
2eb71dd
debugging CI
TomAugspurger Aug 16, 2018
4ca3b95
debugging CI
TomAugspurger Aug 16, 2018
ef325b4
debugging
TomAugspurger Aug 16, 2018
a3102ac
Handle training errors
TomAugspurger Aug 21, 2018
3144977
Try dask-ml master
TomAugspurger Aug 21, 2018
a80888f
test
TomAugspurger Aug 21, 2018
6a85646
Trigger CI
TomAugspurger Aug 21, 2018
22226f2
Trigger CI
TomAugspurger Aug 21, 2018
84b4474
remove pythonhashseed
TomAugspurger Aug 21, 2018
ea3a1bd
print debug
TomAugspurger Aug 22, 2018
d9b4d9a
Try single-threaded
TomAugspurger Aug 29, 2018
4342853
smaller
TomAugspurger Aug 29, 2018
d279253
Handle failure on master
TomAugspurger Aug 29, 2018
ac6b770
skip that test
TomAugspurger Aug 29, 2018
b3342fb
Install from git
TomAugspurger Aug 29, 2018
3d2fcd1
Doc / cleanup
TomAugspurger Aug 29, 2018

Changes from 1 commit

22 changes: 12 additions & 10 deletions tpot/base.py
@@ -46,6 +46,7 @@
from tqdm import tqdm
from copy import copy, deepcopy

+import dask
Contributor: Please add installation of dask into ci/.travis_install.sh and .appveyor.yml for the unit tests.

Contributor Author: Done. Though, just to reiterate, I'm not trying to get tests to work here at all. This is only up here for conversation.

Contributor: OK, thanks. But it seems that it passed almost all the unit tests. Great!

from sklearn.base import BaseEstimator
from sklearn.utils import check_X_y
from sklearn.externals.joblib import Parallel, delayed, Memory
@@ -526,9 +527,9 @@ def fit(self, features, target, sample_weight=None, groups=None):
target: array-like {n_samples}
List of class labels for prediction
sample_weight: array-like {n_samples}, optional
Per-sample weights. Higher weights indicate more importance. If specified,
sample_weight will be passed to any pipeline element whose fit() function accepts
a sample_weight argument. By default, using sample_weight does not affect tpot's
scoring functions, which determine preferences between pipelines.
groups: array-like, with shape {n_samples, }, optional
Group labels for the samples used when performing cross-validation.
@@ -1154,15 +1155,15 @@ def _evaluate_individuals(self, individuals, features, target, sample_weight=Non
scoring_function=self.scoring_function,
sample_weight=sample_weight,
groups=groups,
-timeout=self.max_eval_time_seconds
+timeout=self.max_eval_time_seconds,
)

result_score_list = []
# Don't use parallelization if n_jobs==1
if self.n_jobs == 1:
for sklearn_pipeline in sklearn_pipeline_list:
self._stop_by_max_time_mins()
-val = partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline)
+val = partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline, delayed=lambda x: x)
result_score_list = self._update_val(val, result_score_list)
else:
# chunk size for pbar update
@@ -1171,12 +1172,13 @@ def _evaluate_individuals(self, individuals, features, target, sample_weight=Non
for chunk_idx in range(0, len(sklearn_pipeline_list), chunk_size):
self._stop_by_max_time_mins()
parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs')
-tmp_result_scores = parallel(delayed(partial_wrapped_cross_val_score)(sklearn_pipeline=sklearn_pipeline)
-                             for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + chunk_size])
-# update pbar
-for val in tmp_result_scores:
-    result_score_list = self._update_val(val, result_score_list)
+tmp_result_scores = [partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline,
+                                                     delayed=dask.delayed)
+                     for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + chunk_size]]
+result_score_list.extend(tmp_result_scores)

+result_score_list = dask.compute(*result_score_list)
+self._update_pbar(len(result_score_list))
self._update_evaluated_individuals_(result_score_list, eval_individuals_str, operator_counts, stats_dicts)

"""Look up the operator count and cross validation score to use in the optimization"""
50 changes: 28 additions & 22 deletions tpot/gp_deap.py
@@ -23,6 +23,7 @@

"""

+import dask
import numpy as np
from deap import tools, gp
from inspect import isclass
@@ -395,7 +396,8 @@ def mutNodeReplacement(individual, pset):

@threading_timeoutable(default="Timeout")
def _wrapped_cross_val_score(sklearn_pipeline, features, target,
-cv, scoring_function, sample_weight=None, groups=None):
+cv, scoring_function, sample_weight=None,
+groups=None, delayed=lambda x: x):
"""Fit estimator and compute scores for a given dataset split.
Parameters
----------
@@ -425,24 +427,28 @@ def _wrapped_cross_val_score(sklearn_pipeline, features, target,

cv = check_cv(cv, target, classifier=is_classifier(sklearn_pipeline))
cv_iter = list(cv.split(features, target, groups))
-scorer = check_scoring(sklearn_pipeline, scoring=scoring_function)

-try:
-    with warnings.catch_warnings():
-        warnings.simplefilter('ignore')
-        scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
-                                 X=features,
-                                 y=target,
-                                 scorer=scorer,
-                                 train=train,
-                                 test=test,
-                                 verbose=0,
-                                 parameters=None,
-                                 fit_params=sample_weight_dict)
-                  for train, test in cv_iter]
-        CV_score = np.array(scores)[:, 0]
-        return np.nanmean(CV_score)
-except TimeoutException:
-    return "Timeout"
-except Exception as e:
-    return -float('inf')
+scorer = delayed(check_scoring)(sklearn_pipeline, scoring=scoring_function)

+def safe_fit_and_score(*args, **kwargs):
+    try:
+        return _fit_and_score(*args, **kwargs)
+    except Exception:
+        return -float('inf')
+
+with warnings.catch_warnings():
+    warnings.simplefilter('ignore')
+    # TODO: dive into and delay fit/transform calls on sklearn_pipeline.steps appropriately.
+    # This will help with shared intermediate results, profiling, etc.
+    # It looks like dask_ml.model_selection._search.do_fit_and_score might have good logic here.
Contributor Author: @TomAugspurger is this task easy for you by any chance?

Contributor: Sure, I can take a look today, I think.

Contributor Author: TPOT is a fun problem to play with :)

Contributor Author: Alternatively, @jcrist, if you're around and have time: you've probably done this before :)
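
A hypothetical sketch of what that TODO is after, assuming nothing about TPOT's internals: if each step's fit/transform becomes its own delayed task, two candidate pipelines that share a preprocessing prefix point at the same task, so the shared step is fit only once per dask.compute call:

    import dask
    from dask import delayed

    @delayed
    def fit_transform(step_name, X):
        # Stand-in for fitting one pipeline step and transforming X.
        return X

    X = list(range(10))
    scaled = fit_transform("scaler", X)              # shared prefix: one task
    model_a = fit_transform("random_forest", scaled)
    model_b = fit_transform("logistic", scaled)

    # One compute evaluates both candidate pipelines; the "scaler" task
    # appears once in the merged graph and therefore runs only once.
    dask.compute(model_a, model_b)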

+    scores = [delayed(safe_fit_and_score)(estimator=delayed(clone)(sklearn_pipeline),
+                                          X=features,
+                                          y=target,
+                                          scorer=scorer,
+                                          train=train,
+                                          test=test,
+                                          verbose=0,
+                                          parameters=None,
+                                          fit_params=sample_weight_dict)
+              for train, test in cv_iter]
+    CV_score = delayed(np.array)(scores)[:, 0]
+    return delayed(np.nanmean)(CV_score)
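
A note on the last two lines: Delayed objects implement __getitem__, so the [:, 0] column selection on delayed(np.array)(scores) is itself lazy, and dask.delayed also traverses the list of per-fold Delayed results passed to np.array. A small self-contained sketch with made-up fold scores:

    import dask
    import numpy as np

    def fold_result(score):
        # Stand-in for _fit_and_score's per-split (score, fit_time) output.
        return (score, 0.5)

    scores = [dask.delayed(fold_result)(s) for s in (0.8, 0.9, 0.7)]

    cv_score = dask.delayed(np.array)(scores)[:, 0]  # lazy column selection
    mean_score = dask.delayed(np.nanmean)(cv_score)

    print(mean_score.compute())  # ~0.8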