feat: support additional_experiments for AutoML Tables and AutoML Forecasting #428

Merged · 1 commit · Jun 2, 2021

29 changes: 29 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -2420,6 +2420,8 @@ def __init__(
optimization_objective_precision_value
)

self._additional_experiments = []

def run(
self,
dataset: datasets.TabularDataset,
@@ -2664,6 +2666,11 @@ def _run(
"optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
] = self._additional_experiments

if model_display_name is None:
model_display_name = self._display_name

@@ -2691,6 +2698,14 @@ def _model_upload_fail_string(self) -> str:
"Model."
)

def _add_additional_experiments(self, additional_experiments: List[str]):
    """Add experiment flags to the training job.

    Args:
        additional_experiments (List[str]):
            Experiment flags that can enable some experimental training features.
    """
    self._additional_experiments.extend(additional_experiments)


class AutoMLForecastingTrainingJob(_TrainingJob):
_supported_training_schemas = (schema.training_job.definition.automl_forecasting,)
@@ -2746,6 +2761,7 @@ def __init__(
)
self._column_transformations = column_transformations
self._optimization_objective = optimization_objective
self._additional_experiments = []

def run(
self,
@@ -3119,6 +3135,11 @@ def _run(
"overrideExistingTable": export_evaluated_data_items_override_destination,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
] = self._additional_experiments

if model_display_name is None:
model_display_name = self._display_name

@@ -3143,6 +3164,14 @@ def _model_upload_fail_string(self) -> str:
"Model."
)

def _add_additional_experiments(self, additional_experiments: List[str]):
    """Add experiment flags to the training job.

    Args:
        additional_experiments (List[str]):
            Experiment flags that can enable some experimental training features.
    """
    self._additional_experiments.extend(additional_experiments)


class AutoMLImageTrainingJob(_TrainingJob):
_supported_training_schemas = (
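For reference, a minimal sketch of how the new hook would be driven from user
code (it mirrors the unit tests below; the project, bucket, and experiment flag
values here are placeholders, not real flag names):

from google.cloud import aiplatform

aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

job = aiplatform.AutoMLTabularTrainingJob(
    display_name="my-job",
    optimization_prediction_type="classification",
)

# _add_additional_experiments() extends job._additional_experiments; _run()
# later copies that list into training_task_inputs_dict["additionalExperiments"]
# before the TrainingPipeline is created, so the flags must be added before
# calling job.run(dataset=..., target_column=...).
job._add_additional_experiments(["placeholder_experiment_flag"])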
127 changes: 103 additions & 24 deletions tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
@@ -62,32 +62,40 @@
_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS = 1000
_TEST_TRAINING_WEIGHT_COLUMN = "weight"
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-rmse"
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
_TEST_TRAINING_TASK_INPUTS_DICT = {
# required inputs
"targetColumn": _TEST_TRAINING_TARGET_COLUMN,
"timeColumn": _TEST_TRAINING_TIME_COLUMN,
"timeSeriesIdentifierColumn": _TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
"timeSeriesAttributeColumns": _TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
"unavailableAtForecastColumns": _TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
"availableAtForecastColumns": _TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
"forecastHorizon": _TEST_TRAINING_FORECAST_HORIZON,
"dataGranularity": {
"unit": _TEST_TRAINING_DATA_GRANULARITY_UNIT,
"quantity": _TEST_TRAINING_DATA_GRANULARITY_COUNT,
},
"transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS,
"trainBudgetMilliNodeHours": _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
# optional inputs
"weightColumn": _TEST_TRAINING_WEIGHT_COLUMN,
"contextWindow": _TEST_TRAINING_CONTEXT_WINDOW,
"exportEvaluatedDataItemsConfig": {
"destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
"overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
},
"quantiles": _TEST_TRAINING_QUANTILES,
"validationOptions": _TEST_TRAINING_VALIDATION_OPTIONS,
"optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
}
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
    {
        **_TEST_TRAINING_TASK_INPUTS_DICT,
        "additionalExperiments": _TEST_ADDITIONAL_EXPERIMENTS,
    },
    struct_pb2.Value(),
)
@@ -359,6 +367,77 @@ def test_run_call_pipeline_if_no_model_display_name(
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_set_additional_experiments(
self,
mock_pipeline_service_create,
mock_dataset_time_series,
mock_model_service_get,
sync,
):
aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

job = AutoMLForecastingTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
)

job._add_additional_experiments(_TEST_ADDITIONAL_EXPERIMENTS)

model_from_job = job.run(
dataset=mock_dataset_time_series,
target_column=_TEST_TRAINING_TARGET_COLUMN,
time_column=_TEST_TRAINING_TIME_COLUMN,
time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON,
data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT,
data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
context_window=_TEST_TRAINING_CONTEXT_WINDOW,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
quantiles=_TEST_TRAINING_QUANTILES,
validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
sync=sync,
)

if not sync:
model_from_job.wait()

true_fraction_split = gca_training_pipeline.FractionSplit(
training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction=_TEST_TEST_FRACTION_SPLIT,
)

# Test that it defaults to the job display name
true_managed_model = gca_model.Model(display_name=_TEST_DISPLAY_NAME)

true_input_data_config = gca_training_pipeline.InputDataConfig(
fraction_split=true_fraction_split,
dataset_id=mock_dataset_time_series.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_forecasting,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures(
"mock_pipeline_service_create",
"mock_pipeline_service_get",
107 changes: 96 additions & 11 deletions tests/unit/aiplatform/test_automl_tabular_training_jobs.py
@@ -56,19 +56,27 @@
_TEST_TRAINING_DISABLE_EARLY_STOPPING = True
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-log-loss"
_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE = "classification"
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
_TEST_TRAINING_TASK_INPUTS_DICT = {
# required inputs
"targetColumn": _TEST_TRAINING_TARGET_COLUMN,
"transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS,
"trainBudgetMilliNodeHours": _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
# optional inputs
"weightColumnName": _TEST_TRAINING_WEIGHT_COLUMN,
"disableEarlyStopping": _TEST_TRAINING_DISABLE_EARLY_STOPPING,
"predictionType": _TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
"optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
"optimizationObjectiveRecallValue": None,
"optimizationObjectivePrecisionValue": None,
}
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
    {
        **_TEST_TRAINING_TASK_INPUTS_DICT,
        "additionalExperiments": _TEST_ADDITIONAL_EXPERIMENTS,
    },
    struct_pb2.Value(),
)
@@ -430,6 +438,83 @@ def test_run_call_pipeline_service_create_if_no_column_transformations(
training_pipeline=true_training_pipeline,
)

@pytest.mark.parametrize("sync", [True, False])
# This test checks that additional experiments are passed through to the training task inputs
def test_run_call_pipeline_service_create_if_set_additional_experiments(
self,
mock_pipeline_service_create,
mock_pipeline_service_get,
mock_dataset_tabular,
mock_model_service_get,
sync,
):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_BUCKET_NAME,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

job = training_jobs.AutoMLTabularTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
column_transformations=None,
optimization_objective_recall_value=None,
optimization_objective_precision_value=None,
)

job._add_additional_experiments(_TEST_ADDITIONAL_EXPERIMENTS)

model_from_job = job.run(
dataset=mock_dataset_tabular,
target_column=_TEST_TRAINING_TARGET_COLUMN,
model_display_name=_TEST_MODEL_DISPLAY_NAME,
training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
sync=sync,
)

if not sync:
model_from_job.wait()

true_fraction_split = gca_training_pipeline.FractionSplit(
training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction=_TEST_TEST_FRACTION_SPLIT,
)

true_managed_model = gca_model.Model(
display_name=_TEST_MODEL_DISPLAY_NAME,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

true_input_data_config = gca_training_pipeline.InputDataConfig(
fraction_split=true_fraction_split,
predefined_split=gca_training_pipeline.PredefinedSplit(
key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
),
dataset_id=mock_dataset_tabular.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_tabular,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures(
"mock_pipeline_service_create",
"mock_pipeline_service_get",