Skip to content

Commit

Permalink
feat: add additional_experiement flag in the tables and forecasting t…
Browse files Browse the repository at this point in the history
…raining job (#979)

* Update training_jobs.py

* Update test_automl_forecasting_training_jobs.py

* Update training_jobs.py

* Update test_automl_tabular_training_jobs.py

* Update test_automl_forecasting_training_jobs.py

* Update test_automl_tabular_training_jobs.py

* Update google/cloud/aiplatform/training_jobs.py

Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com>

* Update google/cloud/aiplatform/training_jobs.py

Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com>

* Update test_automl_forecasting_training_jobs.py

* Update test_automl_tabular_training_jobs.py

* Update training_jobs.py

* Update training_jobs.py

Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com>
  • Loading branch information
dwkk-google and sasha-gitg committed Feb 25, 2022
1 parent 5ee6354 commit 5fe59a4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 264 deletions.
208 changes: 12 additions & 196 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -3371,6 +3371,7 @@ def run(
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
additional_experiments: Optional[List[str]] = None,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
Expand Down Expand Up @@ -3497,6 +3498,8 @@ def run(
Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
additional_experiments (List[str]):
Optional. Additional experiment flags for the automl tables training.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand All @@ -3519,6 +3522,9 @@ def run(
if self._has_run:
raise RuntimeError("AutoML Tabular Training has already run.")

if additional_experiments:
self._add_additional_experiments(additional_experiments)

return self._run(
dataset=dataset,
target_column=target_column,
Expand Down Expand Up @@ -3961,6 +3967,7 @@ def run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
model_labels: Optional[Dict[str, str]] = None,
additional_experiments: Optional[List[str]] = None,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
Expand Down Expand Up @@ -4107,6 +4114,8 @@ def run(
are allowed.
See https://goo.gl/xmQnxf for more information
and examples of labels.
additional_experiments (List[str]):
Optional. Additional experiment flags for the time series forcasting training.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand All @@ -4132,6 +4141,9 @@ def run(
if self._has_run:
raise RuntimeError("AutoML Forecasting Training has already run.")

if additional_experiments:
self._add_additional_experiments(additional_experiments)

return self._run(
dataset=dataset,
target_column=target_column,
Expand Down Expand Up @@ -4160,202 +4172,6 @@ def run(
sync=sync,
)

def _run_with_experiments(
self,
dataset: datasets.TimeSeriesDataset,
target_column: str,
time_column: str,
time_series_identifier_column: str,
unavailable_at_forecast_columns: List[str],
available_at_forecast_columns: List[str],
forecast_horizon: int,
data_granularity_unit: str,
data_granularity_count: int,
predefined_split_column_name: Optional[str] = None,
weight_column: Optional[str] = None,
time_series_attribute_columns: Optional[List[str]] = None,
context_window: Optional[int] = None,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
quantiles: Optional[List[float]] = None,
validation_options: Optional[str] = None,
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
model_labels: Optional[Dict[str, str]] = None,
sync: bool = True,
additional_experiments: Optional[List[str]] = None,
) -> models.Model:
"""Runs the training job with experiment flags and returns a model.
The training data splits are set by default: Roughly 80% will be used for training,
10% for validation, and 10% for test.
Args:
dataset (datasets.TimeSeriesDataset):
Required. The dataset within the same Project from which data will be used to train the Model. The
Dataset must use schema compatible with Model being trained,
and what is compatible should be described in the used
TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition].
For time series Datasets, all their data is exported to
training, to pick and choose from.
target_column (str):
Required. Name of the column that the Model is to predict values for.
time_column (str):
Required. Name of the column that identifies time order in the time series.
time_series_identifier_column (str):
Required. Name of the column that identifies the time series.
unavailable_at_forecast_columns (List[str]):
Required. Column names of columns that are unavailable at forecast.
Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is unknown before the forecast
(e.g. population of a city in a given year, or weather on a given day).
available_at_forecast_columns (List[str]):
Required. Column names of columns that are available at forecast.
Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is known at forecast.
forecast_horizon: (int):
Required. The amount of time into the future for which forecasted values for the target are
returned. Expressed in number of units defined by the [data_granularity_unit] and
[data_granularity_count] field. Inclusive.
data_granularity_unit (str):
Required. The data granularity unit. Accepted values are ``minute``,
``hour``, ``day``, ``week``, ``month``, ``year``.
data_granularity_count (int):
Required. The number of data granularity units between data points in the training
data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other
values of [data_granularity_unit], must be 1.
predefined_split_column_name (str):
Optional. The key is a name of one of the Dataset's data
columns. The value of the key (either the label's value or
value in the column) must be one of {``TRAIN``,
``VALIDATE``, ``TEST``}, and it defines to which set the
given piece of data is assigned. If for a piece of data the
key is not present or has an invalid value, that piece is
ignored by the pipeline.
Supported only for tabular and time series Datasets.
weight_column (str):
Optional. Name of the column that should be used as the weight column.
Higher values in this column give more importance to the row
during Model training. The column must have numeric values between 0 and
10000 inclusively, and 0 value means that the row is ignored.
If the weight column field is not set, then all rows are assumed to have
equal weight of 1.
time_series_attribute_columns (List[str]):
Optional. Column names that should be used as attribute columns.
Each column is constant within a time series.
context_window (int):
Optional. The amount of time into the past training and prediction data is used for
model training and prediction respectively. Expressed in number of units defined by the
[data_granularity_unit] and [data_granularity_count] fields. When not provided uses the
default value of 0 which means the model sets each series context window to be 0 (also
known as "cold start"). Inclusive.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.
Expected format:
``bq://<project_id>:<dataset_id>:<table>``
If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.
Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
quantiles (List[float]):
Quantiles to use for the `minizmize-quantile-loss`
[AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in
this case.
Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive.
Each quantile must be unique.
validation_options (str):
Validation options for the data validation component. The available options are:
"fail-pipeline" - (default), will validate against the validation and fail the pipeline
if it fails.
"ignore-validation" - ignore the results of the validation and continue the pipeline
budget_milli_node_hours (int):
Optional. The train budget of creating this Model, expressed in milli node
hours i.e. 1,000 value in this field means 1 node hour.
The training cost of the model will not exceed this budget. The final
cost will be attempted to be close to the budget, though may end up
being (even) noticeably smaller - at the backend's discretion. This
especially may happen when further model training ceases to provide
any improvements.
If the budget is set to a value known to be insufficient to train a
Model for the given training set, the training won't be attempted and
will error.
The minimum value is 1000 and the maximum is 72000.
model_display_name (str):
Optional. If the script produces a managed Vertex AI Model. The display name of
the Model. The name can be up to 128 characters long and can be consist
of any UTF-8 characters.
If not provided upon creation, the job's display_name is used.
model_labels (Dict[str, str]):
Optional. The labels with user-defined metadata to
organize your Models.
Label keys and values can be no longer than 64
characters (Unicode codepoints), can only
contain lowercase letters, numeric characters,
underscores and dashes. International characters
are allowed.
See https://goo.gl/xmQnxf for more information
and examples of labels.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
additional_experiments (List[str]):
Additional experiment flags for the time series forcasting training.
Returns:
model: The trained Vertex AI Model resource or None if training did not
produce a Vertex AI Model.
Raises:
RuntimeError: If Training job has already been run or is waiting to run.
"""

if additional_experiments:
self._add_additional_experiments(additional_experiments)

return self.run(
dataset=dataset,
target_column=target_column,
time_column=time_column,
time_series_identifier_column=time_series_identifier_column,
unavailable_at_forecast_columns=unavailable_at_forecast_columns,
available_at_forecast_columns=available_at_forecast_columns,
forecast_horizon=forecast_horizon,
data_granularity_unit=data_granularity_unit,
data_granularity_count=data_granularity_count,
predefined_split_column_name=predefined_split_column_name,
weight_column=weight_column,
time_series_attribute_columns=time_series_attribute_columns,
context_window=context_window,
budget_milli_node_hours=budget_milli_node_hours,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri,
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
quantiles=quantiles,
validation_options=validation_options,
model_display_name=model_display_name,
model_labels=model_labels,
sync=sync,
)

@base.optional_sync()
def _run(
self,
Expand Down
74 changes: 7 additions & 67 deletions tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
Expand Up @@ -91,9 +91,7 @@
"validationOptions": _TEST_TRAINING_VALIDATION_OPTIONS,
"optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
}
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)

_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
{
**_TEST_TRAINING_TASK_INPUTS_DICT,
Expand All @@ -102,6 +100,10 @@
struct_pb2.Value(),
)

_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)

_TEST_DATASET_NAME = "test-dataset-name"

_TEST_MODEL_DISPLAY_NAME = "model-display-name"
Expand Down Expand Up @@ -269,6 +271,7 @@ def test_run_call_pipeline_service_create(
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
quantiles=_TEST_TRAINING_QUANTILES,
validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
sync=sync,
)

Expand All @@ -290,7 +293,7 @@ def test_run_call_pipeline_service_create(
display_name=_TEST_DISPLAY_NAME,
labels=_TEST_LABELS,
training_task_definition=schema.training_job.definition.automl_forecasting,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
)
Expand Down Expand Up @@ -380,69 +383,6 @@ def test_run_call_pipeline_if_no_model_display_name_nor_model_labels(
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_with_experiments(
self,
mock_pipeline_service_create,
mock_dataset_time_series,
mock_model_service_get,
sync,
):
aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

job = AutoMLForecastingTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
)

model_from_job = job._run_with_experiments(
dataset=mock_dataset_time_series,
target_column=_TEST_TRAINING_TARGET_COLUMN,
time_column=_TEST_TRAINING_TIME_COLUMN,
time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON,
data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT,
data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
context_window=_TEST_TRAINING_CONTEXT_WINDOW,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
quantiles=_TEST_TRAINING_QUANTILES,
validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
sync=sync,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
)

if not sync:
model_from_job.wait()

# Test that if defaults to the job display name
true_managed_model = gca_model.Model(display_name=_TEST_DISPLAY_NAME)

true_input_data_config = gca_training_pipeline.InputDataConfig(
dataset_id=mock_dataset_time_series.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_forecasting,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_set_additional_experiments(
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/aiplatform/test_automl_tabular_training_jobs.py
Expand Up @@ -330,6 +330,7 @@ def test_run_call_pipeline_service_create(
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
sync=sync,
)

Expand All @@ -354,7 +355,7 @@ def test_run_call_pipeline_service_create(
display_name=_TEST_DISPLAY_NAME,
labels=_TEST_LABELS,
training_task_definition=schema.training_job.definition.automl_tabular,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
Expand Down

0 comments on commit 5fe59a4

Please sign in to comment.