diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index 58eb824454..cc7e119603 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -33,6 +33,7 @@ CustomContainerTrainingJob, CustomPythonPackageTrainingJob, AutoMLTabularTrainingJob, + AutoMLForecastingTrainingJob, AutoMLImageTrainingJob, AutoMLTextTrainingJob, AutoMLVideoTrainingJob, @@ -52,6 +53,7 @@ "init", "AutoMLImageTrainingJob", "AutoMLTabularTrainingJob", + "AutoMLForecastingTrainingJob", "AutoMLTextTrainingJob", "AutoMLVideoTrainingJob", "BatchPredictionJob", @@ -63,5 +65,6 @@ "Model", "TabularDataset", "TextDataset", + "TimeSeriesDataset", "VideoDataset", ) diff --git a/google/cloud/aiplatform/datasets/__init__.py b/google/cloud/aiplatform/datasets/__init__.py index 57e2bad45d..b297530955 100644 --- a/google/cloud/aiplatform/datasets/__init__.py +++ b/google/cloud/aiplatform/datasets/__init__.py @@ -17,6 +17,7 @@ from google.cloud.aiplatform.datasets.dataset import _Dataset from google.cloud.aiplatform.datasets.tabular_dataset import TabularDataset +from google.cloud.aiplatform.datasets.time_series_dataset import TimeSeriesDataset from google.cloud.aiplatform.datasets.image_dataset import ImageDataset from google.cloud.aiplatform.datasets.text_dataset import TextDataset from google.cloud.aiplatform.datasets.video_dataset import VideoDataset @@ -25,6 +26,7 @@ __all__ = ( "_Dataset", "TabularDataset", + "TimeSeriesDataset", "ImageDataset", "TextDataset", "VideoDataset", diff --git a/google/cloud/aiplatform/datasets/_datasources.py b/google/cloud/aiplatform/datasets/_datasources.py index eefd1b04fd..1221429258 100644 --- a/google/cloud/aiplatform/datasets/_datasources.py +++ b/google/cloud/aiplatform/datasets/_datasources.py @@ -224,6 +224,11 @@ def create_datasource( raise ValueError("tabular dataset does not support data import.") return TabularDatasource(gcs_source, bq_source) + if metadata_schema_uri == schema.dataset.metadata.time_series: + if import_schema_uri: + raise ValueError("time series dataset does not support data import.") + return TabularDatasource(gcs_source, bq_source) + if not import_schema_uri and not gcs_source: return NonTabularDatasource() elif import_schema_uri and gcs_source: diff --git a/google/cloud/aiplatform/datasets/time_series_dataset.py b/google/cloud/aiplatform/datasets/time_series_dataset.py new file mode 100644 index 0000000000..92d8e60c37 --- /dev/null +++ b/google/cloud/aiplatform/datasets/time_series_dataset.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- + +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+from typing import Optional, Sequence, Tuple, Union
+
+from google.auth import credentials as auth_credentials
+
+from google.cloud.aiplatform import datasets
+from google.cloud.aiplatform.datasets import _datasources
+from google.cloud.aiplatform import initializer
+from google.cloud.aiplatform import schema
+from google.cloud.aiplatform import utils
+
+
+class TimeSeriesDataset(datasets._Dataset):
+    """Managed time series dataset resource for AI Platform"""
+
+    _supported_metadata_schema_uris: Optional[Tuple[str]] = (
+        schema.dataset.metadata.time_series,
+    )
+
+    @classmethod
+    def create(
+        cls,
+        display_name: str,
+        gcs_source: Optional[Union[str, Sequence[str]]] = None,
+        bq_source: Optional[str] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        encryption_spec_key_name: Optional[str] = None,
+        sync: bool = True,
+    ) -> "TimeSeriesDataset":
+        """Creates a new time series dataset.
+
+        Args:
+            display_name (str):
+                Required. The user-defined name of the Dataset.
+                The name can be up to 128 characters long and can consist
+                of any UTF-8 characters.
+            gcs_source (Union[str, Sequence[str]]):
+                Google Cloud Storage URI(-s) to the
+                input file(s). May contain wildcards. For more
+                information on wildcards, see
+                https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
+                examples:
+                    str: "gs://bucket/file.csv"
+                    Sequence[str]: ["gs://bucket/file1.csv", "gs://bucket/file2.csv"]
+            bq_source (str):
+                BigQuery URI to the input table.
+                example:
+                    "bq://project.dataset.table_name"
+            project (str):
+                Project to upload this dataset to. Overrides project set in
+                aiplatform.init.
+            location (str):
+                Location to upload this dataset to. Overrides location set in
+                aiplatform.init.
+            credentials (auth_credentials.Credentials):
+                Custom credentials to use to upload this dataset. Overrides
+                credentials set in aiplatform.init.
+            request_metadata (Sequence[Tuple[str, str]]):
+                Strings which should be sent along with the request as metadata.
+            encryption_spec_key_name (Optional[str]):
+                Optional. The Cloud KMS resource identifier of the customer
+                managed encryption key used to protect the dataset. Has the
+                form:
+                ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
+                The key needs to be in the same region as where the compute
+                resource is created.
+
+                If set, this Dataset and all sub-resources of this Dataset will be secured by this key.
+
+                Overrides encryption_spec_key_name set in aiplatform.init.
+            sync (bool):
+                Whether to execute this method synchronously. If False, this method
+                will be executed in concurrent Future and any downstream object will
+                be immediately returned and synced when the Future has completed.
+
+        Returns:
+            time_series_dataset (TimeSeriesDataset):
+                Instantiated representation of the managed time series dataset resource.
+ + """ + + utils.validate_display_name(display_name) + + api_client = cls._instantiate_client(location=location, credentials=credentials) + + metadata_schema_uri = schema.dataset.metadata.time_series + + datasource = _datasources.create_datasource( + metadata_schema_uri=metadata_schema_uri, + gcs_source=gcs_source, + bq_source=bq_source, + ) + + return cls._create_and_import( + api_client=api_client, + parent=initializer.global_config.common_location_path( + project=project, location=location + ), + display_name=display_name, + metadata_schema_uri=metadata_schema_uri, + datasource=datasource, + project=project or initializer.global_config.project, + location=location or initializer.global_config.location, + credentials=credentials or initializer.global_config.credentials, + request_metadata=request_metadata, + encryption_spec=initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ), + sync=sync, + ) + + def import_data(self): + raise NotImplementedError( + f"{self.__class__.__name__} class does not support 'import_data'" + ) diff --git a/google/cloud/aiplatform/schema.py b/google/cloud/aiplatform/schema.py index 04d2f026a1..6b2a3d7d66 100644 --- a/google/cloud/aiplatform/schema.py +++ b/google/cloud/aiplatform/schema.py @@ -22,6 +22,7 @@ class training_job: class definition: custom_task = "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml" automl_tabular = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_tabular_1.0.0.yaml" + automl_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_time_series_forecasting_1.0.0.yaml" automl_image_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_classification_1.0.0.yaml" automl_image_object_detection = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_object_detection_1.0.0.yaml" automl_text_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_text_classification_1.0.0.yaml" @@ -37,6 +38,7 @@ class metadata: tabular = ( "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml" ) + time_series = "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml" image = "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml" text = "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml" video = "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml" diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 220a34637e..572e3ad0ae 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -279,7 +279,7 @@ def _create_input_data_config( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. gcs_destination_uri_prefix (str): Optional. The Google Cloud Storage location. 
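For orientation, here is a minimal usage sketch of the TimeSeriesDataset surface added above. It is illustrative only and not part of the change itself: the project, location, and BigQuery table are hypothetical placeholders, and the call simply mirrors the create() signature and the time_series schema entry introduced in this diff.

    from google.cloud import aiplatform

    # Hypothetical project/location/table values, for illustration only.
    aiplatform.init(project="my-project", location="us-central1")

    # Create a managed time series dataset from a BigQuery table. The datasource
    # plumbing reuses TabularDatasource, so either bq_source or gcs_source may be
    # passed, while import_data() deliberately raises NotImplementedError.
    dataset = aiplatform.TimeSeriesDataset.create(
        display_name="sales-history",
        bq_source="bq://my-project.sales.history",
    )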
@@ -320,12 +320,12 @@ def _create_input_data_config( # Create predefined split spec predefined_split = None if predefined_split_column_name: - if ( - dataset._gca_resource.metadata_schema_uri - != schema.dataset.metadata.tabular + if dataset._gca_resource.metadata_schema_uri not in ( + schema.dataset.metadata.tabular, + schema.dataset.metadata.time_series, ): raise ValueError( - "A pre-defined split may only be used with a tabular Dataset" + "A pre-defined split may only be used with a tabular or time series Dataset" ) predefined_split = gca_training_pipeline.PredefinedSplit( @@ -438,7 +438,7 @@ def _run_job( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. model (~.model.Model): Optional. Describes the Model that may be uploaded (via [ModelService.UploadMode][]) by this TrainingPipeline. The @@ -1904,7 +1904,7 @@ def run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2026,7 +2026,7 @@ def _run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2421,7 +2421,7 @@ def run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2537,7 +2537,7 @@ def _run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2759,7 +2759,7 @@ def run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. weight_column (str): Optional. Name of the column that should be used as the weight column. Higher values in this column give more importance to the row @@ -2874,7 +2874,7 @@ def _run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. weight_column (str): Optional. Name of the column that should be used as the weight column. 
                Higher values in this column give more importance to the row
@@ -2960,6 +2960,458 @@ def _model_upload_fail_string(self) -> str:
         )
+
+class AutoMLForecastingTrainingJob(_TrainingJob):
+    _supported_training_schemas = (schema.training_job.definition.automl_forecasting,)
+
+    def __init__(
+        self,
+        display_name: str,
+        optimization_objective: Optional[str] = None,
+        column_transformations: Optional[Union[Dict, List[Dict]]] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+    ):
+        """Constructs an AutoML Forecasting Training Job.
+
+        Args:
+            display_name (str):
+                Required. The user-defined name of this TrainingPipeline.
+            optimization_objective (str):
+                Optional. Objective function the model is to be optimized towards.
+                The training process creates a Model that optimizes the value of the objective
+                function over the validation set. The supported optimization objectives:
+                "minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
+                "minimize-mae" - Minimize mean-absolute error (MAE).
+                "minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
+                "minimize-rmspe" - Minimize root-mean-squared percentage error (RMSPE).
+                "minimize-wape-mae" - Minimize the combination of weighted absolute percentage error (WAPE)
+                and mean-absolute-error (MAE).
+                "minimize-quantile-loss" - Minimize the quantile loss at the defined quantiles.
+                (Set this objective to build quantile forecasts.)
+            column_transformations (Optional[Union[Dict, List[Dict]]]):
+                Optional. Transformations to apply to the input columns (i.e. columns other
+                than the targetColumn). Each transformation may produce multiple
+                result values from the column's value, and all are used for training.
+                When creating transformation for BigQuery Struct column, the column
+                should be flattened using "." as the delimiter.
+                If an input column has no transformations on it, such a column is
+                ignored by the training, except for the targetColumn, which should have
+                no transformations defined on it.
+            project (str):
+                Optional. Project to run training in. Overrides project set in aiplatform.init.
+            location (str):
+                Optional. Location to run training in. Overrides location set in aiplatform.init.
+            credentials (auth_credentials.Credentials):
+                Optional. Custom credentials to use to call the training service. Overrides
+                credentials set in aiplatform.init.
+ """ + super().__init__( + display_name=display_name, + project=project, + location=location, + credentials=credentials, + ) + self._column_transformations = column_transformations + self._optimization_objective = optimization_objective + + def run( + self, + dataset: datasets.TimeSeriesDataset, + target_column: str, + time_column: str, + time_series_identifier_column: str, + unavailable_at_forecast_columns: List[str], + available_at_forecast_columns: List[str], + forecast_horizon: int, + data_granularity_unit: str, + data_granularity_count: int, + predefined_split_column_name: Optional[str] = None, + weight_column: Optional[str] = None, + time_series_attribute_columns: Optional[List[str]] = None, + context_window: Optional[int] = None, + export_evaluated_data_items: bool = False, + export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None, + export_evaluated_data_items_override_destination: bool = False, + quantiles: Optional[List[float]] = None, + validation_options: Optional[str] = None, + budget_milli_node_hours: int = 1000, + model_display_name: Optional[str] = None, + sync: bool = True, + ) -> models.Model: + """Runs the training job and returns a model. + + The training data splits are set by default: Roughly 80% will be used for training, + 10% for validation, and 10% for test. + + Args: + dataset (datasets.Dataset): + Required. The dataset within the same Project from which data will be used to train the Model. The + Dataset must use schema compatible with Model being trained, + and what is compatible should be described in the used + TrainingPipeline's [training_task_definition] + [google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. + For time series Datasets, all their data is exported to + training, to pick and choose from. + target_column (str): + Required. Name of the column that the Model is to predict values for. + time_column (str): + Required. Name of the column that identifies time order in the time series. + time_series_identifier_column (str): + Required. Name of the column that identifies the time series. + unavailable_at_forecast_columns (List[str]): + Required. Column names of columns that are unavailable at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is unknown before the forecast + (e.g. population of a city in a given year, or weather on a given day). + available_at_forecast_columns (List[str]): + Required. Column names of columns that are available at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is known at forecast. + forecast_horizon: (int): + Required. The amount of time into the future for which forecasted values for the target are + returned. Expressed in number of units defined by the [data_granularity_unit] and + [data_granularity_count] field. Inclusive. + data_granularity_unit (str): + Required. The data granularity unit. Accepted values are ``minute``, + ``hour``, ``day``, ``week``, ``month``, ``year``. + data_granularity_count (int): + Required. The number of data granularity units between data points in the training + data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other + values of [data_granularity_unit], must be 1. + predefined_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. 
The value of the key (either the label's value or
+                value in the column) must be one of {``TRAIN``,
+                ``VALIDATE``, ``TEST``}, and it defines to which set the
+                given piece of data is assigned. If for a piece of data the
+                key is not present or has an invalid value, that piece is
+                ignored by the pipeline.
+
+                Supported only for tabular and time series Datasets.
+            weight_column (str):
+                Optional. Name of the column that should be used as the weight column.
+                Higher values in this column give more importance to the row
+                during Model training. The column must have numeric values between 0 and
+                10000 inclusively, and 0 value means that the row is ignored.
+                If the weight column field is not set, then all rows are assumed to have
+                equal weight of 1.
+            time_series_attribute_columns (List[str]):
+                Optional. Column names that should be used as attribute columns.
+                Each column is constant within a time series.
+            context_window (int):
+                Optional. The amount of time into the past training and prediction data is used for
+                model training and prediction respectively. Expressed in number of units defined by the
+                [data_granularity_unit] and [data_granularity_count] fields. When not provided uses the
+                default value of 0 which means the model sets each series context window to be 0 (also
+                known as "cold start"). Inclusive.
+            export_evaluated_data_items (bool):
+                Whether to export the test set predictions to a BigQuery table.
+                If False, then the export is not performed.
+            export_evaluated_data_items_bigquery_destination_uri (string):
+                Optional. URI of desired destination BigQuery table for exported test set predictions.
+
+                Expected format:
+                ``bq://<project_id>:<dataset_id>:<table>``
+
+                If not specified, then results are exported to the following auto-created BigQuery
+                table:
+                ``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
+
+                Applies only if [export_evaluated_data_items] is True.
+            export_evaluated_data_items_override_destination (bool):
+                Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
+                if the table exists, for exported test set predictions. If False, and the
+                table exists, then the training job will fail.
+
+                Applies only if [export_evaluated_data_items] is True and
+                [export_evaluated_data_items_bigquery_destination_uri] is specified.
+            quantiles (List[float]):
+                Quantiles to use for the `minimize-quantile-loss`
+                [AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in
+                this case.
+
+                Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive.
+                Each quantile must be unique.
+            validation_options (str):
+                Validation options for the data validation component. The available options are:
+                "fail-pipeline" - (default), will validate against the validation set and fail the pipeline
+                if it fails.
+                "ignore-validation" - ignore the results of the validation and continue the pipeline
+            budget_milli_node_hours (int):
+                Optional. The train budget of creating this Model, expressed in milli node
+                hours i.e. 1,000 value in this field means 1 node hour.
+                The training cost of the model will not exceed this budget. The final
+                cost will be attempted to be close to the budget, though may end up
+                being (even) noticeably smaller - at the backend's discretion. This
+                especially may happen when further model training ceases to provide
+                any improvements.
+                If the budget is set to a value known to be insufficient to train a
+                Model for the given training set, the training won't be attempted and
+                will error.
+ The minimum value is 1000 and the maximum is 72000. + model_display_name (str): + Optional. If the script produces a managed AI Platform Model. The display name of + the Model. The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + + If not provided upon creation, the job's display_name is used. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + Returns: + model: The trained AI Platform Model resource or None if training did not + produce an AI Platform Model. + + Raises: + RuntimeError if Training job has already been run or is waiting to run. + """ + + if self._is_waiting_to_run(): + raise RuntimeError( + "AutoML Forecasting Training is already scheduled to run." + ) + + if self._has_run: + raise RuntimeError("AutoML Forecasting Training has already run.") + + return self._run( + dataset=dataset, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + unavailable_at_forecast_columns=unavailable_at_forecast_columns, + available_at_forecast_columns=available_at_forecast_columns, + forecast_horizon=forecast_horizon, + data_granularity_unit=data_granularity_unit, + data_granularity_count=data_granularity_count, + predefined_split_column_name=predefined_split_column_name, + weight_column=weight_column, + time_series_attribute_columns=time_series_attribute_columns, + context_window=context_window, + budget_milli_node_hours=budget_milli_node_hours, + export_evaluated_data_items=export_evaluated_data_items, + export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri, + export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination, + quantiles=quantiles, + validation_options=validation_options, + model_display_name=model_display_name, + sync=sync, + ) + + @base.optional_sync() + def _run( + self, + dataset: datasets.TimeSeriesDataset, + target_column: str, + time_column: str, + time_series_identifier_column: str, + unavailable_at_forecast_columns: List[str], + available_at_forecast_columns: List[str], + forecast_horizon: int, + data_granularity_unit: str, + data_granularity_count: int, + predefined_split_column_name: Optional[str] = None, + weight_column: Optional[str] = None, + time_series_attribute_columns: Optional[List[str]] = None, + context_window: Optional[int] = None, + export_evaluated_data_items: bool = False, + export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None, + export_evaluated_data_items_override_destination: bool = False, + quantiles: Optional[List[float]] = None, + validation_options: Optional[str] = None, + budget_milli_node_hours: int = 1000, + model_display_name: Optional[str] = None, + sync: bool = True, + ) -> models.Model: + """Runs the training job and returns a model. + + The training data splits are set by default: Roughly 80% will be used for training, + 10% for validation, and 10% for test. + + Args: + dataset (datasets.Dataset): + Required. The dataset within the same Project from which data will be used to train the Model. The + Dataset must use schema compatible with Model being trained, + and what is compatible should be described in the used + TrainingPipeline's [training_task_definition] + [google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. 
+            For time series Datasets, all their data is exported to
+            training, to pick and choose from.
+            target_column (str):
+                Required. Name of the column that the Model is to predict values for.
+            time_column (str):
+                Required. Name of the column that identifies time order in the time series.
+            time_series_identifier_column (str):
+                Required. Name of the column that identifies the time series.
+            unavailable_at_forecast_columns (List[str]):
+                Required. Column names of columns that are unavailable at forecast.
+                Each column contains information for the given entity (identified by the
+                [time_series_identifier_column]) that is unknown before the forecast
+                (e.g. population of a city in a given year, or weather on a given day).
+            available_at_forecast_columns (List[str]):
+                Required. Column names of columns that are available at forecast.
+                Each column contains information for the given entity (identified by the
+                [time_series_identifier_column]) that is known at forecast.
+            forecast_horizon (int):
+                Required. The amount of time into the future for which forecasted values for the target are
+                returned. Expressed in number of units defined by the [data_granularity_unit] and
+                [data_granularity_count] field. Inclusive.
+            data_granularity_unit (str):
+                Required. The data granularity unit. Accepted values are ``minute``,
+                ``hour``, ``day``, ``week``, ``month``, ``year``.
+            data_granularity_count (int):
+                Required. The number of data granularity units between data points in the training
+                data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other
+                values of [data_granularity_unit], must be 1.
+            predefined_split_column_name (str):
+                Optional. The key is a name of one of the Dataset's data
+                columns. The value of the key (either the label's value or
+                value in the column) must be one of {``TRAIN``,
+                ``VALIDATE``, ``TEST``}, and it defines to which set the
+                given piece of data is assigned. If for a piece of data the
+                key is not present or has an invalid value, that piece is
+                ignored by the pipeline.
+
+                Supported only for tabular and time series Datasets.
+            weight_column (str):
+                Optional. Name of the column that should be used as the weight column.
+                Higher values in this column give more importance to the row
+                during Model training. The column must have numeric values between 0 and
+                10000 inclusively, and 0 value means that the row is ignored.
+                If the weight column field is not set, then all rows are assumed to have
+                equal weight of 1.
+            time_series_attribute_columns (List[str]):
+                Optional. Column names that should be used as attribute columns.
+                Each column is constant within a time series.
+            context_window (int):
+                Optional. The number of periods offset into the past to restrict past sequence, where each
+                period is one unit of granularity as defined by [period]. When not provided uses the
+                default value of 0 which means the model sets each series historical window to be 0 (also
+                known as "cold start"). Inclusive.
+            export_evaluated_data_items (bool):
+                Whether to export the test set predictions to a BigQuery table.
+                If False, then the export is not performed.
+            export_evaluated_data_items_bigquery_destination_uri (string):
+                Optional. URI of desired destination BigQuery table for exported test set predictions.
+
+                Expected format:
+                ``bq://<project_id>:<dataset_id>:<table>``
+
+                If not specified, then results are exported to the following auto-created BigQuery
+                table:
+                ``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
+
+                Applies only if [export_evaluated_data_items] is True.
+            export_evaluated_data_items_override_destination (bool):
+                Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
+                if the table exists, for exported test set predictions. If False, and the
+                table exists, then the training job will fail.
+
+                Applies only if [export_evaluated_data_items] is True and
+                [export_evaluated_data_items_bigquery_destination_uri] is specified.
+            quantiles (List[float]):
+                Quantiles to use for the `minimize-quantile-loss`
+                [AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in
+                this case.
+
+                Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive.
+                Each quantile must be unique.
+            validation_options (str):
+                Validation options for the data validation component. The available options are:
+                "fail-pipeline" - (default), will validate against the validation set and fail the pipeline
+                if it fails.
+                "ignore-validation" - ignore the results of the validation and continue the pipeline
+            budget_milli_node_hours (int):
+                Optional. The train budget of creating this Model, expressed in milli node
+                hours i.e. 1,000 value in this field means 1 node hour.
+                The training cost of the model will not exceed this budget. The final
+                cost will be attempted to be close to the budget, though may end up
+                being (even) noticeably smaller - at the backend's discretion. This
+                especially may happen when further model training ceases to provide
+                any improvements.
+                If the budget is set to a value known to be insufficient to train a
+                Model for the given training set, the training won't be attempted and
+                will error.
+                The minimum value is 1000 and the maximum is 72000.
+            model_display_name (str):
+                Optional. The display name of the managed AI Platform Model produced
+                by this training job. The name can be up to 128 characters long and
+                can consist of any UTF-8 characters.
+
+                If not provided upon creation, the job's display_name is used.
+            sync (bool):
+                Whether to execute this method synchronously. If False, this method
+                will be executed in concurrent Future and any downstream object will
+                be immediately returned and synced when the Future has completed.
+        Returns:
+            model: The trained AI Platform Model resource or None if training did not
+                produce an AI Platform Model.
+ """ + + training_task_definition = schema.training_job.definition.automl_forecasting + + training_task_inputs_dict = { + # required inputs + "targetColumn": target_column, + "timeColumn": time_column, + "timeSeriesIdentifierColumn": time_series_identifier_column, + "timeSeriesAttributeColumns": time_series_attribute_columns, + "unavailableAtForecastColumns": unavailable_at_forecast_columns, + "availableAtForecastColumns": available_at_forecast_columns, + "forecastHorizon": forecast_horizon, + "dataGranularity": { + "unit": data_granularity_unit, + "quantity": data_granularity_count, + }, + "transformations": self._column_transformations, + "trainBudgetMilliNodeHours": budget_milli_node_hours, + # optional inputs + "weightColumn": weight_column, + "contextWindow": context_window, + "quantiles": quantiles, + "validationOptions": validation_options, + "optimizationObjective": self._optimization_objective, + } + + final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri + if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith( + "bq://" + ): + final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}" + + if export_evaluated_data_items: + training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = { + "destinationBigqueryUri": final_export_eval_bq_uri, + "overrideExistingTable": export_evaluated_data_items_override_destination, + } + + if model_display_name is None: + model_display_name = self._display_name + + model = gca_model.Model(display_name=model_display_name) + + return self._run_job( + training_task_definition=training_task_definition, + training_task_inputs=training_task_inputs_dict, + dataset=dataset, + training_fraction_split=0.8, + validation_fraction_split=0.1, + test_fraction_split=0.1, + predefined_split_column_name=predefined_split_column_name, + model=model, + ) + + @property + def _model_upload_fail_string(self) -> str: + """Helper property for model upload failure.""" + return ( + f"Training Pipeline {self.resource_name} is not configured to upload a " + "Model." + ) + + class AutoMLImageTrainingJob(_TrainingJob): _supported_training_schemas = ( schema.training_job.definition.automl_image_classification, @@ -3686,7 +4138,7 @@ def run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -3784,7 +4236,7 @@ def _run( key is not present or has an invalid value, that piece is ignored by the pipeline. - Supported only for tabular Datasets. + Supported only for tabular and time series Datasets. sync (bool): Whether to execute this method synchronously. 
If False, this method will be executed in concurrent Future and any downstream object will diff --git a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py new file mode 100644 index 0000000000..5d89360566 --- /dev/null +++ b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py @@ -0,0 +1,488 @@ +import importlib +import pytest +from unittest import mock + +from google.cloud import aiplatform +from google.cloud.aiplatform import datasets +from google.cloud.aiplatform import initializer +from google.cloud.aiplatform import schema +from google.cloud.aiplatform.training_jobs import AutoMLForecastingTrainingJob + +from google.cloud.aiplatform_v1.services.model_service import ( + client as model_service_client, +) +from google.cloud.aiplatform_v1.services.pipeline_service import ( + client as pipeline_service_client, +) +from google.cloud.aiplatform_v1.types import ( + dataset as gca_dataset, + model as gca_model, + pipeline_state as gca_pipeline_state, + training_pipeline as gca_training_pipeline, +) +from google.protobuf import json_format +from google.protobuf import struct_pb2 + +_TEST_BUCKET_NAME = "test-bucket" +_TEST_GCS_PATH_WITHOUT_BUCKET = "path/to/folder" +_TEST_GCS_PATH = f"{_TEST_BUCKET_NAME}/{_TEST_GCS_PATH_WITHOUT_BUCKET}" +_TEST_GCS_PATH_WITH_TRAILING_SLASH = f"{_TEST_GCS_PATH}/" +_TEST_PROJECT = "test-project" + +_TEST_DATASET_DISPLAY_NAME = "test-dataset-display-name" +_TEST_DATASET_NAME = "test-dataset-name" +_TEST_DISPLAY_NAME = "test-display-name" +_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image" +_TEST_METADATA_SCHEMA_URI_TIMESERIES = schema.dataset.metadata.time_series +_TEST_METADATA_SCHEMA_URI_NONTIMESERIES = schema.dataset.metadata.image + +_TEST_TRAINING_COLUMN_TRANSFORMATIONS = [ + {"auto": {"column_name": "time"}}, + {"auto": {"column_name": "time_series_identifier"}}, + {"auto": {"column_name": "target"}}, + {"auto": {"column_name": "weight"}}, +] +_TEST_TRAINING_TARGET_COLUMN = "target" +_TEST_TRAINING_TIME_COLUMN = "time" +_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN = "time_series_identifier" +_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS = [] +_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS = [] +_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS = [] +_TEST_TRAINING_FORECAST_HORIZON = 10 +_TEST_TRAINING_DATA_GRANULARITY_UNIT = "day" +_TEST_TRAINING_DATA_GRANULARITY_COUNT = 1 +_TEST_TRAINING_CONTEXT_WINDOW = None +_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS = True +_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI = ( + "bq://path.to.table" +) +_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION = False +_TEST_TRAINING_QUANTILES = None +_TEST_TRAINING_VALIDATION_OPTIONS = None +_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS = 1000 +_TEST_TRAINING_WEIGHT_COLUMN = "weight" +_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-rmse" +_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict( + { + # required inputs + "targetColumn": _TEST_TRAINING_TARGET_COLUMN, + "timeColumn": _TEST_TRAINING_TIME_COLUMN, + "timeSeriesIdentifierColumn": _TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + "timeSeriesAttributeColumns": _TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + "unavailableAtForecastColumns": _TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + "availableAtForecastColumns": _TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + "forecastHorizon": _TEST_TRAINING_FORECAST_HORIZON, + "dataGranularity": { + "unit": _TEST_TRAINING_DATA_GRANULARITY_UNIT, + 
"quantity": _TEST_TRAINING_DATA_GRANULARITY_COUNT, + }, + "transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS, + "trainBudgetMilliNodeHours": _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + # optional inputs + "weightColumn": _TEST_TRAINING_WEIGHT_COLUMN, + "contextWindow": _TEST_TRAINING_CONTEXT_WINDOW, + "exportEvaluatedDataItemsConfig": { + "destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + "overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + }, + "quantiles": _TEST_TRAINING_QUANTILES, + "validationOptions": _TEST_TRAINING_VALIDATION_OPTIONS, + "optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + }, + struct_pb2.Value(), +) + +_TEST_DATASET_NAME = "test-dataset-name" + +_TEST_MODEL_DISPLAY_NAME = "model-display-name" +_TEST_TRAINING_FRACTION_SPLIT = 0.8 +_TEST_VALIDATION_FRACTION_SPLIT = 0.1 +_TEST_TEST_FRACTION_SPLIT = 0.1 +_TEST_PREDEFINED_SPLIT_COLUMN_NAME = "split" + +_TEST_OUTPUT_PYTHON_PACKAGE_PATH = "gs://test/ouput/python/trainer.tar.gz" + +_TEST_MODEL_NAME = "projects/my-project/locations/us-central1/models/12345" + +_TEST_PIPELINE_RESOURCE_NAME = ( + "projects/my-project/locations/us-central1/trainingPipeline/12345" +) + + +@pytest.fixture +def mock_pipeline_service_create(): + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "create_training_pipeline" + ) as mock_create_training_pipeline: + mock_create_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED, + model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME), + ) + yield mock_create_training_pipeline + + +@pytest.fixture +def mock_pipeline_service_get(): + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as mock_get_training_pipeline: + mock_get_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED, + model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME), + ) + yield mock_get_training_pipeline + + +@pytest.fixture +def mock_pipeline_service_create_and_get_with_fail(): + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "create_training_pipeline" + ) as mock_create_training_pipeline: + mock_create_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING, + ) + + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as mock_get_training_pipeline: + mock_get_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_FAILED, + ) + + yield mock_create_training_pipeline, mock_get_training_pipeline + + +@pytest.fixture +def mock_model_service_get(): + with mock.patch.object( + model_service_client.ModelServiceClient, "get_model" + ) as mock_get_model: + mock_get_model.return_value = gca_model.Model() + yield mock_get_model + + +@pytest.fixture +def mock_dataset_time_series(): + ds = mock.MagicMock(datasets.TimeSeriesDataset) + ds.name = _TEST_DATASET_NAME + ds._latest_future = None + ds._exception = None + ds._gca_resource = gca_dataset.Dataset( + display_name=_TEST_DATASET_DISPLAY_NAME, + 
metadata_schema_uri=_TEST_METADATA_SCHEMA_URI_TIMESERIES, + labels={}, + name=_TEST_DATASET_NAME, + metadata={}, + ) + return ds + + +@pytest.fixture +def mock_dataset_nontimeseries(): + ds = mock.MagicMock(datasets.ImageDataset) + ds.name = _TEST_DATASET_NAME + ds._latest_future = None + ds._exception = None + ds._gca_resource = gca_dataset.Dataset( + display_name=_TEST_DATASET_DISPLAY_NAME, + metadata_schema_uri=_TEST_METADATA_SCHEMA_URI_NONTIMESERIES, + labels={}, + name=_TEST_DATASET_NAME, + metadata={}, + ) + return ds + + +class TestAutoMLForecastingTrainingJob: + def setup_method(self): + importlib.reload(initializer) + importlib.reload(aiplatform) + + def teardown_method(self): + initializer.global_pool.shutdown(wait=True) + + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + mock_dataset_time_series, + mock_model_service_get, + sync, + ): + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + model_from_job = job.run( + dataset=mock_dataset_time_series, + target_column=_TEST_TRAINING_TARGET_COLUMN, + time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + ) + + if not sync: + model_from_job.wait() + + true_fraction_split = gca_training_pipeline.FractionSplit( + training_fraction=_TEST_TRAINING_FRACTION_SPLIT, + validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT, + test_fraction=_TEST_TEST_FRACTION_SPLIT, + ) + + true_managed_model = gca_model.Model(display_name=_TEST_MODEL_DISPLAY_NAME) + + true_input_data_config = gca_training_pipeline.InputDataConfig( + fraction_split=true_fraction_split, + predefined_split=gca_training_pipeline.PredefinedSplit( + key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME + ), + dataset_id=mock_dataset_time_series.name, + ) + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.automl_forecasting, + training_task_inputs=_TEST_TRAINING_TASK_INPUTS, + model_to_upload=true_managed_model, + input_data_config=true_input_data_config, + ) + + 
mock_pipeline_service_create.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + assert job._gca_resource is mock_pipeline_service_get.return_value + + mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) + + assert model_from_job._gca_resource is mock_model_service_get.return_value + + assert job.get_model()._gca_resource is mock_model_service_get.return_value + + assert not job.has_failed + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + + @pytest.mark.usefixtures("mock_pipeline_service_get") + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_if_no_model_display_name( + self, + mock_pipeline_service_create, + mock_dataset_time_series, + mock_model_service_get, + sync, + ): + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + model_from_job = job.run( + dataset=mock_dataset_time_series, + target_column=_TEST_TRAINING_TARGET_COLUMN, + time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + ) + + if not sync: + model_from_job.wait() + + true_fraction_split = gca_training_pipeline.FractionSplit( + training_fraction=_TEST_TRAINING_FRACTION_SPLIT, + validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT, + test_fraction=_TEST_TEST_FRACTION_SPLIT, + ) + + # Test that if defaults to the job display name + true_managed_model = gca_model.Model(display_name=_TEST_DISPLAY_NAME) + + true_input_data_config = gca_training_pipeline.InputDataConfig( + fraction_split=true_fraction_split, + dataset_id=mock_dataset_time_series.name, + ) + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.automl_forecasting, + training_task_inputs=_TEST_TRAINING_TASK_INPUTS, + model_to_upload=true_managed_model, + input_data_config=true_input_data_config, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + @pytest.mark.usefixtures( + "mock_pipeline_service_create", + "mock_pipeline_service_get", + "mock_model_service_get", + ) + @pytest.mark.parametrize("sync", [True, 
False]) + def test_run_called_twice_raises( + self, mock_dataset_time_series, sync, + ): + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + job.run( + dataset=mock_dataset_time_series, + target_column=_TEST_TRAINING_TARGET_COLUMN, + time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + ) + + with pytest.raises(RuntimeError): + job.run( + dataset=mock_dataset_time_series, + target_column=_TEST_TRAINING_TARGET_COLUMN, + time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + ) + + @pytest.mark.parametrize("sync", [True, False]) + def test_run_raises_if_pipeline_fails( + self, + mock_pipeline_service_create_and_get_with_fail, + mock_dataset_time_series, + sync, + ): + + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + with pytest.raises(RuntimeError): + job.run( + dataset=mock_dataset_time_series, + target_column=_TEST_TRAINING_TARGET_COLUMN, + 
time_column=_TEST_TRAINING_TIME_COLUMN, + time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN, + unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS, + available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS, + forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON, + data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT, + data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + weight_column=_TEST_TRAINING_WEIGHT_COLUMN, + time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS, + context_window=_TEST_TRAINING_CONTEXT_WINDOW, + budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS, + export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS, + export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI, + export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION, + quantiles=_TEST_TRAINING_QUANTILES, + validation_options=_TEST_TRAINING_VALIDATION_OPTIONS, + sync=sync, + ) + + if not sync: + job.wait() + + with pytest.raises(RuntimeError): + job.get_model() + + def test_raises_before_run_is_called(self, mock_pipeline_service_create): + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = AutoMLForecastingTrainingJob( + display_name=_TEST_DISPLAY_NAME, + optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME, + column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS, + ) + + with pytest.raises(RuntimeError): + job.get_model() + + with pytest.raises(RuntimeError): + job.has_failed + + with pytest.raises(RuntimeError): + job.state
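
Taken together, the changes above wire a complete forecasting path: a TimeSeriesDataset, a time_series metadata schema, an automl_forecasting training task definition, and the AutoMLForecastingTrainingJob whose training_task_inputs are verified by these tests. The sketch below shows the intended call pattern under hypothetical names; the project, dataset resource ID, and column names are placeholders, not values taken from this change.

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")

    # Reference an existing managed time series dataset by resource name
    # (hypothetical dataset ID), as supported by the _Dataset base class.
    dataset = aiplatform.TimeSeriesDataset(
        "projects/my-project/locations/us-central1/datasets/1234567890"
    )

    job = aiplatform.AutoMLForecastingTrainingJob(
        display_name="sales-forecast",
        optimization_objective="minimize-rmse",
        column_transformations=[
            {"auto": {"column_name": "date"}},
            {"auto": {"column_name": "store_id"}},
        ],
    )

    # Forecast `sales` 30 days ahead for each `store_id`; the target is unavailable
    # at forecast time, while the time column is known in advance.
    model = job.run(
        dataset=dataset,
        target_column="sales",
        time_column="date",
        time_series_identifier_column="store_id",
        unavailable_at_forecast_columns=["sales"],
        available_at_forecast_columns=["date"],
        forecast_horizon=30,
        data_granularity_unit="day",
        data_granularity_count=1,
        budget_milli_node_hours=1000,
        model_display_name="sales-forecast-model",
    )

For quantile forecasts, optimization_objective="minimize-quantile-loss" would instead be paired with a quantiles list such as [0.1, 0.5, 0.9], per the docstring added in training_jobs.py.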