From 58d61826a3f47a071c1f0ed4d5b8a5bd01131acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Hirsz?= Date: Thu, 27 Oct 2022 10:17:09 +0200 Subject: [PATCH] Migrate Dataproc Metastore system tests according to AIP-47 (#26858) --- .../google/cloud/hooks/dataproc_metastore.py | 2 +- .../google/cloud/operators/dataproc.py | 2 +- .../cloud/operators/dataproc_metastore.py | 2 +- .../operators/cloud/dataproc_metastore.rst | 28 +-- .../test_dataproc_metastore_system.py | 42 ----- .../cloud/dataproc_metastore/__init__.py | 16 ++ .../example_dataproc_metastore.py | 165 ++++++++---------- .../example_dataproc_metastore_backup.py | 136 +++++++++++++++ 8 files changed, 239 insertions(+), 154 deletions(-) delete mode 100644 tests/providers/google/cloud/operators/test_dataproc_metastore_system.py create mode 100644 tests/system/providers/google/cloud/dataproc_metastore/__init__.py rename {airflow/providers/google/cloud/example_dags => tests/system/providers/google/cloud/dataproc_metastore}/example_dataproc_metastore.py (58%) create mode 100644 tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py diff --git a/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/airflow/providers/google/cloud/hooks/dataproc_metastore.py index 669ad12cd76ab..c7dcebee9fcab 100644 --- a/airflow/providers/google/cloud/hooks/dataproc_metastore.py +++ b/airflow/providers/google/cloud/hooks/dataproc_metastore.py @@ -116,7 +116,7 @@ def create_metadata_import( project_id: str, region: str, service_id: str, - metadata_import: MetadataImport, + metadata_import: dict | MetadataImport, metadata_import_id: str, request_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index b42d1245de2e8..924287766911e 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -2128,7 +2128,7 @@ def execute(self, context: Context): metadata=self.metadata, ) - # The existing batch may be a in a number of states other than 'SUCCEEDED' + # The existing batch may be in a number of states other than 'SUCCEEDED' if result.state != Batch.State.SUCCEEDED: if result.state == Batch.State.FAILED or result.state == Batch.State.CANCELLED: raise AirflowException( diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py index 6e6e9fcfe3f91..e0585805cc4bd 100644 --- a/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -311,7 +311,7 @@ def __init__( project_id: str, region: str, service_id: str, - metadata_import: MetadataImport, + metadata_import: dict | MetadataImport, metadata_import_id: str, request_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst index c7ff5305c96b6..564ddac527a03 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst @@ -33,7 +33,7 @@ For more information about the available fields to pass when creating a service, A simple service configuration can look as follows: -.. 
exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 0 :start-after: [START how_to_cloud_dataproc_metastore_create_service] @@ -42,7 +42,7 @@ A simple service configuration can look as follows: With this configuration we can create the service: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateServiceOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_create_service_operator] @@ -55,7 +55,7 @@ To get a service you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreGetServiceOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_get_service_operator] @@ -69,7 +69,7 @@ For more information on updateMask and other parameters take a look at `Dataproc An example of a new service config and the updateMask: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 0 :start-after: [START how_to_cloud_dataproc_metastore_update_service] @@ -78,7 +78,7 @@ An example of a new service config and the updateMask: To update a service you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreUpdateServiceOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_update_service_operator] @@ -91,7 +91,7 @@ To delete a service you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_delete_service_operator] @@ -104,7 +104,7 @@ To export metadata you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreExportMetadataOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_export_metadata_operator] @@ -117,7 +117,7 @@ To restore a service you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreRestoreServiceOperator` -.. 
exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_restore_service_operator] @@ -131,7 +131,7 @@ For more information about the available fields to pass when creating a metadata A simple metadata import configuration can look as follows: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 0 :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import] @@ -140,7 +140,7 @@ A simple metadata import configuration can look as follows: To create a metadata import you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateMetadataImportOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import_operator] @@ -154,7 +154,7 @@ For more information about the available fields to pass when creating a backup, A simple backup configuration can look as follows: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py :language: python :dedent: 0 :start-after: [START how_to_cloud_dataproc_metastore_create_backup] @@ -163,7 +163,7 @@ A simple backup configuration can look as follows: With this configuration we can create the backup: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateBackupOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_create_backup_operator] @@ -176,7 +176,7 @@ To delete a backup you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_delete_backup_operator] @@ -189,7 +189,7 @@ To list backups you can use: :class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreListBackupsOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py :language: python :dedent: 4 :start-after: [START how_to_cloud_dataproc_metastore_list_backups_operator] diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py deleted file mode 100644 index 2db8f62223d64..0000000000000 --- a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py +++ /dev/null @@ -1,42 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.providers.google.cloud.example_dags.example_dataproc_metastore import BUCKET -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAPROC_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_DATAPROC_KEY) -class DataprocMetastoreExampleDagsTest(GoogleSystemTest): - @provide_gcp_context(GCP_DATAPROC_KEY) - def setUp(self): - super().setUp() - self.create_gcs_bucket(BUCKET) - - @provide_gcp_context(GCP_DATAPROC_KEY) - def tearDown(self): - self.delete_gcs_bucket(BUCKET) - super().tearDown() - - @provide_gcp_context(GCP_DATAPROC_KEY) - def test_run_example_dag(self): - self.run_dag(dag_id="example_gcp_dataproc_metastore", dag_folder=CLOUD_DAG_FOLDER) diff --git a/tests/system/providers/google/cloud/dataproc_metastore/__init__.py b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
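The hook and operator hunks above widen `metadata_import` from `MetadataImport` to `dict | MetadataImport`, which is what lets the migrated example DAG below pass a plain dict instead of wrapping it in a `MetadataImport` message (note the dropped `from google.cloud.metastore_v1 import MetadataImport`). A minimal sketch of a call against the widened signature — the DAG id, project, bucket, service, and import IDs here are illustrative placeholders, not values from this patch:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreCreateMetadataImportOperator,
)

with models.DAG(
    "metadata_import_sketch",  # hypothetical DAG id
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@once",
    catchup=False,
):
    import_metadata = DataprocMetastoreCreateMetadataImportOperator(
        task_id="import_metadata",
        project_id="my-gcp-project",  # placeholder
        region="europe-west1",
        service_id="my-metastore-service",  # placeholder
        # After this patch, a plain dict is accepted wherever a
        # MetadataImport message was previously required:
        metadata_import={
            "name": "test-metadata-import",
            "database_dump": {
                "gcs_uri": "gs://my-bucket/data/hive.sql",  # placeholder
                "database_type": "MYSQL",
            },
        },
        metadata_import_id="my-metadata-import",  # placeholder
    )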
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py similarity index 58% rename from airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py rename to tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py index c1208510a2c7e..1b6341a73a7c9 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py +++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py @@ -23,36 +23,39 @@ import datetime import os +from pathlib import Path -from google.cloud.metastore_v1 import MetadataImport from google.protobuf.field_mask_pb2 import FieldMask from airflow import models -from airflow.models.baseoperator import chain from airflow.providers.google.cloud.operators.dataproc_metastore import ( - DataprocMetastoreCreateBackupOperator, DataprocMetastoreCreateMetadataImportOperator, DataprocMetastoreCreateServiceOperator, - DataprocMetastoreDeleteBackupOperator, DataprocMetastoreDeleteServiceOperator, DataprocMetastoreExportMetadataOperator, DataprocMetastoreGetServiceOperator, - DataprocMetastoreListBackupsOperator, - DataprocMetastoreRestoreServiceOperator, DataprocMetastoreUpdateServiceOperator, ) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "") -SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", "dataproc-metastore-system-tests-service-1") -BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", "dataproc-metastore-system-tests-backup-1") -REGION = os.environ.get("GCP_REGION", "") -BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME") -METADATA_IMPORT_FILE = os.environ.get("GCS_METADATA_IMPORT_FILE", None) -GCS_URI = os.environ.get("GCS_URI", f"gs://{BUCKET}/data/hive.sql") -METADATA_IMPORT_ID = "dataproc-metastore-system-tests-metadata-import-1" -TIMEOUT = 1200 +DAG_ID = "dataproc_metastore" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") + +SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-") +METADATA_IMPORT_ID = f"{DAG_ID}-metadata-{ENV_ID}".replace("_", "-") + +REGION = "europe-west1" +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +TIMEOUT = 2400 DB_TYPE = "MYSQL" -DESTINATION_GCS_FOLDER = f"gs://{BUCKET}/>" +DESTINATION_GCS_FOLDER = f"gs://{BUCKET_NAME}/>" + +HIVE_FILE_SRC = str(Path(__file__).parent.parent / "dataproc" / "resources" / "hive.sql") +HIVE_FILE = "data/hive.sql" +GCS_URI = f"gs://{BUCKET_NAME}/data/hive.sql" # Service definition # Docs: https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service @@ -62,6 +65,16 @@ } # [END how_to_cloud_dataproc_metastore_create_service] +# [START how_to_cloud_dataproc_metastore_create_metadata_import] +METADATA_IMPORT = { + "name": "test-metadata-import", + "database_dump": { + "gcs_uri": GCS_URI, + "database_type": DB_TYPE, + }, +} +# [END how_to_cloud_dataproc_metastore_create_metadata_import] + # Update service # [START how_to_cloud_dataproc_metastore_update_service] SERVICE_TO_UPDATE = { @@ -73,31 +86,24 @@ UPDATE_MASK = FieldMask(paths=["labels"]) # [END how_to_cloud_dataproc_metastore_update_service] -# Backup 
definition -# [START how_to_cloud_dataproc_metastore_create_backup] -BACKUP = { - "name": "test-backup", -} -# [END how_to_cloud_dataproc_metastore_create_backup] - -# Metadata import definition -# [START how_to_cloud_dataproc_metastore_create_metadata_import] -METADATA_IMPORT = MetadataImport( - { - "name": "test-metadata-import", - "database_dump": { - "gcs_uri": GCS_URI, - "database_type": DB_TYPE, - }, - } -) -# [END how_to_cloud_dataproc_metastore_create_metadata_import] - - with models.DAG( - dag_id="example_gcp_dataproc_metastore", + DAG_ID, start_date=datetime.datetime(2021, 1, 1), + schedule="@once", + catchup=False, + tags=["example", "dataproc", "metastore"], ) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=HIVE_FILE_SRC, + dst=HIVE_FILE, + bucket=BUCKET_NAME, + ) + # [START how_to_cloud_dataproc_metastore_create_service_operator] create_service = DataprocMetastoreCreateServiceOperator( task_id="create_service", @@ -153,52 +159,6 @@ ) # [END how_to_cloud_dataproc_metastore_export_metadata_operator] - # [START how_to_cloud_dataproc_metastore_create_backup_operator] - backup_service = DataprocMetastoreCreateBackupOperator( - task_id="create_backup", - project_id=PROJECT_ID, - region=REGION, - service_id=SERVICE_ID, - backup=BACKUP, - backup_id=BACKUP_ID, - timeout=TIMEOUT, - ) - # [END how_to_cloud_dataproc_metastore_create_backup_operator] - - # [START how_to_cloud_dataproc_metastore_list_backups_operator] - list_backups = DataprocMetastoreListBackupsOperator( - task_id="list_backups", - project_id=PROJECT_ID, - region=REGION, - service_id=SERVICE_ID, - ) - # [END how_to_cloud_dataproc_metastore_list_backups_operator] - - # [START how_to_cloud_dataproc_metastore_delete_backup_operator] - delete_backup = DataprocMetastoreDeleteBackupOperator( - task_id="delete_backup", - project_id=PROJECT_ID, - region=REGION, - service_id=SERVICE_ID, - backup_id=BACKUP_ID, - timeout=TIMEOUT, - ) - # [END how_to_cloud_dataproc_metastore_delete_backup_operator] - - # [START how_to_cloud_dataproc_metastore_restore_service_operator] - restore_service = DataprocMetastoreRestoreServiceOperator( - task_id="restore_metastore", - region=REGION, - project_id=PROJECT_ID, - service_id=SERVICE_ID, - backup_id=BACKUP_ID, - backup_region=REGION, - backup_project_id=PROJECT_ID, - backup_service_id=SERVICE_ID, - timeout=TIMEOUT, - ) - # [END how_to_cloud_dataproc_metastore_restore_service_operator] - # [START how_to_cloud_dataproc_metastore_delete_service_operator] delete_service = DataprocMetastoreDeleteServiceOperator( task_id="delete_service", @@ -209,15 +169,30 @@ ) # [END how_to_cloud_dataproc_metastore_delete_service_operator] - chain( - create_service, - update_service, - get_service_details, - backup_service, - list_backups, - restore_service, - delete_backup, - export_metadata, - import_metadata, - delete_service, + delete_service.trigger_rule = TriggerRule.ALL_DONE + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + create_bucket + >> create_service + >> get_service_details + >> update_service + >> import_metadata + >> export_metadata + >> delete_service + >> delete_bucket ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "teardown" task with trigger rule is part of the DAG + 
list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py new file mode 100644 index 0000000000000..5351d9df7d618 --- /dev/null +++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Airflow System Test DAG that verifies Dataproc Metastore +operators for managing backups. +""" +from __future__ import annotations + +import datetime +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataproc_metastore import ( + DataprocMetastoreCreateBackupOperator, + DataprocMetastoreCreateServiceOperator, + DataprocMetastoreDeleteBackupOperator, + DataprocMetastoreDeleteServiceOperator, + DataprocMetastoreListBackupsOperator, + DataprocMetastoreRestoreServiceOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +DAG_ID = "dataproc_metastore_backup" + +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") + +SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-") +BACKUP_ID = f"{DAG_ID}-backup-{ENV_ID}".replace("_", "-") +REGION = "europe-west1" +TIMEOUT = 1200 +# Service definition +SERVICE = { + "name": "test-service", +} +# Backup definition +# [START how_to_cloud_dataproc_metastore_create_backup] +BACKUP = { + "name": "test-backup", +} +# [END how_to_cloud_dataproc_metastore_create_backup] + +with models.DAG( + DAG_ID, + start_date=datetime.datetime(2021, 1, 1), + schedule="@once", + catchup=False, + tags=["example", "dataproc", "metastore"], +) as dag: + create_service = DataprocMetastoreCreateServiceOperator( + task_id="create_service", + region=REGION, + project_id=PROJECT_ID, + service=SERVICE, + service_id=SERVICE_ID, + timeout=TIMEOUT, + ) + # [START how_to_cloud_dataproc_metastore_create_backup_operator] + backup_service = DataprocMetastoreCreateBackupOperator( + task_id="create_backup", + project_id=PROJECT_ID, + region=REGION, + service_id=SERVICE_ID, + backup=BACKUP, + backup_id=BACKUP_ID, + timeout=TIMEOUT, + ) + # [END how_to_cloud_dataproc_metastore_create_backup_operator] + # [START how_to_cloud_dataproc_metastore_list_backups_operator] + list_backups = DataprocMetastoreListBackupsOperator( + task_id="list_backups", + project_id=PROJECT_ID, + region=REGION, + service_id=SERVICE_ID, + ) + # [END how_to_cloud_dataproc_metastore_list_backups_operator] + # [START 
how_to_cloud_dataproc_metastore_delete_backup_operator] + delete_backup = DataprocMetastoreDeleteBackupOperator( + task_id="delete_backup", + project_id=PROJECT_ID, + region=REGION, + service_id=SERVICE_ID, + backup_id=BACKUP_ID, + timeout=TIMEOUT, + ) + # [END how_to_cloud_dataproc_metastore_delete_backup_operator] + delete_backup.trigger_rule = TriggerRule.ALL_DONE + # [START how_to_cloud_dataproc_metastore_restore_service_operator] + restore_service = DataprocMetastoreRestoreServiceOperator( + task_id="restore_metastore", + region=REGION, + project_id=PROJECT_ID, + service_id=SERVICE_ID, + backup_id=BACKUP_ID, + backup_region=REGION, + backup_project_id=PROJECT_ID, + backup_service_id=SERVICE_ID, + timeout=TIMEOUT, + ) + # [END how_to_cloud_dataproc_metastore_restore_service_operator] + delete_service = DataprocMetastoreDeleteServiceOperator( + task_id="delete_service", + region=REGION, + project_id=PROJECT_ID, + service_id=SERVICE_ID, + timeout=TIMEOUT, + trigger_rule=TriggerRule.ALL_DONE, + ) + (create_service >> backup_service >> list_backups >> restore_service >> delete_backup >> delete_service) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "teardown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
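As the trailing `get_test_run(dag)` hooks note, both migrated example DAGs double as pytest-collectible system tests under AIP-47 (see tests/system/README.md#run_via_pytest). A hypothetical invocation, with placeholder values for the two environment variables the DAGs read:

SYSTEM_TESTS_GCP_PROJECT=my-gcp-project SYSTEM_TESTS_ENV_ID=some-env \
    pytest tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py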