Skip to content

Commit

Permalink
Deprecate AutoMLTrainModelOperator for Vision and Video (#36473)
Browse files Browse the repository at this point in the history
Co-authored-by: Ulada Zakharava <Vlada_Zakharava@epam.com>
  • Loading branch information
VladaZakharova and Ulada Zakharava committed Jan 17, 2024
1 parent 7a814f1 commit 437d4e4
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 525 deletions.
20 changes: 9 additions & 11 deletions airflow/providers/google/cloud/operators/automl.py
Expand Up @@ -118,19 +118,17 @@ def __init__(
self.impersonation_chain = impersonation_chain

def execute(self, context: Context):
# Output warning if running AutoML Natural Language prediction job
automl_nl_model_keys = [
"text_classification_model_metadata",
"text_extraction_model_metadata",
"text_sentiment_dataset_metadata",
]
if any(key in automl_nl_model_keys for key in self.model):
# Output a warning unless this is an AutoML Translation prediction job
if "translation_model_metadata" not in self.model:
warnings.warn(
"AutoMLTrainModelOperator for text prediction is deprecated. All the functionality of legacy "
"AutoML Natural Language and new features are available on the Vertex AI platform. "
"Please use `CreateAutoMLTextTrainingJobOperator`",
"AutoMLTrainModelOperator for text, image and video prediction is deprecated. "
"All the functionality of legacy "
"AutoML Natural Language, Vision and Video Intelligence and new features are available "
"on the Vertex AI platform. "
"Please use `CreateAutoMLTextTrainingJobOperator`, `CreateAutoMLImageTrainingJobOperator` or"
" `CreateAutoMLVideoTrainingJobOperator` from VertexAI.",
AirflowProviderDeprecationWarning,
stacklevel=2,
stacklevel=3,
)
hook = CloudAutoMLHook(
gcp_conn_id=self.gcp_conn_id,
Expand Down
37 changes: 32 additions & 5 deletions docs/apache-airflow-providers-google/operators/cloud/automl.rst
Expand Up @@ -102,11 +102,38 @@ To create a Google AutoML model you can use
The operator will wait for the operation to complete. Additionally the operator
returns the id of model in :ref:`XCom <concepts:xcom>` under ``model_id`` key.

This Operator is deprecated when running for text prediction and will be removed soon.
All the functionality of legacy AutoML Natural Language and new features are available on the
Vertex AI platform. Please use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`.
When running Vertex AI Operator for training dat, please ensure that your data is correctly stored in Vertex AI
This Operator is deprecated when running for text, video and vision prediction and will be removed soon.
All the functionality of legacy AutoML Natural Language, Vision, Video Intelligence and new features are
available on the Vertex AI platform. Please use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLTextTrainingJobOperator`,
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLImageTrainingJobOperator` or
:class:`~airflow.providers.google.cloud.operators.vertex_ai.auto_ml.CreateAutoMLVideoTrainingJobOperator`.

You can find an example of how to use VertexAI operators for AutoML Natural Language classification here:

.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_nl_text_classification.py
:language: python
:dedent: 4
:start-after: [START howto_cloud_create_text_classification_training_job_operator]
:end-before: [END howto_cloud_create_text_classification_training_job_operator]

Additionally, you can find an example of how to use VertexAI operators for AutoML Vision classification here:

.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_vision_classification.py
:language: python
:dedent: 4
:start-after: [START howto_cloud_create_image_classification_training_job_operator]
:end-before: [END howto_cloud_create_image_classification_training_job_operator]

You can also find an example of how to use VertexAI operators for AutoML Video Intelligence classification here:

.. exampleinclude:: /../../tests/system/providers/google/cloud/automl/example_automl_video_classification.py
:language: python
:dedent: 4
:start-after: [START howto_cloud_create_video_classification_training_job_operator]
:end-before: [END howto_cloud_create_video_classification_training_job_operator]

When running Vertex AI Operator for training data, please ensure that your data is correctly stored in Vertex AI
datasets. To create and import data to the dataset please use
:class:`~airflow.providers.google.cloud.operators.vertex_ai.dataset.CreateDatasetOperator`
and
Expand Down
Expand Up @@ -22,13 +22,11 @@

import os
from datetime import datetime
from typing import cast

from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value

from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
Expand Down Expand Up @@ -114,7 +112,7 @@
import_configs=DATA_CONFIG,
)

# [START howto_operator_automl_create_model]
# [START howto_cloud_create_text_classification_training_job_operator]
create_clss_training_job = CreateAutoMLTextTrainingJobOperator(
task_id="create_clss_training_job",
display_name=TEXT_CLSS_DISPLAY_NAME,
Expand All @@ -129,8 +127,7 @@
region=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_automl_create_model]
model_id = cast(str, XComArg(create_clss_training_job, key="model_id"))
# [END howto_cloud_create_text_classification_training_job_operator]

delete_clss_training_job = DeleteAutoMLTrainingJobOperator(
task_id="delete_clss_training_job",
Expand Down
Expand Up @@ -22,13 +22,11 @@

import os
from datetime import datetime
from typing import cast

from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value

from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
Expand Down Expand Up @@ -114,7 +112,7 @@
import_configs=DATA_CONFIG,
)

# [START howto_operator_automl_create_model]
# [START howto_cloud_create_text_extraction_training_job_operator]
create_extr_training_job = CreateAutoMLTextTrainingJobOperator(
task_id="create_extr_training_job",
display_name=TEXT_EXTR_DISPLAY_NAME,
Expand All @@ -129,8 +127,7 @@
region=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_automl_create_model]
model_id = cast(str, XComArg(create_extr_training_job, key="model_id"))
# [END howto_cloud_create_text_extraction_training_job_operator]

delete_extr_training_job = DeleteAutoMLTrainingJobOperator(
task_id="delete_extr_training_job",
Expand Down
Expand Up @@ -22,13 +22,11 @@

import os
from datetime import datetime
from typing import cast

from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value

from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
Expand Down Expand Up @@ -115,7 +113,7 @@
import_configs=DATA_CONFIG,
)

# [START howto_operator_automl_create_model]
# [START howto_cloud_create_text_sentiment_training_job_operator]
create_sent_training_job = CreateAutoMLTextTrainingJobOperator(
task_id="create_sent_training_job",
display_name=TEXT_SENT_DISPLAY_NAME,
Expand All @@ -131,8 +129,7 @@
region=GCP_AUTOML_LOCATION,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_automl_create_model]
model_id = cast(str, XComArg(create_sent_training_job, key="model_id"))
# [END howto_cloud_create_text_sentiment_training_job_operator]

delete_sent_training_job = DeleteAutoMLTrainingJobOperator(
task_id="delete_sent_training_job",
Expand Down
@@ -0,0 +1,169 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that uses Google AutoML services.
"""
from __future__ import annotations

import os
from datetime import datetime

from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteBucketOperator,
GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLVideoTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
CreateDatasetOperator,
DeleteDatasetOperator,
ImportDataOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Identifiers taken from the environment; each falls back to "default" when unset.
ENV_ID = os.getenv("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.getenv("SYSTEM_TESTS_GCP_PROJECT", "default")

DAG_ID = "example_automl_video_clss"
REGION = "us-central1"

# Display names are suffixed with ENV_ID.
VIDEO_DISPLAY_NAME = f"auto-ml-video-clss-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-video-clss-model-{ENV_ID}"

# Source bucket holding the sample data, and the per-run bucket it is synced into.
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
# Every underscore -- including any that come from ENV_ID -- becomes a hyphen.
VIDEO_GCS_BUCKET_NAME = f"bucket_video_clss_{ENV_ID}".replace("_", "-")

# Definition of the Vertex AI dataset created by the DAG.
VIDEO_DATASET = {
    "display_name": f"video-dataset-{ENV_ID}",
    "metadata_schema_uri": schema.dataset.metadata.video,
    "metadata": Value(string_value="video-dataset"),
}

# Import configuration: one classification CSV read from the per-run bucket.
VIDEO_DATA_CONFIG = [
    {
        "import_schema_uri": schema.dataset.ioformat.video.classification,
        "gcs_source": {
            "uris": [f"gs://{VIDEO_GCS_BUCKET_NAME}/automl/classification.csv"],
        },
    },
]


# Example DAG for AutoML Video Intelligence Classification.
#
# Flow: stage the sample data in a dedicated GCS bucket, create a Vertex AI
# video dataset and import the data into it, run an AutoML video training job
# with prediction_type="classification", then remove everything that was created.
with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "automl", "video", "classification"],
) as dag:
    # ---- setup: bucket + sample data ----
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name=VIDEO_GCS_BUCKET_NAME,
        location=REGION,
        storage_class="REGIONAL",
    )

    # Sync the sample video dataset files from the shared resources bucket
    # into the "automl" prefix of the per-run bucket.
    move_dataset_file = GCSSynchronizeBucketsOperator(
        task_id="move_dataset_to_bucket",
        source_bucket=RESOURCE_DATA_BUCKET,
        source_object="automl/datasets/video",
        destination_bucket=VIDEO_GCS_BUCKET_NAME,
        destination_object="automl",
        recursive=True,
    )

    # ---- setup: Vertex AI dataset ----
    create_video_dataset = CreateDatasetOperator(
        task_id="video_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset=VIDEO_DATASET,
    )
    # The dataset id is read from the "dataset_id" entry of the create task's output
    # and reused by the import, training and delete tasks below.
    video_dataset_id = create_video_dataset.output["dataset_id"]

    import_video_dataset = ImportDataOperator(
        task_id="import_video_data",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=video_dataset_id,
        import_configs=VIDEO_DATA_CONFIG,
    )

    # ---- body: the training job shown in the documentation ----
    # [START howto_cloud_create_video_classification_training_job_operator]
    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
        task_id="auto_ml_video_task",
        display_name=VIDEO_DISPLAY_NAME,
        prediction_type="classification",
        model_type="CLOUD",
        dataset_id=video_dataset_id,
        model_display_name=MODEL_DISPLAY_NAME,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END howto_cloud_create_video_classification_training_job_operator]

    # ---- teardown: every task uses ALL_DONE so it is not skipped when an upstream task fails ----
    delete_auto_ml_video_training_job = DeleteAutoMLTrainingJobOperator(
        task_id="delete_auto_ml_video_training_job",
        # Templated: pulls the "training_id" XCom of the training task at run time.
        training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_video_task', "
        "key='training_id') }}",
        project_id=PROJECT_ID,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_video_dataset = DeleteDatasetOperator(
        task_id="delete_video_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=video_dataset_id,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=VIDEO_GCS_BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # TEST SETUP
    # The data sync needs the bucket; the import needs both the synced data and the dataset.
    create_bucket >> move_dataset_file
    [move_dataset_file, create_video_dataset] >> import_video_dataset
    (
        import_video_dataset
        # TEST BODY
        >> create_auto_ml_video_training_job
        # TEST TEARDOWN
        >> delete_auto_ml_video_training_job
        >> delete_video_dataset
        >> delete_bucket
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest).
# The DAG object defined above is handed to ``get_test_run`` and the result is bound to
# the module-level name ``test_run``.
test_run = get_test_run(dag)

0 comments on commit 437d4e4

Please sign in to comment.