Skip to content

Commit

Permalink
Separate CV for Stacked Ensembler in AutoMLSearch (#1814)
Browse files Browse the repository at this point in the history
* implementation for separate cv split for ensembling

* adding release

* update release notes

* fix doc

* in progress

* fix test

* comment

* change value

* fix train best pipeline

* fix tests and simplify impl

* add arg to automl and fix tests

* fix best pipeline training

* track indices instead of datatables

* clean up code

* fix spacing

* linting

* fix tests

* use split_data util instead
  • Loading branch information
bchen1116 committed Mar 3, 2021
1 parent 79e4f75 commit c499006
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 24 deletions.
1 change: 1 addition & 0 deletions docs/source/release_notes.rst
Expand Up @@ -2,6 +2,7 @@ Release Notes
-------------
**Future Releases**
* Enhancements
* Created a separate CV split to train the stacked ensembler on for AutoMLSearch :pr:`1814`
* Added a GitHub Action for Linux unit tests :pr:`1846`
* Added ``DataCheckAction`` class and ``DataCheckActionCode`` enum :pr:`1896`
* Updated ``Woodwork`` requirement to ``v0.0.10`` :pr:`1900`
Expand Down
35 changes: 27 additions & 8 deletions evalml/automl/automl_search.py
Expand Up @@ -5,6 +5,7 @@
import cloudpickle
import numpy as np
import pandas as pd
import woodwork as ww
from sklearn.model_selection import BaseCrossValidator

from .pipeline_search_plots import PipelineSearchPlots
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(self,
problem_configuration=None,
train_best_pipeline=True,
pipeline_parameters=None,
_ensembling_split_size=0.2,
_pipelines_per_batch=5):
"""Automated pipeline search
Expand Down Expand Up @@ -170,6 +172,9 @@ def __init__(self,
train_best_pipeline (boolean): Whether or not to train the best pipeline before returning it. Defaults to True
_ensembling_split_size (float): The amount of the training data we'll set aside for training ensemble metalearners. Only used when ensembling is True.
Must be between 0 and 1, exclusive. Defaults to 0.2
_pipelines_per_batch (int): The number of pipelines to train for every batch after the first one.
The first batch will train a baseline pipeline + one of each pipeline family allowed in the search.
"""
Expand Down Expand Up @@ -265,6 +270,7 @@ def __init__(self,

self.X_train = infer_feature_types(X_train)
self.y_train = infer_feature_types(y_train)
self.ensembling_indices = None

default_data_splitter = make_data_splitter(self.X_train, self.y_train, self.problem_type, self.problem_configuration,
n_splits=3, shuffle=True, random_seed=self.random_seed)
Expand All @@ -273,13 +279,6 @@ def __init__(self,
self.search_iteration_plot = None
self._interrupted = False

self._engine = SequentialEngine(self.X_train,
self.y_train,
self,
should_continue_callback=self._should_continue,
pre_evaluation_callback=self._pre_evaluation_callback,
post_evaluation_callback=self._post_evaluation_callback)

if self.allowed_pipelines is None:
logger.info("Generating pipelines to search over...")
allowed_estimators = get_estimators(self.problem_type, self.allowed_model_families)
Expand Down Expand Up @@ -309,6 +308,7 @@ def __init__(self,
ensemble_nth_batch = len(self.allowed_pipelines) + 1
num_ensemble_batches = (self.max_batches - 1) // ensemble_nth_batch
if num_ensemble_batches == 0:
run_ensembling = False
logger.warning(f"Ensembling is set to True, but max_batches is too small, so ensembling will not run. Set max_batches >= {ensemble_nth_batch + 1} to run ensembling.")
else:
logger.info(f"Ensembling will run every {ensemble_nth_batch} batches.")
Expand All @@ -318,6 +318,21 @@ def __init__(self,
num_ensemble_batches)
else:
self.max_iterations = 1 + len(self.allowed_pipelines) + (self._pipelines_per_batch * (self.max_batches - 1))
if run_ensembling:
if not (0 < _ensembling_split_size < 1):
raise ValueError(f"Ensembling split size must be between 0 and 1 exclusive, received {_ensembling_split_size}")
X_shape = ww.DataTable(np.arange(self.X_train.shape[0]))
_, ensembling_indices, _, _ = split_data(X_shape, self.y_train, problem_type=self.problem_type, test_size=_ensembling_split_size, random_seed=self.random_seed)
self.ensembling_indices = ensembling_indices.to_dataframe()[0].tolist()

self._engine = SequentialEngine(self.X_train,
self.y_train,
self.ensembling_indices,
self,
should_continue_callback=self._should_continue,
pre_evaluation_callback=self._pre_evaluation_callback,
post_evaluation_callback=self._post_evaluation_callback)

self.allowed_model_families = list(set([p.model_family for p in (self.allowed_pipelines)]))

logger.debug(f"allowed_pipelines set to {[pipeline.name for pipeline in self.allowed_pipelines]}")
Expand Down Expand Up @@ -575,7 +590,11 @@ def _find_best_pipeline(self):
if self._train_best_pipeline:
X_threshold_tuning = None
y_threshold_tuning = None
X_train, y_train = self.X_train, self.y_train
if self._best_pipeline.model_family == ModelFamily.ENSEMBLE:
X_train, y_train = self.X_train.iloc[self.ensembling_indices], self.y_train.iloc[self.ensembling_indices]
else:
X_train = self.X_train
y_train = self.y_train
if is_binary(self.problem_type) and self.objective.is_defined_for_problem_type(self.problem_type) \
and self.optimize_thresholds and self.objective.can_optimize_threshold:
X_train, X_threshold_tuning, y_train, y_threshold_tuning = split_data(X_train, y_train, self.problem_type,
Expand Down
28 changes: 15 additions & 13 deletions evalml/automl/engine/engine_base.py
Expand Up @@ -21,23 +21,25 @@
class EngineBase(ABC):
"""Base class for the engine API which handles the fitting and evaluation of pipelines during AutoML."""

def __init__(self, X_train=None, y_train=None, automl=None, should_continue_callback=None, pre_evaluation_callback=None, post_evaluation_callback=None):
def __init__(self, X_train=None, y_train=None, ensembling_indices=None, automl=None, should_continue_callback=None, pre_evaluation_callback=None, post_evaluation_callback=None):
"""Base class for the engine API which handles the fitting and evaluation of pipelines during AutoML.
Arguments:
X_train (ww.DataTable): training features
y_train (ww.DataColumn): training target
automl (AutoMLSearch): a reference to the AutoML search. Used to access configuration and by the error callback.
should_continue_callback (function): returns True if another pipeline from the list should be evaluated, False otherwise.
pre_evaluation_callback (function): optional callback invoked before pipeline evaluation.
post_evaluation_callback (function): optional callback invoked after pipeline evaluation, with args pipeline and evaluation results. Expected to return a list of pipeline IDs corresponding to each pipeline evaluation.
X_train (ww.DataTable): Training features
y_train (ww.DataColumn): Training target
ensembling_indices (list): Positional row indices of the training data set aside for training the ensembler. None if no ensembling split was created.
automl (AutoMLSearch): A reference to the AutoML search. Used to access configuration and by the error callback.
should_continue_callback (function): Returns True if another pipeline from the list should be evaluated, False otherwise.
pre_evaluation_callback (function): Optional callback invoked before pipeline evaluation.
post_evaluation_callback (function): Optional callback invoked after pipeline evaluation, with args pipeline and evaluation results. Expected to return a list of pipeline IDs corresponding to each pipeline evaluation.
"""
self.X_train = X_train
self.y_train = y_train
self.automl = automl
self._should_continue_callback = should_continue_callback
self._pre_evaluation_callback = pre_evaluation_callback
self._post_evaluation_callback = post_evaluation_callback
self.ensembling_indices = ensembling_indices

@abstractmethod
def evaluate_batch(self, pipelines):
Expand All @@ -47,21 +49,21 @@ def evaluate_batch(self, pipelines):
pipelines (list(PipelineBase)): A batch of pipelines to be fitted and evaluated
Returns:
list (int): a list of the new pipeline IDs which were created by the AutoML search.
list (int): A list of the new pipeline IDs which were created by the AutoML search.
"""

@staticmethod
def train_and_score_pipeline(pipeline, automl, full_X_train, full_y_train):
"""Given a pipeline, config and data, train and score the pipeline and return the CV or TV scores
Arguments:
pipeline (PipelineBase): the pipeline to score
automl (AutoMLSearch): the AutoML search, used to access config and for the error callback
full_X_train (ww.DataTable): training features
full_y_train (ww.DataColumn): training target
pipeline (PipelineBase): The pipeline to score
automl (AutoMLSearch): The AutoML search, used to access config and for the error callback
full_X_train (ww.DataTable): Training features
full_y_train (ww.DataColumn): Training target
Returns:
dict: a dict containing cv_score_mean, cv_scores, training_time and a cv_data structure with details.
dict: A dict containing cv_score_mean, cv_scores, training_time and a cv_data structure with details.
"""
start = time.time()
cv_data = []
Expand Down
10 changes: 9 additions & 1 deletion evalml/automl/engine/sequential_engine.py
@@ -1,4 +1,5 @@
from evalml.automl.engine import EngineBase
from evalml.model_family import ModelFamily


class SequentialEngine(EngineBase):
Expand All @@ -20,7 +21,14 @@ def evaluate_batch(self, pipelines):
while self._should_continue_callback() and index < len(pipelines):
pipeline = pipelines[index]
self._pre_evaluation_callback(pipeline)
evaluation_result = EngineBase.train_and_score_pipeline(pipeline, self.automl, self.X_train, self.y_train)
X, y = self.X_train, self.y_train
if pipeline.model_family == ModelFamily.ENSEMBLE:
X, y = self.X_train.iloc[self.ensembling_indices], self.y_train.iloc[self.ensembling_indices]
elif self.ensembling_indices is not None:
training_indices = [i for i in range(len(self.X_train)) if i not in self.ensembling_indices]
X = self.X_train.iloc[training_indices]
y = self.y_train.iloc[training_indices]
evaluation_result = EngineBase.train_and_score_pipeline(pipeline, self.automl, X, y)
new_pipeline_ids.append(self._post_evaluation_callback(pipeline, evaluation_result))
index += 1
return new_pipeline_ids

0 comments on commit c499006

Please sign in to comment.