Commit

Merge d9aca85 into 15234fa
mrocklin committed Aug 8, 2018
2 parents 15234fa + d9aca85 commit 40fc6b3
Showing 4 changed files with 88 additions and 33 deletions.
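
In short, this merge adds an optional use_dask flag to TPOT's pipeline evaluation (tpot/base.py and tpot/gp_deap.py) and installs the dask[delayed] extra in both CI configurations. A minimal usage sketch follows; it assumes that TPOTClassifier forwards the new use_dask argument to TPOTBase.__init__ and that dask, dask-ml, and dask.distributed are installed, none of which is shown in this diff.

from dask.distributed import Client
from sklearn.datasets import load_digits
from tpot import TPOTClassifier

# Start a local Dask scheduler and workers; with use_dask=True, TPOT's
# cross-validation work is submitted to this scheduler.
client = Client()

X, y = load_digits(return_X_y=True)
tpot = TPOTClassifier(generations=2, population_size=10, use_dask=True, verbosity=2)
tpot.fit(X, y)
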
2 changes: 1 addition & 1 deletion .appveyor.yml
@@ -21,7 +21,7 @@ install:
- conda info -a
- conda create -q -n test-environment python=%PYTHON_VERSION% numpy scipy scikit-learn nose cython pandas
- activate test-environment
- pip install deap tqdm update_checker pypiwin32 stopit
- pip install deap tqdm update_checker pypiwin32 stopit dask[delayed]


test_script:
1 change: 1 addition & 0 deletions ci/.travis_install.sh
@@ -54,6 +54,7 @@ pip install update_checker
pip install tqdm
pip install stopit
pip install xgboost
pip install dask[delayed]

if [[ "$COVERAGE" == "true" ]]; then
pip install coverage coveralls
47 changes: 35 additions & 12 deletions tpot/base.py
@@ -124,7 +124,8 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
random_state=None, config_dict=None,
warm_start=False, memory=None,
periodic_checkpoint_folder=None, early_stop=None,
verbosity=0, disable_update_check=False):
verbosity=0, disable_update_check=False,
use_dask=False):
"""Set up the genetic programming algorithm for pipeline optimization.
Parameters
@@ -286,6 +287,7 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
self._last_optimized_pareto_front_n_gens = 0
self.memory = memory
self._memory = None # initial Memory setting for sklearn pipeline
self.use_dask = use_dask

# dont save periodic pipelines more often than this
self._output_best_pipeline_period_seconds = 30
@@ -1190,28 +1192,49 @@ def _evaluate_individuals(self, individuals, features, target, sample_weight=None
scoring_function=self.scoring_function,
sample_weight=sample_weight,
groups=groups,
timeout=self.max_eval_time_seconds
timeout=self.max_eval_time_seconds,
use_dask=self.use_dask,
)

result_score_list = []
# Don't use parallelization if n_jobs==1
if self.n_jobs == 1:
for sklearn_pipeline in sklearn_pipeline_list:
self._stop_by_max_time_mins()
val = partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline)
val = partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline, use_dask=self.use_dask)
result_score_list = self._update_val(val, result_score_list)
else:
# chunk size for pbar update
# chunk size is min of cpu_count * 2 and n_jobs * 4
chunk_size = min(cpu_count()*2, self.n_jobs*4)
for chunk_idx in range(0, len(sklearn_pipeline_list), chunk_size):
self._stop_by_max_time_mins()
parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs')
tmp_result_scores = parallel(delayed(partial_wrapped_cross_val_score)(sklearn_pipeline=sklearn_pipeline)
for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + chunk_size])
# update pbar
for val in tmp_result_scores:
result_score_list = self._update_val(val, result_score_list)
if self.use_dask:
import dask

result_score_list = [
partial_wrapped_cross_val_score(sklearn_pipeline=sklearn_pipeline)
for sklearn_pipeline in sklearn_pipeline_list
]

self.dask_graphs_ = result_score_list
with warnings.catch_warnings():
warnings.simplefilter('ignore')
dask.compute(*result_score_list)

self._update_pbar(len(result_score_list))

else:
chunk_size = min(cpu_count()*2, self.n_jobs*4)

for chunk_idx in range(0, len(sklearn_pipeline_list), chunk_size):
self._stop_by_max_time_mins()

if not self.use_dask:
parallel = Parallel(n_jobs=self.n_jobs, verbose=0, pre_dispatch='2*n_jobs')
tmp_result_scores = parallel(
delayed(partial_wrapped_cross_val_score)(sklearn_pipeline=sklearn_pipeline)
for sklearn_pipeline in sklearn_pipeline_list[chunk_idx:chunk_idx + chunk_size])
# update pbar
for val in tmp_result_scores:
result_score_list = self._update_val(val, result_score_list)

self._update_evaluated_individuals_(result_score_list, eval_individuals_str, operator_counts, stats_dicts)
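
The use_dask branch above builds one lazy result per candidate pipeline and then evaluates every graph in a single dask.compute(*result_score_list) call, rather than chunking work through joblib's Parallel as in the n_jobs path. A sketch of that pattern in isolation (illustrative only; lazy_score is a stand-in for partial_wrapped_cross_val_score, not a TPOT helper):

import dask
import numpy as np

def lazy_score(values):
    # Returns a Delayed object; nothing runs until dask.compute is called.
    return dask.delayed(np.nanmean)(values)

graphs = [lazy_score(np.arange(i, i + 5)) for i in range(3)]  # one lazy graph per "pipeline"
scores = dask.compute(*graphs)                                # evaluated together -> (2.0, 3.0, 4.0)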

71 changes: 51 additions & 20 deletions tpot/gp_deap.py
@@ -23,6 +23,7 @@
"""

import dask
import numpy as np
from deap import tools, gp
from inspect import isclass
@@ -395,8 +396,10 @@ def mutNodeReplacement(individual, pset):

@threading_timeoutable(default="Timeout")
def _wrapped_cross_val_score(sklearn_pipeline, features, target,
cv, scoring_function, sample_weight=None, groups=None):
cv, scoring_function, sample_weight=None,
groups=None, use_dask=False):
"""Fit estimator and compute scores for a given dataset split.
Parameters
----------
sklearn_pipeline : pipeline object implementing 'fit'
@@ -418,6 +421,8 @@ def _wrapped_cross_val_score(sklearn_pipeline, features, target,
List of sample weights to balance (or un-balance) the dataset target as needed
groups: array-like {n_samples, }, optional
Group labels for the samples used while splitting the dataset into train/test set
use_dask : bool, default False
Whether to compute the cross-validation scores with Dask (requires the optional dask and dask-ml dependencies)
"""
sample_weight_dict = set_sample_weight(sklearn_pipeline.steps, sample_weight)

@@ -427,22 +432,48 @@ def _wrapped_cross_val_score(sklearn_pipeline, features, target,
cv_iter = list(cv.split(features, target, groups))
scorer = check_scoring(sklearn_pipeline, scoring=scoring_function)

try:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
X=features,
y=target,
scorer=scorer,
train=train,
test=test,
verbose=0,
parameters=None,
fit_params=sample_weight_dict)
for train, test in cv_iter]
CV_score = np.array(scores)[:, 0]
return np.nanmean(CV_score)
except TimeoutException:
return "Timeout"
except Exception as e:
return -float('inf')
if use_dask:
try:
import dask_ml.model_selection
import dask
from dask.delayed import Delayed
except ImportError:
msg = "'use_dask' requires the optional dask and dask-ml dependencies."
raise ImportError(msg)

dsk, keys, n_splits = dask_ml.model_selection._search.build_graph(
estimator=sklearn_pipeline,
cv=cv,
scorer=scorer,
candidate_params=[{}],
X=features,
y=target,
groups=groups,
fit_params=sample_weight_dict,
refit=False,
error_score=float('-inf'),
)
cv_results = Delayed(keys[0], dsk)
scores = [cv_results['split{}_test_score'.format(i)] for i in range(n_splits)]
CV_score = dask.delayed(np.array)(scores)[:, 0]
return dask.delayed(np.nanmean)(CV_score)
else:
try:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
X=features,
y=target,
scorer=scorer,
train=train,
test=test,
verbose=0,
parameters=None,
fit_params=sample_weight_dict)
for train, test in cv_iter]
CV_score = np.array(scores)[:, 0]
return np.nanmean(CV_score)
except TimeoutException:
return "Timeout"
except Exception as e:
return -float('inf')
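
In the use_dask path above, nothing is executed inside _wrapped_cross_val_score itself: dask-ml assembles the fit-and-score task graph, and the per-split scores and the final nanmean are all Delayed objects that TPOT computes later in one batch (see _evaluate_individuals in tpot/base.py). A small illustration of the lazy indexing involved, with made-up values rather than dask-ml's actual graph:

import dask
import numpy as np
from dask import delayed

# A Delayed object supports __getitem__, so selecting a key just extends the task graph.
lazy_results = delayed({'split0_test_score': 0.75, 'split1_test_score': 0.25})
lazy_scores = [lazy_results['split{}_test_score'.format(i)] for i in range(2)]  # still lazy
lazy_mean = delayed(np.nanmean)(lazy_scores)                                    # still lazy
mean_score = dask.compute(lazy_mean)[0]                                         # 0.5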
