Pass X_train, y_train in Engine.submit_scoring_job for time series #2786

Merged · 4 commits merged on Sep 16, 2021
1 change: 1 addition & 0 deletions docs/source/release_notes.rst
@@ -6,6 +6,7 @@ Release Notes
* Fixes
* Fixed bug where ``calculate_permutation_importance`` was not calculating the right value for pipelines with target transformers :pr:`2782`
* Fixed bug where transformed target values were not used in ``fit`` for time series pipelines :pr:`2780`
* Fixed bug where ``score_pipelines`` method of ``AutoMLSearch`` would not work for time series problems :pr:`2786`
* Changes
* Documentation Changes
* Testing Changes
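Note (illustrative, not part of the diff): the release note above exists because time series pipelines build delayed/lag features, and the first rows of a holdout set can only be featurized by reaching back into the training data. A minimal pandas sketch of that boundary effect, with made-up numbers:

    import pandas as pd

    # Made-up data: the lag-1 feature for the first holdout row has to come from
    # the last training target value, so scoring without y_train would yield NaN.
    y_train = pd.Series([10, 11, 12, 13], index=pd.RangeIndex(0, 4))
    y_holdout = pd.Series([14, 15], index=pd.RangeIndex(4, 6))

    lag_1 = pd.concat([y_train, y_holdout]).shift(1).loc[y_holdout.index]
    print(lag_1.tolist())  # [13.0, 14.0] -> the 13.0 comes from y_train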
11 changes: 10 additions & 1 deletion evalml/automl/automl_search.py
@@ -1659,9 +1659,18 @@ def score_pipelines(self, pipelines, X_holdout, y_holdout, objectives):

computations = []
for pipeline in pipelines:
X_train, y_train = None, None
if is_time_series(self.problem_type):
X_train, y_train = self.X_train, self.y_train
computations.append(
self._engine.submit_scoring_job(
self.automl_config, pipeline, X_holdout, y_holdout, objectives
self.automl_config,
pipeline,
X_holdout,
y_holdout,
objectives,
X_train=X_train,
y_train=y_train,
)
)

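Note (illustrative usage sketch): with the change above, score_pipelines forwards the search's own training data to the engine whenever is_time_series(self.problem_type) is true. Roughly how a caller exercises the fixed path; X_train, y_train, X_holdout, and y_holdout are assumed to be pre-split pandas objects, and the problem_configuration values are placeholders taken from the test added below:

    from evalml.automl import AutoMLSearch

    automl = AutoMLSearch(
        X_train=X_train,
        y_train=y_train,
        problem_type="time series regression",
        problem_configuration={"date_index": None, "gap": 0, "forecast_horizon": 1, "max_delay": 2},
    )
    automl.search()

    # After this PR the engine also receives automl.X_train / automl.y_train,
    # so time series pipelines can featurize the holdout rows before scoring.
    scores = automl.score_pipelines(
        automl.allowed_pipelines, X_holdout, y_holdout, [automl.objective]
    )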
8 changes: 7 additions & 1 deletion evalml/automl/engine/cf_engine.py
@@ -147,14 +147,18 @@ def submit_training_job(self, automl_config, pipeline, X, y):
)
return CFComputation(future)

def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
def submit_scoring_job(
self, automl_config, pipeline, X, y, objectives, X_train=None, y_train=None
):
"""Send scoring job to cluster.

Args:
automl_config: Structure containing data passed from AutoMLSearch instance.
pipeline (pipeline.PipelineBase): Pipeline to train.
X (pd.DataFrame): Input data for modeling.
y (pd.Series): Target data for modeling.
X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
y_train (pd.Series): Training target. Used for feature engineering in time series.
objectives (list[ObjectiveBase]): Objectives to score on.

Returns:
@@ -172,6 +176,8 @@ def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
objectives=objectives,
X_schema=X_schema,
y_schema=y_schema,
X_train=X_train,
y_train=y_train,
)
computation = CFComputation(future)
computation.meta_data["pipeline_name"] = pipeline.name
9 changes: 8 additions & 1 deletion evalml/automl/engine/dask_engine.py
@@ -141,14 +141,18 @@ def submit_training_job(self, automl_config, pipeline, X, y):
)
return DaskComputation(dask_future)

def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
def submit_scoring_job(
self, automl_config, pipeline, X, y, objectives, X_train=None, y_train=None
):
"""Send scoring job to cluster.

Args:
automl_config: Structure containing data passed from AutoMLSearch instance.
pipeline (pipeline.PipelineBase): Pipeline to train.
X (pd.DataFrame): Input data for modeling.
y (pd.Series): Target data for modeling.
X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
y_train (pd.Series): Training target. Used for feature engineering in time series.
objectives (list[ObjectiveBase]): List of objectives to score on.

Returns:
@@ -159,6 +163,7 @@ def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
X_schema = X.ww.schema
y_schema = y.ww.schema
X, y = self.send_data_to_cluster(X, y)
X_train, y_train = self.send_data_to_cluster(X_train, y_train)
Comment from a contributor: Just for my own curiosity: theoretically, if send_data_to_cluster supported more arguments, this could have been combined with the line above, right? 🤔

dask_future = self.client.submit(
score_pipeline,
pipeline=pipeline,
@@ -167,6 +172,8 @@ def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
objectives=objectives,
X_schema=X_schema,
y_schema=y_schema,
X_train=X_train,
y_train=y_train,
)
computation = DaskComputation(dask_future)
computation.meta_data["pipeline_name"] = pipeline.name
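Note on the reviewer question above (hypothetical sketch, not part of this PR): if send_data_to_cluster accepted a variable number of datasets, the two calls could indeed collapse into one. Assuming the helper is a thin wrapper over distributed.Client.scatter, a variadic version might look like:

    def send_data_to_cluster(self, *datasets):
        # Client.scatter ships each object to the Dask workers and returns
        # futures in the same order, so callers can unpack them positionally:
        #   X, y, X_train, y_train = self.send_data_to_cluster(X, y, X_train, y_train)
        return self.client.scatter(list(datasets))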
14 changes: 10 additions & 4 deletions evalml/automl/engine/engine_base.py
@@ -92,7 +92,9 @@ def submit_training_job(self, automl_config, pipeline, X, y):
"""Submit job for pipeline training."""

@abstractmethod
def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
def submit_scoring_job(
self, automl_config, pipeline, X, y, objectives, X_train=None, y_train=None
):
"""Submit job for pipeline scoring."""


@@ -326,14 +328,18 @@ def evaluate_pipeline(pipeline, automl_config, X, y, logger):
)


def score_pipeline(pipeline, X, y, objectives, X_schema=None, y_schema=None):
def score_pipeline(
pipeline, X, y, objectives, X_train=None, y_train=None, X_schema=None, y_schema=None
):
"""Wrap around pipeline.score method to make it easy to score pipelines with dask.

Args:
pipeline (PipelineBase): The pipeline to score.
X (pd.DataFrame): Features to score on.
y (pd.Series): Target used to calcualte scores.
y (pd.Series): Target used to calculate scores.
objectives (list[ObjectiveBase]): List of objectives to score on.
X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
y_train (pd.Series): Training target. Used for feature engineering in time series.
X_schema (ww.TableSchema): Schema for features. Defaults to None.
y_schema (ww.ColumnSchema): Schema for columns. Defaults to None.

@@ -344,4 +350,4 @@ def score_pipeline(pipeline, X, y, objectives, X_schema=None, y_schema=None):
X.ww.init(schema=X_schema)
if y_schema:
y.ww.init(schema=y_schema)
return pipeline.score(X, y, objectives)
return pipeline.score(X, y, objectives, X_train=X_train, y_train=y_train)
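Note (reconstruction for readability, docstring abbreviated): pieced together from the hunks above, the updated score_pipeline helper reads roughly as follows. It restores the Woodwork schemas on the scoring data, then forwards the optional training data into pipeline.score:

    def score_pipeline(
        pipeline, X, y, objectives, X_train=None, y_train=None, X_schema=None, y_schema=None
    ):
        """Wrap around pipeline.score method to make it easy to score pipelines with dask."""
        if X_schema:
            X.ww.init(schema=X_schema)
        if y_schema:
            y.ww.init(schema=y_schema)
        return pipeline.score(X, y, objectives, X_train=X_train, y_train=y_train)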
8 changes: 7 additions & 1 deletion evalml/automl/engine/sequential_engine.py
@@ -99,14 +99,18 @@ def submit_training_job(self, automl_config, pipeline, X, y):
schema=False,
)

def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
def submit_scoring_job(
self, automl_config, pipeline, X, y, objectives, X_train=None, y_train=None
):
"""Submit a job to score a pipeline.

Args:
automl_config: Structure containing data passed from AutoMLSearch instance.
pipeline (pipeline.PipelineBase): Pipeline to train.
X (pd.DataFrame): Input data for modeling.
y (pd.Series): Target data for modeling.
X_train (pd.DataFrame): Training features. Used for feature engineering in time series.
y_train (pd.Series): Training target. Used for feature engineering in time series.
objectives (list[ObjectiveBase]): List of objectives to score on.

Returns:
@@ -121,6 +125,8 @@ def submit_scoring_job(self, automl_config, pipeline, X, y, objectives):
objectives=objectives,
X_schema=X.ww.schema,
y_schema=y.ww.schema,
X_train=X_train,
y_train=y_train,
)
computation.meta_data["pipeline_name"] = pipeline.name
return computation
4 changes: 2 additions & 2 deletions evalml/tests/automl_tests/dask_test_utils.py
@@ -82,7 +82,7 @@ def __init__(self, parameters, random_seed=0):
random_seed=random_seed,
)

def score(self, X, y, objectives):
def score(self, X, y, objectives, X_train=None, y_train=None):
raise PipelineScoreError(
exceptions={"AUC": (Exception(), []), "Log Loss Binary": (Exception(), [])},
scored_successfully={
@@ -183,7 +183,7 @@ def fit(self, X, y):
assert y.ww.schema == self.y_schema_to_check
return super().fit(X, y)

def score(self, X, y, objectives):
def score(self, X, y, objectives, X_train=None, y_train=None):
assert X.ww.schema == self.X_schema_to_check
assert y.ww.schema == self.y_schema_to_check
return super().score(X, y, objectives)
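Note (hypothetical illustration, these classes are not evalml code): the two signature changes above keep the test pipelines compatible with the new call path, because the engines now forward X_train= and y_train= keyword arguments and an override that does not accept them would fail:

    class OldPipeline:
        def score(self, X, y, objectives):  # pre-PR style override
            return {"AUC": 0.5}

    class NewPipeline:
        def score(self, X, y, objectives, X_train=None, y_train=None):  # post-PR style override
            return {"AUC": 0.5}

    NewPipeline().score("X", "y", [], X_train=None, y_train=None)  # fine
    try:
        OldPipeline().score("X", "y", [], X_train=None, y_train=None)
    except TypeError as err:
        print(err)  # got an unexpected keyword argument 'X_train'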
66 changes: 66 additions & 0 deletions evalml/tests/automl_tests/parallel_tests/test_automl_dask.py
@@ -1,10 +1,17 @@
import numpy as np
import pandas as pd
import pytest

from evalml.automl import AutoMLSearch
from evalml.automl.automl_algorithm import IterativeAlgorithm
from evalml.automl.callbacks import raise_error_callback
from evalml.automl.engine import CFEngine, DaskEngine, SequentialEngine
from evalml.problem_types import (
ProblemTypes,
is_binary,
is_multiclass,
is_time_series,
)
from evalml.tests.automl_tests.dask_test_utils import (
DaskPipelineFast,
DaskPipelineSlow,
@@ -266,3 +273,62 @@ def test_automl_closes_engines(engine_str, X_y_binary_cls):
)
automl.close_engine()
assert automl._engine.is_closed


@pytest.mark.parametrize(
"engine_str",
engine_strs + ["sequential"],
Comment from the PR author: @chukarsten Check it out - threaded engines respect mocks

)
@pytest.mark.parametrize("problem_type", ProblemTypes.all_problem_types)
def test_score_pipelines_passes_X_train_y_train(
problem_type, engine_str, X_y_binary, X_y_regression, X_y_multi, AutoMLTestEnv
):
if is_binary(problem_type):
X, y = X_y_binary
elif is_multiclass(problem_type):
X, y = X_y_multi
else:
X, y = X_y_regression

X_train, y_train = pd.DataFrame(X[:50]), pd.Series(y[:50])
X_test, y_test = pd.DataFrame(X[50:]), pd.Series(y[50:])

if is_multiclass(problem_type) or is_binary(problem_type):
y_train = y_train.astype("int64")
y_test = y_test.astype("int64")

automl = AutoMLSearch(
X_train=X_train,
y_train=y_train,
problem_type=problem_type,
max_iterations=5,
optimize_thresholds=False,
problem_configuration={
"date_index": None,
"gap": 0,
"forecast_horizon": 1,
"max_delay": 2,
},
engine=engine_str,
)

env = AutoMLTestEnv(problem_type)
with env.test_context(score_return_value={automl.objective.name: 3.12}):
automl.search()

with env.test_context(score_return_value={automl.objective.name: 3.12}):
automl.score_pipelines(
automl.allowed_pipelines, X_test, y_test, [automl.objective]
)

expected_X_train, expected_y_train = None, None
if is_time_series(problem_type):
expected_X_train, expected_y_train = X_train, y_train
assert len(env.mock_score.mock_calls) == len(automl.allowed_pipelines)
for mock_call in env.mock_score.mock_calls:
if expected_X_train is not None:
pd.testing.assert_frame_equal(mock_call[2]["X_train"], expected_X_train)
pd.testing.assert_series_equal(mock_call[2]["y_train"], expected_y_train)
else:
assert mock_call[2]["X_train"] == expected_X_train
assert mock_call[2]["y_train"] == expected_y_train
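Note on the assertions above: entries in mock_calls behave like (name, args, kwargs) tuples, which is why mock_call[2] reads the keyword arguments that the engine passed to the mocked score method. A small standalone sketch of the same access pattern, with made-up values:

    from unittest.mock import MagicMock

    scorer = MagicMock()
    scorer.score("X_holdout", X_train="train_features", y_train="train_target")

    recorded = scorer.mock_calls[0]
    # Index 2 is the kwargs dict; .kwargs is the equivalent property on Python 3.8+.
    assert recorded[2] == {"X_train": "train_features", "y_train": "train_target"}
    assert recorded.kwargs == recorded[2]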