Skip to content

Commit

Permalink
Add system tests for Vertex AI operators in new approach (#27053)
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Oct 31, 2022
1 parent 9c737f6 commit 7a7c5f8
Show file tree
Hide file tree
Showing 33 changed files with 3,053 additions and 123 deletions.
75 changes: 55 additions & 20 deletions airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py
Expand Up @@ -259,6 +259,11 @@ def extract_model_id(obj: dict) -> str:
"""Returns unique id of the Model."""
return obj["name"].rpartition("/")[-1]

@staticmethod
def extract_training_id(resource_name: str) -> str:
"""Returns unique id of the Training pipeline."""
return resource_name.rpartition("/")[-1]

def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
Expand Down Expand Up @@ -303,7 +308,7 @@ def create_auto_ml_tabular_training_job(
export_evaluated_data_items_bigquery_destination_uri: str | None = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
) -> models.Model:
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Tabular Training Job.
Expand Down Expand Up @@ -488,9 +493,15 @@ def create_auto_ml_tabular_training_job(
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
sync=sync,
)
model.wait()

return model
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. Training Pipeline is not "
"configured to upload a Model."
)
return model, training_id

@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_forecasting_training_job(
Expand Down Expand Up @@ -529,7 +540,7 @@ def create_auto_ml_forecasting_training_job(
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
) -> models.Model:
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Forecasting Training Job.
Expand Down Expand Up @@ -715,9 +726,15 @@ def create_auto_ml_forecasting_training_job(
model_labels=model_labels,
sync=sync,
)
model.wait()

return model
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. Training Pipeline is not "
"configured to upload a Model."
)
return model, training_id

@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_image_training_job(
Expand All @@ -744,7 +761,7 @@ def create_auto_ml_image_training_job(
model_labels: dict[str, str] | None = None,
disable_early_stopping: bool = False,
sync: bool = True,
) -> models.Model:
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Image Training Job.
Expand Down Expand Up @@ -885,9 +902,15 @@ def create_auto_ml_image_training_job(
disable_early_stopping=disable_early_stopping,
sync=sync,
)
model.wait()

return model
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. AutoML Image Training "
"Pipeline is not configured to upload a Model."
)
return model, training_id

@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_text_training_job(
Expand All @@ -911,7 +934,7 @@ def create_auto_ml_text_training_job(
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
) -> models.Model:
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Text Training Job.
Expand Down Expand Up @@ -1016,9 +1039,15 @@ def create_auto_ml_text_training_job(
model_labels=model_labels,
sync=sync,
)
model.wait()

return model
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. AutoML Text Training "
"Pipeline is not configured to upload a Model."
)
return model, training_id

@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_video_training_job(
Expand All @@ -1039,7 +1068,7 @@ def create_auto_ml_video_training_job(
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
) -> models.Model:
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Video Training Job.
Expand Down Expand Up @@ -1141,9 +1170,15 @@ def create_auto_ml_video_training_job(
model_labels=model_labels,
sync=sync,
)
model.wait()

return model
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. AutoML Video Training "
"Pipeline is not configured to upload a Model."
)
return model, training_id

@GoogleBaseHook.fallback_to_default_project_id
def delete_training_pipeline(
Expand Down
30 changes: 19 additions & 11 deletions airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py
Expand Up @@ -247,6 +247,11 @@ def extract_training_id(resource_name: str) -> str:
"""Returns unique id of the Training pipeline."""
return resource_name.rpartition("/")[-1]

@staticmethod
def extract_custom_job_id(custom_job_name: str) -> str:
"""Returns unique id of the Custom Job pipeline."""
return custom_job_name.rpartition("/")[-1]

def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
Expand Down Expand Up @@ -292,7 +297,7 @@ def _run_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
) -> tuple[models.Model | None, str]:
) -> tuple[models.Model | None, str, str]:
"""Run Job for training pipeline"""
model = job.run(
dataset=dataset,
Expand Down Expand Up @@ -323,6 +328,9 @@ def _run_job(
sync=sync,
)
training_id = self.extract_training_id(job.resource_name)
custom_job_id = self.extract_custom_job_id(
job.gca_resource.training_task_metadata.get("backingCustomJob")
)
if model:
model.wait()
else:
Expand All @@ -332,7 +340,7 @@ def _run_job(
"model_serving_container_image_uri and model_display_name passed in. "
"Ensure that your training script saves to model to os.environ['AIP_MODEL_DIR']."
)
return model, training_id
return model, training_id, custom_job_id

@GoogleBaseHook.fallback_to_default_project_id
def cancel_pipeline_job(
Expand Down Expand Up @@ -613,7 +621,7 @@ def create_custom_container_training_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
) -> tuple[models.Model | None, str]:
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Container Training Job
Expand Down Expand Up @@ -885,7 +893,7 @@ def create_custom_container_training_job(
if not self._job:
raise AirflowException("CustomJob was not created")

model, training_id = self._run_job(
model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
Expand Down Expand Up @@ -915,7 +923,7 @@ def create_custom_container_training_job(
sync=sync,
)

return model, training_id
return model, training_id, custom_job_id

@GoogleBaseHook.fallback_to_default_project_id
def create_custom_python_package_training_job(
Expand Down Expand Up @@ -971,7 +979,7 @@ def create_custom_python_package_training_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
) -> tuple[models.Model | None, str]:
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Python Package Training Job
Expand Down Expand Up @@ -1243,7 +1251,7 @@ def create_custom_python_package_training_job(
if not self._job:
raise AirflowException("CustomJob was not created")

model, training_id = self._run_job(
model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
Expand Down Expand Up @@ -1273,7 +1281,7 @@ def create_custom_python_package_training_job(
sync=sync,
)

return model, training_id
return model, training_id, custom_job_id

@GoogleBaseHook.fallback_to_default_project_id
def create_custom_training_job(
Expand Down Expand Up @@ -1329,7 +1337,7 @@ def create_custom_training_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
) -> tuple[models.Model | None, str]:
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Training Job
Expand Down Expand Up @@ -1601,7 +1609,7 @@ def create_custom_training_job(
if not self._job:
raise AirflowException("CustomJob was not created")

model, training_id = self._run_job(
model, training_id, custom_job_id = self._run_job(
job=self._job,
dataset=dataset,
annotation_schema_uri=annotation_schema_uri,
Expand Down Expand Up @@ -1631,7 +1639,7 @@ def create_custom_training_job(
sync=sync,
)

return model, training_id
return model, training_id, custom_job_id

@GoogleBaseHook.fallback_to_default_project_id
def delete_pipeline_job(
Expand Down

0 comments on commit 7a7c5f8

Please sign in to comment.