From 437d4e44676fe65ce9e9f7b99fb85740332071a7 Mon Sep 17 00:00:00 2001
From: VladaZakharova <80038284+VladaZakharova@users.noreply.github.com>
Date: Wed, 17 Jan 2024 15:03:09 +0100
Subject: [PATCH] Deprecate AutoMLTrainModelOperator for Vision and Video
 (#36473)

Co-authored-by: Ulada Zakharava
---
 .../google/cloud/operators/automl.py          |  20 +-
 .../operators/cloud/automl.rst                |  37 +++-
 .../example_automl_nl_text_classification.py  |   7 +-
 .../example_automl_nl_text_extraction.py      |   7 +-
 .../example_automl_nl_text_sentiment.py       |   7 +-
 .../example_automl_video_classification.py    | 169 +++++++++++++++
 ...utoml_video_intelligence_classification.py | 155 --------------
 ...mple_automl_video_intelligence_tracking.py | 155 --------------
 .../automl/example_automl_video_tracking.py   | 169 +++++++++++++++
 .../example_automl_vision_classification.py   | 202 ++++++++----------
 .../example_automl_vision_object_detection.py | 159 +++++++-------
 11 files changed, 562 insertions(+), 525 deletions(-)
 create mode 100644 tests/system/providers/google/cloud/automl/example_automl_video_classification.py
 delete mode 100644 tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py
 delete mode 100644 tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py
 create mode 100644 tests/system/providers/google/cloud/automl/example_automl_video_tracking.py

diff --git a/airflow/providers/google/cloud/operators/automl.py b/airflow/providers/google/cloud/operators/automl.py
index aee9fc9631774..a30bd766453da 100644
--- a/airflow/providers/google/cloud/operators/automl.py
+++ b/airflow/providers/google/cloud/operators/automl.py
@@ -118,19 +118,17 @@ def __init__(
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context):
-        # Output warning if running AutoML Natural Language prediction job
-        automl_nl_model_keys = [
-            "text_classification_model_metadata",
-            "text_extraction_model_metadata",
-            "text_sentiment_dataset_metadata",
-        ]
-        if any(key in automl_nl_model_keys for key in self.model):
+        # Output a warning when running any job other than an AutoML Translation prediction job
+        if "translation_model_metadata" not in self.model:
             warnings.warn(
-                "AutoMLTrainModelOperator for text prediction is deprecated. All the functionality of legacy "
-                "AutoML Natural Language and new features are available on the Vertex AI platform. "
-                "Please use `CreateAutoMLTextTrainingJobOperator`",
+                "AutoMLTrainModelOperator for text, image and video prediction is deprecated. "
+                "All the functionality of legacy "
+                "AutoML Natural Language, Vision and Video Intelligence and new features are available "
+                "on the Vertex AI platform. "
+                "Please use `CreateAutoMLTextTrainingJobOperator`, `CreateAutoMLImageTrainingJobOperator` or"
+                " `CreateAutoMLVideoTrainingJobOperator` from Vertex AI.",
                 AirflowProviderDeprecationWarning,
-                stacklevel=2,
+                stacklevel=3,
             )
         hook = CloudAutoMLHook(
             gcp_conn_id=self.gcp_conn_id,
diff --git a/docs/apache-airflow-providers-google/operators/cloud/automl.rst b/docs/apache-airflow-providers-google/operators/cloud/automl.rst
index b283f51c0726c..34324ea60d914 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/automl.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/automl.rst
@@ -102,11 +102,38 @@ To create a Google AutoML model you can use
 The operator will wait for the operation to complete. Additionally the operator
 returns the id of model in :ref:`XCom <concepts:xcom>` under ``model_id`` key.
-This Operator is deprecated when running for text prediction and will be removed soon.
-All the functionality of legacy AutoML Natural Language and new features are available on the
-Vertex AI platform. Please use
-:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`.
-When running Vertex AI Operator for training dat, please ensure that your data is correctly stored in Vertex AI
+This operator is deprecated for text, video and vision prediction and will be removed soon.
+All the functionality of legacy AutoML Natural Language, Vision and Video Intelligence, as well as new
+features, is available on the Vertex AI platform. Please use
+:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`,
+:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLImageTrainingJobOperator` or
+:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator`.
+
+You can find an example of how to use Vertex AI operators for AutoML Natural Language classification here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_cloud_create_text_classification_training_job_operator]
+    :end-before: [END howto_cloud_create_text_classification_training_job_operator]
+
+Additionally, you can find an example of how to use Vertex AI operators for AutoML Vision classification here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_cloud_create_image_classification_training_job_operator]
+    :end-before: [END howto_cloud_create_image_classification_training_job_operator]
+
+An example of how to use Vertex AI operators for AutoML Video Intelligence classification can be found here:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_video_classification.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_cloud_create_video_classification_training_job_operator]
+    :end-before: [END howto_cloud_create_video_classification_training_job_operator]
+
+When running Vertex AI operators for training, please ensure that your data is correctly stored in Vertex AI
 datasets.
To create and import data to the dataset please use :class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator` and diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py index e43463063cf8e..54b2fa1330096 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py +++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py @@ -22,13 +22,11 @@ import os from datetime import datetime -from typing import cast from google.cloud.aiplatform import schema from google.protobuf.struct_pb2 import Value from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, @@ -114,7 +112,7 @@ import_configs=DATA_CONFIG, ) - # [START howto_operator_automl_create_model] + # [START howto_cloud_create_text_classification_training_job_operator] create_clss_training_job = CreateAutoMLTextTrainingJobOperator( task_id="create_clss_training_job", display_name=TEXT_CLSS_DISPLAY_NAME, @@ -129,8 +127,7 @@ region=GCP_AUTOML_LOCATION, project_id=GCP_PROJECT_ID, ) - # [END howto_operator_automl_create_model] - model_id = cast(str, XComArg(create_clss_training_job, key="model_id")) + # [END howto_cloud_create_text_classification_training_job_operator] delete_clss_training_job = DeleteAutoMLTrainingJobOperator( task_id="delete_clss_training_job", diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py index 3575eb6b38a7f..47727cb80d688 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py +++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_extraction.py @@ -22,13 +22,11 @@ import os from datetime import datetime -from typing import cast from google.cloud.aiplatform import schema from google.protobuf.struct_pb2 import Value from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, @@ -114,7 +112,7 @@ import_configs=DATA_CONFIG, ) - # [START howto_operator_automl_create_model] + # [START howto_cloud_create_text_extraction_training_job_operator] create_extr_training_job = CreateAutoMLTextTrainingJobOperator( task_id="create_extr_training_job", display_name=TEXT_EXTR_DISPLAY_NAME, @@ -129,8 +127,7 @@ region=GCP_AUTOML_LOCATION, project_id=GCP_PROJECT_ID, ) - # [END howto_operator_automl_create_model] - model_id = cast(str, XComArg(create_extr_training_job, key="model_id")) + # [END howto_cloud_create_text_extraction_training_job_operator] delete_extr_training_job = DeleteAutoMLTrainingJobOperator( task_id="delete_extr_training_job", diff --git a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py index f4e16c39088f9..b593e6043bce6 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py +++ b/tests/system/providers/google/cloud/automl/example_automl_nl_text_sentiment.py @@ -22,13 +22,11 @@ import os from datetime import datetime -from typing 
import cast from google.cloud.aiplatform import schema from google.protobuf.struct_pb2 import Value from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, @@ -115,7 +113,7 @@ import_configs=DATA_CONFIG, ) - # [START howto_operator_automl_create_model] + # [START howto_cloud_create_text_sentiment_training_job_operator] create_sent_training_job = CreateAutoMLTextTrainingJobOperator( task_id="create_sent_training_job", display_name=TEXT_SENT_DISPLAY_NAME, @@ -131,8 +129,7 @@ region=GCP_AUTOML_LOCATION, project_id=GCP_PROJECT_ID, ) - # [END howto_operator_automl_create_model] - model_id = cast(str, XComArg(create_sent_training_job, key="model_id")) + # [END howto_cloud_create_text_sentiment_training_job_operator] delete_sent_training_job = DeleteAutoMLTrainingJobOperator( task_id="delete_sent_training_job", diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_classification.py b/tests/system/providers/google/cloud/automl/example_automl_video_classification.py new file mode 100644 index 0000000000000..efec1185a0d23 --- /dev/null +++ b/tests/system/providers/google/cloud/automl/example_automl_video_classification.py @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that uses Google AutoML services. 
+""" +from __future__ import annotations + +import os +from datetime import datetime + +from google.cloud.aiplatform import schema +from google.protobuf.struct_pb2 import Value + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSSynchronizeBucketsOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( + CreateAutoMLVideoTrainingJobOperator, + DeleteAutoMLTrainingJobOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.dataset import ( + CreateDatasetOperator, + DeleteDatasetOperator, + ImportDataOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +DAG_ID = "example_automl_video_clss" +REGION = "us-central1" +VIDEO_DISPLAY_NAME = f"auto-ml-video-clss-{ENV_ID}" +MODEL_DISPLAY_NAME = f"auto-ml-video-clss-model-{ENV_ID}" + +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +VIDEO_GCS_BUCKET_NAME = f"bucket_video_clss_{ENV_ID}".replace("_", "-") + +VIDEO_DATASET = { + "display_name": f"video-dataset-{ENV_ID}", + "metadata_schema_uri": schema.dataset.metadata.video, + "metadata": Value(string_value="video-dataset"), +} +VIDEO_DATA_CONFIG = [ + { + "import_schema_uri": schema.dataset.ioformat.video.classification, + "gcs_source": {"uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/automl/classification.csv"]}, + }, +] + + +# Example DAG for AutoML Video Intelligence Classification +with DAG( + DAG_ID, + schedule="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "automl", "video", "classification"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=VIDEO_GCS_BUCKET_NAME, + storage_class="REGIONAL", + location=REGION, + ) + + move_dataset_file = GCSSynchronizeBucketsOperator( + task_id="move_dataset_to_bucket", + source_bucket=RESOURCE_DATA_BUCKET, + source_object="automl/datasets/video", + destination_bucket=VIDEO_GCS_BUCKET_NAME, + destination_object="automl", + recursive=True, + ) + + create_video_dataset = CreateDatasetOperator( + task_id="video_dataset", + dataset=VIDEO_DATASET, + region=REGION, + project_id=PROJECT_ID, + ) + video_dataset_id = create_video_dataset.output["dataset_id"] + + import_video_dataset = ImportDataOperator( + task_id="import_video_data", + dataset_id=video_dataset_id, + region=REGION, + project_id=PROJECT_ID, + import_configs=VIDEO_DATA_CONFIG, + ) + + # [START howto_cloud_create_video_classification_training_job_operator] + create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator( + task_id="auto_ml_video_task", + display_name=VIDEO_DISPLAY_NAME, + prediction_type="classification", + model_type="CLOUD", + dataset_id=video_dataset_id, + model_display_name=MODEL_DISPLAY_NAME, + region=REGION, + project_id=PROJECT_ID, + ) + # [END howto_cloud_create_video_classification_training_job_operator] + + delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator( + task_id="delete_auto_ml_video_training_job", + training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_video_task', " + "key='training_id') }}", + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_video_dataset = DeleteDatasetOperator( + task_id="delete_video_dataset", + dataset_id=video_dataset_id, + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, + ) + + 
delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", + bucket_name=VIDEO_GCS_BUCKET_NAME, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + [ + create_bucket >> move_dataset_file, + create_video_dataset, + ] + >> import_video_dataset + # TEST BODY + >> create_auto_ml_video_training_job + # TEST TEARDOWN + >> delete_auto_ml_video_training_job + >> delete_video_dataset + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py deleted file mode 100644 index b88dcebe108fd..0000000000000 --- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_classification.py +++ /dev/null @@ -1,155 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG that uses Google AutoML services. 
-""" -from __future__ import annotations - -import os -from datetime import datetime -from typing import cast - -from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg -from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook -from airflow.providers.google.cloud.operators.automl import ( - AutoMLCreateDatasetOperator, - AutoMLDeleteDatasetOperator, - AutoMLDeleteModelOperator, - AutoMLImportDataOperator, - AutoMLTrainModelOperator, -) -from airflow.providers.google.cloud.operators.gcs import ( - GCSCreateBucketOperator, - GCSDeleteBucketOperator, - GCSSynchronizeBucketsOperator, -) -from airflow.utils.trigger_rule import TriggerRule - -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") -DAG_ID = "example_automl_video_clss" -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -GCP_AUTOML_LOCATION = "us-central1" -DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" - -MODEL_NAME = "video_clss_test_model" -MODEL = { - "display_name": MODEL_NAME, - "video_classification_model_metadata": {}, -} - -DATASET_NAME = f"ds_video_clss_{ENV_ID}".replace("-", "_") -DATASET = { - "display_name": DATASET_NAME, - "video_classification_dataset_metadata": {}, -} - -AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_classification.csv" -IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}} - - -extract_object_id = CloudAutoMLHook.extract_object_id - - -# Example DAG for AutoML Video Intelligence Classification -with DAG( - DAG_ID, - schedule="@once", - start_date=datetime(2021, 1, 1), - catchup=False, - user_defined_macros={"extract_object_id": extract_object_id}, - tags=["example", "automl", "video-clss"], -) as dag: - create_bucket = GCSCreateBucketOperator( - task_id="create_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, - storage_class="REGIONAL", - location=GCP_AUTOML_LOCATION, - ) - - move_dataset_file = GCSSynchronizeBucketsOperator( - task_id="move_dataset_to_bucket", - source_bucket=RESOURCE_DATA_BUCKET, - source_object="automl/datasets/video", - destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME, - destination_object="automl", - recursive=True, - ) - - create_dataset = AutoMLCreateDatasetOperator( - task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION - ) - - dataset_id = cast(str, XComArg(create_dataset, key="dataset_id")) - MODEL["dataset_id"] = dataset_id - - import_dataset = AutoMLImportDataOperator( - task_id="import_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - input_config=IMPORT_INPUT_CONFIG, - ) - - MODEL["dataset_id"] = dataset_id - - create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION) - model_id = cast(str, XComArg(create_model, key="model_id")) - - delete_model = AutoMLDeleteModelOperator( - task_id="delete_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, - ) - - delete_dataset = AutoMLDeleteDatasetOperator( - task_id="delete_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, - ) - - delete_bucket = GCSDeleteBucketOperator( - task_id="delete_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, - trigger_rule=TriggerRule.ALL_DONE, - ) - - ( - # TEST SETUP - [create_bucket >> move_dataset_file, create_dataset] - # TEST BODY - >> import_dataset - >> create_model - # TEST TEARDOWN - >> delete_model - >> delete_dataset - 
>> delete_bucket - ) - - from tests.system.utils.watcher import watcher - - # This test needs watcher in order to properly mark success/failure - # when "tearDown" task with trigger rule is part of the DAG - list(dag.tasks) >> watcher() - -from tests.system.utils import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py b/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py deleted file mode 100644 index be0d07c7c45f9..0000000000000 --- a/tests/system/providers/google/cloud/automl/example_automl_video_intelligence_tracking.py +++ /dev/null @@ -1,155 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example Airflow DAG that uses Google AutoML services. -""" -from __future__ import annotations - -import os -from datetime import datetime -from typing import cast - -from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg -from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook -from airflow.providers.google.cloud.operators.automl import ( - AutoMLCreateDatasetOperator, - AutoMLDeleteDatasetOperator, - AutoMLDeleteModelOperator, - AutoMLImportDataOperator, - AutoMLTrainModelOperator, -) -from airflow.providers.google.cloud.operators.gcs import ( - GCSCreateBucketOperator, - GCSDeleteBucketOperator, - GCSSynchronizeBucketsOperator, -) -from airflow.utils.trigger_rule import TriggerRule - -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") -DAG_ID = "example_automl_video_track" -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -GCP_AUTOML_LOCATION = "us-central1" -DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" - -MODEL_NAME = "video_track_test_model" -MODEL = { - "display_name": MODEL_NAME, - "video_object_tracking_model_metadata": {}, -} - -DATASET_NAME = f"ds_video_track_{ENV_ID}".replace("-", "_") -DATASET = { - "display_name": DATASET_NAME, - "video_object_tracking_dataset_metadata": {}, -} - -AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/video_tracking.csv" -IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}} - - -extract_object_id = CloudAutoMLHook.extract_object_id - - -# Example DAG for AutoML Video Intelligence Object Tracking -with DAG( - DAG_ID, - schedule="@once", - start_date=datetime(2021, 1, 1), - catchup=False, - user_defined_macros={"extract_object_id": extract_object_id}, - tags=["example", "automl", "video-tracking"], -) as dag: - create_bucket = GCSCreateBucketOperator( - task_id="create_bucket", - 
bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, - storage_class="REGIONAL", - location=GCP_AUTOML_LOCATION, - ) - - move_dataset_file = GCSSynchronizeBucketsOperator( - task_id="move_dataset_to_bucket", - source_bucket=RESOURCE_DATA_BUCKET, - source_object="automl/datasets/video", - destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME, - destination_object="automl", - recursive=True, - ) - - create_dataset = AutoMLCreateDatasetOperator( - task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION - ) - - dataset_id = cast(str, XComArg(create_dataset, key="dataset_id")) - MODEL["dataset_id"] = dataset_id - - import_dataset = AutoMLImportDataOperator( - task_id="import_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - input_config=IMPORT_INPUT_CONFIG, - ) - - MODEL["dataset_id"] = dataset_id - - create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION) - model_id = cast(str, XComArg(create_model, key="model_id")) - - delete_model = AutoMLDeleteModelOperator( - task_id="delete_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, - ) - - delete_dataset = AutoMLDeleteDatasetOperator( - task_id="delete_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, - ) - - delete_bucket = GCSDeleteBucketOperator( - task_id="delete_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, - trigger_rule=TriggerRule.ALL_DONE, - ) - - ( - # TEST SETUP - [create_bucket >> move_dataset_file, create_dataset] - # TEST BODY - >> import_dataset - >> create_model - # TEST TEARDOWN - >> delete_model - >> delete_dataset - >> delete_bucket - ) - - from tests.system.utils.watcher import watcher - - # This test needs watcher in order to properly mark success/failure - # when "tearDown" task with trigger rule is part of the DAG - list(dag.tasks) >> watcher() - -from tests.system.utils import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/automl/example_automl_video_tracking.py b/tests/system/providers/google/cloud/automl/example_automl_video_tracking.py new file mode 100644 index 0000000000000..46b873ee119e9 --- /dev/null +++ b/tests/system/providers/google/cloud/automl/example_automl_video_tracking.py @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that uses Google AutoML services. 
+""" +from __future__ import annotations + +import os +from datetime import datetime + +from google.cloud.aiplatform import schema +from google.protobuf.struct_pb2 import Value + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, + GCSSynchronizeBucketsOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( + CreateAutoMLVideoTrainingJobOperator, + DeleteAutoMLTrainingJobOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.dataset import ( + CreateDatasetOperator, + DeleteDatasetOperator, + ImportDataOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +DAG_ID = "example_automl_video_track" +REGION = "us-central1" +VIDEO_DISPLAY_NAME = f"auto-ml-video-tracking-{ENV_ID}" +MODEL_DISPLAY_NAME = f"auto-ml-video-tracking-model-{ENV_ID}" + +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +VIDEO_GCS_BUCKET_NAME = f"bucket_video_tracking_{ENV_ID}".replace("_", "-") + +VIDEO_DATASET = { + "display_name": f"video-dataset-{ENV_ID}", + "metadata_schema_uri": schema.dataset.metadata.video, + "metadata": Value(string_value="video-dataset"), +} +VIDEO_DATA_CONFIG = [ + { + "import_schema_uri": schema.dataset.ioformat.video.object_tracking, + "gcs_source": {"uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/automl/tracking.csv"]}, + }, +] + + +# Example DAG for AutoML Video Intelligence Object Tracking +with DAG( + DAG_ID, + schedule="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "auto_ml", "video", "tracking"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=VIDEO_GCS_BUCKET_NAME, + storage_class="REGIONAL", + location=REGION, + ) + + move_dataset_file = GCSSynchronizeBucketsOperator( + task_id="move_dataset_to_bucket", + source_bucket=RESOURCE_DATA_BUCKET, + source_object="automl/datasets/video", + destination_bucket=VIDEO_GCS_BUCKET_NAME, + destination_object="automl", + recursive=True, + ) + + create_video_dataset = CreateDatasetOperator( + task_id="video_dataset", + dataset=VIDEO_DATASET, + region=REGION, + project_id=PROJECT_ID, + ) + video_dataset_id = create_video_dataset.output["dataset_id"] + + import_video_dataset = ImportDataOperator( + task_id="import_video_data", + dataset_id=video_dataset_id, + region=REGION, + project_id=PROJECT_ID, + import_configs=VIDEO_DATA_CONFIG, + ) + + # [START howto_cloud_create_video_tracking_training_job_operator] + create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator( + task_id="auto_ml_video_task", + display_name=VIDEO_DISPLAY_NAME, + prediction_type="object_tracking", + model_type="CLOUD", + dataset_id=video_dataset_id, + model_display_name=MODEL_DISPLAY_NAME, + region=REGION, + project_id=PROJECT_ID, + ) + # [END howto_cloud_create_video_tracking_training_job_operator] + + delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator( + task_id="delete_auto_ml_video_training_job", + training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_video_task', " + "key='training_id') }}", + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_video_dataset = DeleteDatasetOperator( + task_id="delete_video_dataset", + dataset_id=video_dataset_id, + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, + ) + + 
delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", + bucket_name=VIDEO_GCS_BUCKET_NAME, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + [ + create_bucket >> move_dataset_file, + create_video_dataset, + ] + >> import_video_dataset + # TEST BODY + >> create_auto_ml_video_training_job + # TEST TEARDOWN + >> delete_auto_ml_video_training_job + >> delete_video_dataset + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py index 39a93f254bac7..e99fc5caf7717 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py +++ b/tests/system/providers/google/cloud/automl/example_automl_vision_classification.py @@ -22,55 +22,48 @@ import os from datetime import datetime -from typing import cast -from google.cloud import storage # type: ignore[attr-defined] +from google.cloud.aiplatform import schema +from google.protobuf.struct_pb2 import Value -from airflow.decorators import task from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg -from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook -from airflow.providers.google.cloud.operators.automl import ( - AutoMLCreateDatasetOperator, - AutoMLDeleteDatasetOperator, - AutoMLDeleteModelOperator, - AutoMLImportDataOperator, - AutoMLTrainModelOperator, -) from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, GCSDeleteBucketOperator, + GCSSynchronizeBucketsOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( + CreateAutoMLImageTrainingJobOperator, + DeleteAutoMLTrainingJobOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.dataset import ( + CreateDatasetOperator, + DeleteDatasetOperator, + ImportDataOperator, ) -from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator from airflow.utils.trigger_rule import TriggerRule DAG_ID = "example_automl_vision_clss" ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -GCP_AUTOML_LOCATION = "us-central1" -DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +REGION = "us-central1" +IMAGE_DISPLAY_NAME = f"automl-vision-clss-{ENV_ID}" +MODEL_DISPLAY_NAME = f"automl-vision-clss-model-{ENV_ID}" +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +IMAGE_GCS_BUCKET_NAME = f"bucket_image_clss_{ENV_ID}".replace("_", "-") -DATASET_NAME = f"ds_vision_clss_{ENV_ID}".replace("-", "_") -DATASET = { - "display_name": DATASET_NAME, - "image_classification_dataset_metadata": {"classification_type": "MULTILABEL"}, -} -AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/vision_classification.csv" -IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}} - -MODEL_NAME = "vision_clss_test_model" -MODEL = { - "display_name": 
MODEL_NAME, - "image_classification_model_metadata": {"train_budget": 1}, +IMAGE_DATASET = { + "display_name": f"automl-vision-clss-dataset-{ENV_ID}", + "metadata_schema_uri": schema.dataset.metadata.image, + "metadata": Value(string_value="image-dataset"), } - -CSV_FILE_NAME = "vision_classification.csv" -GCS_FILE_PATH = f"automl/datasets/vision/{CSV_FILE_NAME}" -DESTINATION_FILE_PATH = f"/tmp/{CSV_FILE_NAME}" - -extract_object_id = CloudAutoMLHook.extract_object_id +IMAGE_DATA_CONFIG = [ + { + "import_schema_uri": schema.dataset.ioformat.image.single_label_classification, + "gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/automl/image-dataset-classification.csv"]}, + }, +] # Example DAG for AutoML Vision Classification with DAG( @@ -78,107 +71,94 @@ schedule="@once", # Override to match your needs start_date=datetime(2021, 1, 1), catchup=False, - user_defined_macros={"extract_object_id": extract_object_id}, - tags=["example", "automl", "vision-clss"], + tags=["example", "automl", "vision", "classification"], ) as dag: create_bucket = GCSCreateBucketOperator( task_id="create_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, + bucket_name=IMAGE_GCS_BUCKET_NAME, storage_class="REGIONAL", - location=GCP_AUTOML_LOCATION, + location=REGION, ) - @task - def upload_csv_file_to_gcs(): - # download file to local storage - storage_client = storage.Client() - bucket = storage_client.bucket(RESOURCE_DATA_BUCKET) - blob = bucket.blob(GCS_FILE_PATH) - blob.download_to_filename(DESTINATION_FILE_PATH) - - # update file content - with open(DESTINATION_FILE_PATH) as file: - lines = file.readlines() - - updated_lines = [line.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME) for line in lines] - - with open(DESTINATION_FILE_PATH, "w") as file: - file.writelines(updated_lines) - - # upload updated file to bucket storage - destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME) - destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}") - generation_match_precondition = 0 - destination_blob.upload_from_filename( - DESTINATION_FILE_PATH, if_generation_match=generation_match_precondition - ) - - upload_csv_file_to_gcs_task = upload_csv_file_to_gcs() - - copy_folder_tasks = [ - GCSToGCSOperator( - task_id=f"copy_dataset_folder_{folder}", - source_bucket=RESOURCE_DATA_BUCKET, - source_object=f"automl/datasets/vision/{folder}", - destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME, - destination_object=f"automl/{folder}", - ) - for folder in ("cirrus", "cumulonimbus", "cumulus") - ] - - create_dataset_task = AutoMLCreateDatasetOperator( - task_id="create_dataset_task", - dataset=DATASET, - location=GCP_AUTOML_LOCATION, + move_dataset_file = GCSSynchronizeBucketsOperator( + task_id="move_dataset_to_bucket", + source_bucket=RESOURCE_DATA_BUCKET, + source_object="automl/datasets/vision", + destination_bucket=IMAGE_GCS_BUCKET_NAME, + destination_object="automl", + recursive=True, ) - dataset_id = cast(str, XComArg(create_dataset_task, key="dataset_id")) - import_dataset_task = AutoMLImportDataOperator( - task_id="import_dataset_task", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - input_config=IMPORT_INPUT_CONFIG, + create_image_dataset = CreateDatasetOperator( + task_id="image_dataset", + dataset=IMAGE_DATASET, + region=REGION, + project_id=PROJECT_ID, + ) + image_dataset_id = create_image_dataset.output["dataset_id"] + + import_image_dataset = ImportDataOperator( + task_id="import_image_data", + dataset_id=image_dataset_id, + region=REGION, + project_id=PROJECT_ID, + 
import_configs=IMAGE_DATA_CONFIG, ) - MODEL["dataset_id"] = dataset_id - - create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION) - model_id = cast(str, XComArg(create_model, key="model_id")) - - delete_model = AutoMLDeleteModelOperator( - task_id="delete_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, + # [START howto_cloud_create_image_classification_training_job_operator] + create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator( + task_id="auto_ml_image_task", + display_name=IMAGE_DISPLAY_NAME, + dataset_id=image_dataset_id, + prediction_type="classification", + multi_label=False, + model_type="CLOUD", + training_fraction_split=0.6, + validation_fraction_split=0.2, + test_fraction_split=0.2, + budget_milli_node_hours=8000, + model_display_name=MODEL_DISPLAY_NAME, + disable_early_stopping=False, + region=REGION, + project_id=PROJECT_ID, + ) + # [END howto_cloud_create_image_classification_training_job_operator] + + delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator( + task_id="delete_auto_ml_training_job", + training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_image_task', " + "key='training_id') }}", + region=REGION, + project_id=PROJECT_ID, trigger_rule=TriggerRule.ALL_DONE, ) - delete_dataset = AutoMLDeleteDatasetOperator( - task_id="delete_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, + delete_image_dataset = DeleteDatasetOperator( + task_id="delete_image_dataset", + dataset_id=image_dataset_id, + region=REGION, + project_id=PROJECT_ID, trigger_rule=TriggerRule.ALL_DONE, ) delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, + bucket_name=IMAGE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE, ) ( # TEST SETUP - create_bucket - >> upload_csv_file_to_gcs_task - >> copy_folder_tasks + [ + create_bucket >> move_dataset_file, + create_image_dataset, + ] + >> import_image_dataset # TEST BODY - >> create_dataset_task - >> import_dataset_task - >> create_model + >> create_auto_ml_image_training_job # TEST TEARDOWN - >> delete_model - >> delete_dataset + >> delete_auto_ml_image_training_job + >> delete_image_dataset >> delete_bucket ) diff --git a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py index e9a49c4b2403b..6f690a2311c0c 100644 --- a/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py +++ b/tests/system/providers/google/cloud/automl/example_automl_vision_object_detection.py @@ -22,48 +22,48 @@ import os from datetime import datetime -from typing import cast + +from google.cloud.aiplatform import schema +from google.protobuf.struct_pb2 import Value from airflow.models.dag import DAG -from airflow.models.xcom_arg import XComArg -from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook -from airflow.providers.google.cloud.operators.automl import ( - AutoMLCreateDatasetOperator, - AutoMLDeleteDatasetOperator, - AutoMLDeleteModelOperator, - AutoMLDeployModelOperator, - AutoMLImportDataOperator, - AutoMLTrainModelOperator, -) from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, GCSDeleteBucketOperator, GCSSynchronizeBucketsOperator, ) +from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import ( + 
CreateAutoMLImageTrainingJobOperator, + DeleteAutoMLTrainingJobOperator, +) +from airflow.providers.google.cloud.operators.vertex_ai.dataset import ( + CreateDatasetOperator, + DeleteDatasetOperator, + ImportDataOperator, +) from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") DAG_ID = "example_automl_vision_obj_detect" -GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -GCP_AUTOML_LOCATION = "us-central1" -DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") -RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +REGION = "us-central1" +IMAGE_DISPLAY_NAME = f"automl-vision-detect-{ENV_ID}" +MODEL_DISPLAY_NAME = f"automl-vision-detect-model-{ENV_ID}" -MODEL_NAME = "vision_detect_test_model" -MODEL = { - "display_name": MODEL_NAME, - "image_object_detection_model_metadata": {}, -} +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +IMAGE_GCS_BUCKET_NAME = f"bucket_image_detect_{ENV_ID}".replace("_", "-") -DATASET_NAME = f"ds_vision_detect_{ENV_ID}".replace("-", "_") -DATASET = { - "display_name": DATASET_NAME, - "image_object_detection_dataset_metadata": {}, +IMAGE_DATASET = { + "display_name": f"image-detect-dataset-{ENV_ID}", + "metadata_schema_uri": schema.dataset.metadata.image, + "metadata": Value(string_value="image-dataset"), } - -AUTOML_DATASET_BUCKET = f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/automl/object_detection.csv" -IMPORT_INPUT_CONFIG = {"gcs_source": {"input_uris": [AUTOML_DATASET_BUCKET]}} -extract_object_id = CloudAutoMLHook.extract_object_id +IMAGE_DATA_CONFIG = [ + { + "import_schema_uri": schema.dataset.ioformat.image.bounding_box, + "gcs_source": {"uris": [f"gs://{IMAGE_GCS_BUCKET_NAME}/automl/object_detection.csv"]}, + }, +] # Example DAG for AutoML Vision Object Detection @@ -72,81 +72,94 @@ schedule="@once", # Override to match your needs start_date=datetime(2021, 1, 1), catchup=False, - user_defined_macros={"extract_object_id": extract_object_id}, - tags=["example", "automl", "object-detection"], + tags=["example", "automl", "vision", "object-detection"], ) as dag: create_bucket = GCSCreateBucketOperator( task_id="create_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, + bucket_name=IMAGE_GCS_BUCKET_NAME, storage_class="REGIONAL", - location=GCP_AUTOML_LOCATION, + location=REGION, ) move_dataset_file = GCSSynchronizeBucketsOperator( - task_id="move_data_to_bucket", + task_id="move_dataset_to_bucket", source_bucket=RESOURCE_DATA_BUCKET, source_object="automl/datasets/vision", - destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME, + destination_bucket=IMAGE_GCS_BUCKET_NAME, destination_object="automl", recursive=True, ) - create_dataset = AutoMLCreateDatasetOperator( - task_id="create_dataset_task", dataset=DATASET, location=GCP_AUTOML_LOCATION + create_image_dataset = CreateDatasetOperator( + task_id="image_dataset", + dataset=IMAGE_DATASET, + region=REGION, + project_id=PROJECT_ID, ) - - dataset_id = cast(str, XComArg(create_dataset, key="dataset_id")) - MODEL["dataset_id"] = dataset_id - - import_dataset = AutoMLImportDataOperator( - task_id="import_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - input_config=IMPORT_INPUT_CONFIG, + image_dataset_id = create_image_dataset.output["dataset_id"] + + import_image_dataset = ImportDataOperator( + task_id="import_image_data", + dataset_id=image_dataset_id, + region=REGION, + project_id=PROJECT_ID, + import_configs=IMAGE_DATA_CONFIG, 
) - MODEL["dataset_id"] = dataset_id - - create_model = AutoMLTrainModelOperator(task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION) - model_id = cast(str, XComArg(create_model, key="model_id")) - - deploy_model = AutoMLDeployModelOperator( - task_id="deploy_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, + # [START howto_cloud_create_image_object_detection_training_job_operator] + create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator( + task_id="auto_ml_image_task", + display_name=IMAGE_DISPLAY_NAME, + dataset_id=image_dataset_id, + prediction_type="object_detection", + multi_label=False, + model_type="CLOUD", + training_fraction_split=0.6, + validation_fraction_split=0.2, + test_fraction_split=0.2, + budget_milli_node_hours=20000, + model_display_name=MODEL_DISPLAY_NAME, + disable_early_stopping=False, + region=REGION, + project_id=PROJECT_ID, ) - - delete_model = AutoMLDeleteModelOperator( - task_id="delete_model", - model_id=model_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, + # [END howto_cloud_create_image_object_detection_training_job_operator] + + delete_auto_ml_image_training_job = DeleteAutoMLTrainingJobOperator( + task_id="delete_auto_ml_training_job", + training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_image_task', " + "key='training_id') }}", + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, ) - delete_dataset = AutoMLDeleteDatasetOperator( - task_id="delete_dataset", - dataset_id=dataset_id, - location=GCP_AUTOML_LOCATION, - project_id=GCP_PROJECT_ID, + delete_image_dataset = DeleteDatasetOperator( + task_id="delete_image_dataset", + dataset_id=image_dataset_id, + region=REGION, + project_id=PROJECT_ID, + trigger_rule=TriggerRule.ALL_DONE, ) delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", - bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, + bucket_name=IMAGE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE, ) ( # TEST SETUP - [create_bucket >> move_dataset_file, create_dataset] + [ + create_bucket >> move_dataset_file, + create_image_dataset, + ] + >> import_image_dataset # TEST BODY - >> import_dataset - >> create_model - >> deploy_model + >> create_auto_ml_image_training_job # TEST TEARDOWN - >> delete_model - >> delete_dataset + >> delete_auto_ml_image_training_job + >> delete_image_dataset >> delete_bucket )
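
For DAG authors migrating off the deprecated operator, the before/after sketch below distills the change that the new example DAGs in this patch demonstrate. It is a minimal sketch, assuming a Vertex AI video dataset already exists (created, for example, with CreateDatasetOperator as in the examples above); the display names and project id are illustrative placeholders, not values from this patch.

# Minimal migration sketch (assumes an existing Vertex AI video dataset;
# display names and project id below are illustrative placeholders).
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLVideoTrainingJobOperator,
)

# Before -- legacy AutoML operator, now deprecated for video models:
#
#     create_model = AutoMLTrainModelOperator(
#         task_id="create_model",
#         model={
#             "display_name": "video_clss_test_model",
#             "video_classification_model_metadata": {},
#             "dataset_id": dataset_id,  # legacy AutoML dataset id
#         },
#         location="us-central1",
#     )

# After -- Vertex AI training job, as used in the new example DAGs:
create_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="create_model",
    display_name="video-clss-training-job",  # illustrative
    prediction_type="classification",  # or "object_tracking"
    model_type="CLOUD",
    # Pull the Vertex AI dataset id pushed to XCom by the dataset-creation task
    dataset_id="{{ task_instance.xcom_pull(task_ids='video_dataset', key='dataset_id') }}",
    model_display_name="video-clss-model",  # illustrative
    region="us-central1",
    project_id="my-gcp-project",  # illustrative
)

Note that the Vertex AI operator takes the dataset id and display names as top-level arguments instead of a single `model` dict, and it pushes `training_id` to XCom, which the teardown tasks in the example DAGs use to delete the training pipeline.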