diff --git a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py index 5b65917638a4c..7c7accddc9e32 100644 --- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py @@ -164,8 +164,8 @@ def create_transfer_job(self, body: dict) -> dict: :rtype: dict """ body = self._inject_project_id(body, BODY, PROJECT_ID) - try: + try: transfer_job = ( self.get_conn().transferJobs().create(body=body).execute(num_retries=self.num_retries) ) diff --git a/airflow/providers/google/cloud/links/cloud_storage_transfer.py b/airflow/providers/google/cloud/links/cloud_storage_transfer.py new file mode 100644 index 0000000000000..ee5e5a2475b77 --- /dev/null +++ b/airflow/providers/google/cloud/links/cloud_storage_transfer.py @@ -0,0 +1,127 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains Google Storage Transfer Service links.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.providers.google.cloud.links.base import BaseGoogleLink + +if TYPE_CHECKING: + from airflow.utils.context import Context + +CLOUD_STORAGE_TRANSFER_BASE_LINK = "https://console.cloud.google.com/transfer" + +CLOUD_STORAGE_TRANSFER_LIST_LINK = CLOUD_STORAGE_TRANSFER_BASE_LINK + "/jobs?project={project_id}" + +CLOUD_STORAGE_TRANSFER_JOB_LINK = ( + CLOUD_STORAGE_TRANSFER_BASE_LINK + "/jobs/transferJobs%2F{transfer_job}/runs?project={project_id}" +) + +CLOUD_STORAGE_TRANSFER_OPERATION_LINK = ( + CLOUD_STORAGE_TRANSFER_BASE_LINK + + "/jobs/transferJobs%2F{transfer_job}/runs/transferOperations%2F{transfer_operation}" + + "?project={project_id}" +) + + +class CloudStorageTransferLinkHelper: + """Helper class for Storage Transfer links""" + + @staticmethod + def extract_parts(operation_name: str | None): + if not operation_name: + return "", "" + transfer_operation = operation_name.split("/")[1] + transfer_job = operation_name.split("-")[1] + return transfer_operation, transfer_job + + +class CloudStorageTransferListLink(BaseGoogleLink): + """Helper class for constructing Cloud Storage Transfer Link""" + + name = "Cloud Storage Transfer" + key = "cloud_storage_transfer" + format_str = CLOUD_STORAGE_TRANSFER_LIST_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + project_id: str, + ): + task_instance.xcom_push( + context, + key=CloudStorageTransferListLink.key, + value={"project_id": project_id}, + ) + + +class CloudStorageTransferJobLink(BaseGoogleLink): + """Helper class for constructing Storage Transfer Job Link""" + + name = "Cloud Storage Transfer Job" + key = "cloud_storage_transfer_job" + format_str = CLOUD_STORAGE_TRANSFER_JOB_LINK + 
+ @staticmethod + def persist( + task_instance, + context: Context, + project_id: str, + job_name: str, + ): + + job_name = job_name.split('/')[1] if job_name else "" + + task_instance.xcom_push( + context, + key=CloudStorageTransferJobLink.key, + value={ + "project_id": project_id, + "transfer_job": job_name, + }, + ) + + +class CloudStorageTransferDetailsLink(BaseGoogleLink): + """Helper class for constructing Cloud Storage Transfer Operation Link""" + + name = "Cloud Storage Transfer Details" + key = "cloud_storage_transfer_details" + format_str = CLOUD_STORAGE_TRANSFER_OPERATION_LINK + + @staticmethod + def persist( + task_instance, + context: Context, + project_id: str, + operation_name: str, + ): + transfer_operation, transfer_job = CloudStorageTransferLinkHelper.extract_parts(operation_name) + + task_instance.xcom_push( + context, + key=CloudStorageTransferDetailsLink.key, + value={ + "project_id": project_id, + "transfer_job": transfer_job, + "transfer_operation": transfer_operation, + }, + ) diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py index 2c9b95ab137dd..51a7e29a27867 100644 --- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py @@ -55,6 +55,11 @@ CloudDataTransferServiceHook, GcpTransferJobsStatus, ) +from airflow.providers.google.cloud.links.cloud_storage_transfer import ( + CloudStorageTransferDetailsLink, + CloudStorageTransferJobLink, + CloudStorageTransferListLink, +) from airflow.providers.google.cloud.utils.helpers import normalize_directory_path if TYPE_CHECKING: @@ -218,6 +223,7 @@ class CloudDataTransferServiceCreateJobOperator(BaseOperator): 'google_impersonation_chain', ) # [END gcp_transfer_job_create_template_fields] + operator_extra_links = (CloudStorageTransferJobLink(),) def __init__( self, @@ -226,6 +232,7 @@ def __init__( aws_conn_id: str = 'aws_default', gcp_conn_id: str = 'google_cloud_default', api_version: str = 'v1', + project_id: str | None = None, google_impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -234,6 +241,7 @@ def __init__( self.aws_conn_id = aws_conn_id self.gcp_conn_id = gcp_conn_id self.api_version = api_version + self.project_id = project_id self.google_impersonation_chain = google_impersonation_chain self._validate_inputs() @@ -247,7 +255,18 @@ def execute(self, context: Context) -> dict: gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.google_impersonation_chain, ) - return hook.create_transfer_job(body=self.body) + result = hook.create_transfer_job(body=self.body) + + project_id = self.project_id or hook.project_id + if project_id: + CloudStorageTransferJobLink.persist( + context=context, + task_instance=self, + project_id=project_id, + job_name=result[NAME], + ) + + return result class CloudDataTransferServiceUpdateJobOperator(BaseOperator): @@ -291,6 +310,7 @@ class CloudDataTransferServiceUpdateJobOperator(BaseOperator): 'google_impersonation_chain', ) # [END gcp_transfer_job_update_template_fields] + operator_extra_links = (CloudStorageTransferJobLink(),) def __init__( self, @@ -300,12 +320,14 @@ def __init__( aws_conn_id: str = 'aws_default', gcp_conn_id: str = 'google_cloud_default', api_version: str = 'v1', + project_id: str | None = None, google_impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) self.job_name = 
job_name self.body = body + self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.api_version = api_version self.aws_conn_id = aws_conn_id @@ -324,6 +346,16 @@ def execute(self, context: Context) -> dict: gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.google_impersonation_chain, ) + + project_id = self.project_id or hook.project_id + if project_id: + CloudStorageTransferJobLink.persist( + context=context, + task_instance=self, + project_id=project_id, + job_name=self.job_name, + ) + return hook.update_transfer_job(job_name=self.job_name, body=self.body) @@ -426,10 +458,12 @@ class CloudDataTransferServiceGetOperationOperator(BaseOperator): 'google_impersonation_chain', ) # [END gcp_transfer_operation_get_template_fields] + operator_extra_links = (CloudStorageTransferDetailsLink(),) def __init__( self, *, + project_id: str | None = None, operation_name: str, gcp_conn_id: str = "google_cloud_default", api_version: str = "v1", @@ -438,6 +472,7 @@ def __init__( ) -> None: super().__init__(**kwargs) self.operation_name = operation_name + self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.api_version = api_version self.google_impersonation_chain = google_impersonation_chain @@ -454,6 +489,16 @@ def execute(self, context: Context) -> dict: impersonation_chain=self.google_impersonation_chain, ) operation = hook.get_transfer_operation(operation_name=self.operation_name) + + project_id = self.project_id or hook.project_id + if project_id: + CloudStorageTransferDetailsLink.persist( + context=context, + task_instance=self, + project_id=project_id, + operation_name=self.operation_name, + ) + return operation @@ -488,10 +533,12 @@ class CloudDataTransferServiceListOperationsOperator(BaseOperator): 'google_impersonation_chain', ) # [END gcp_transfer_operations_list_template_fields] + operator_extra_links = (CloudStorageTransferListLink(),) def __init__( self, request_filter: dict | None = None, + project_id: str | None = None, gcp_conn_id: str = 'google_cloud_default', api_version: str = 'v1', google_impersonation_chain: str | Sequence[str] | None = None, @@ -508,6 +555,7 @@ def __init__( super().__init__(**kwargs) self.filter = request_filter + self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.api_version = api_version self.google_impersonation_chain = google_impersonation_chain @@ -525,6 +573,15 @@ def execute(self, context: Context) -> list[dict]: ) operations_list = hook.list_transfer_operations(request_filter=self.filter) self.log.info(operations_list) + + project_id = self.project_id or hook.project_id + if project_id: + CloudStorageTransferListLink.persist( + context=context, + task_instance=self, + project_id=project_id, + ) + return operations_list diff --git a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py index aaedecc35000f..f17783f02a102 100644 --- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py @@ -26,6 +26,7 @@ NAME, CloudDataTransferServiceHook, ) +from airflow.providers.google.cloud.links.cloud_storage_transfer import CloudStorageTransferJobLink from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: @@ -65,6 +66,7 @@ class CloudDataTransferServiceJobStatusSensor(BaseSensorOperator): 'impersonation_chain', ) # [END gcp_transfer_job_sensor_template_fields] + operator_extra_links = (CloudStorageTransferJobLink(),) def 
__init__( self, @@ -103,4 +105,13 @@ def poke(self, context: Context) -> bool: if check: self.xcom_push(key="sensed_operations", value=operations, context=context) + project_id = self.project_id or hook.project_id + if project_id: + CloudStorageTransferJobLink.persist( + context=context, + task_instance=self, + project_id=project_id, + job_name=self.job_name, + ) + return check diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index b877fd5660c1c..702b73392d1fd 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1017,6 +1017,9 @@ extra-links: - airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink - airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsDetailsLink - airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsListLink + - airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferListLink + - airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferJobLink + - airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferDetailsLink - airflow.providers.google.common.links.storage.StorageLink - airflow.providers.google.common.links.storage.FileDetailsLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst index 9d9866764b51c..90976814cd968 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst @@ -58,7 +58,7 @@ For parameter definition, take a look at Using the operator """""""""""""""""" -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py :language: python :start-after: [START howto_operator_gcp_transfer_create_job_body_gcp] :end-before: [END howto_operator_gcp_transfer_create_job_body_gcp] @@ -138,12 +138,12 @@ For parameter definition, take a look at Using the operator """""""""""""""""" -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py :language: python :start-after: [START howto_operator_gcp_transfer_update_job_body] :end-before: [END howto_operator_gcp_transfer_update_job_body] -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py :language: python :dedent: 4 :start-after: [START howto_operator_gcp_transfer_update_job] diff --git a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py index 947a3bbbcd06e..ea84d836ed412 100644 --- a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py +++ b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py @@ -75,8 +75,8 @@ TASK_ID = 'task-id' IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] -JOB_NAME = "job-name" -OPERATION_NAME = "operation-name" +JOB_NAME = "job-name/job-name" +OPERATION_NAME = "transferOperations/transferJobs-123-456" AWS_BUCKET_NAME = "aws-bucket-name" GCS_BUCKET_NAME = "gcp-bucket-name" SOURCE_PATH = None @@ -255,7 +255,7 @@ class TestGcpStorageTransferJobCreateOperator: 'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook' ) def test_job_create_gcs(self, mock_hook): - mock_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_GCS_RAW + mock_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_GCS body = deepcopy(VALID_TRANSFER_JOB_GCS) del body['name'] op = CloudDataTransferServiceCreateJobOperator( @@ -263,7 +263,7 @@ def test_job_create_gcs(self, mock_hook): task_id=TASK_ID, google_impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', @@ -273,14 +273,14 @@ def test_job_create_gcs(self, mock_hook): mock_hook.return_value.create_transfer_job.assert_called_once_with(body=VALID_TRANSFER_JOB_GCS_RAW) - assert result == VALID_TRANSFER_JOB_GCS_RAW + assert result == VALID_TRANSFER_JOB_GCS @mock.patch( 'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook' ) @mock.patch('airflow.providers.google.cloud.operators.cloud_storage_transfer_service.AwsBaseHook') def test_job_create_aws(self, aws_hook, mock_hook): - mock_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_AWS_RAW + mock_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_AWS aws_hook.return_value.get_credentials.return_value = Credentials( TEST_AWS_ACCESS_KEY_ID, TEST_AWS_ACCESS_SECRET, None ) @@ -292,7 +292,7 @@ def test_job_create_aws(self, aws_hook, mock_hook): google_impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', @@ -302,7 +302,7 @@ def test_job_create_aws(self, aws_hook, mock_hook): mock_hook.return_value.create_transfer_job.assert_called_once_with(body=VALID_TRANSFER_JOB_AWS_RAW) - assert result == VALID_TRANSFER_JOB_AWS_RAW + assert result == VALID_TRANSFER_JOB_AWS @mock.patch( 'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook' @@ -312,16 +312,16 @@ def test_job_create_multiple(self, aws_hook, gcp_hook): aws_hook.return_value.get_credentials.return_value = Credentials( TEST_AWS_ACCESS_KEY_ID, TEST_AWS_ACCESS_SECRET, None ) - gcp_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_AWS_RAW + gcp_hook.return_value.create_transfer_job.return_value = VALID_TRANSFER_JOB_AWS body = deepcopy(VALID_TRANSFER_JOB_AWS) op = 
CloudDataTransferServiceCreateJobOperator(body=body, task_id=TASK_ID) - result = op.execute(None) - assert result == VALID_TRANSFER_JOB_AWS_RAW + result = op.execute(context=mock.MagicMock()) + assert result == VALID_TRANSFER_JOB_AWS op = CloudDataTransferServiceCreateJobOperator(body=body, task_id=TASK_ID) - result = op.execute(None) - assert result == VALID_TRANSFER_JOB_AWS_RAW + result = op.execute(context=mock.MagicMock()) + assert result == VALID_TRANSFER_JOB_AWS # Setting all the operator's input parameters as templated dag_ids # (could be anything else) just to test if the templating works for all @@ -359,7 +359,7 @@ def test_job_update(self, mock_hook): task_id=TASK_ID, google_impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', @@ -454,7 +454,7 @@ def test_operation_get(self, mock_hook): task_id=TASK_ID, google_impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', @@ -503,7 +503,7 @@ def test_operation_list(self, mock_hook): task_id=TASK_ID, google_impersonation_chain=IMPERSONATION_CHAIN, ) - result = op.execute(None) + result = op.execute(context=mock.MagicMock()) mock_hook.assert_called_once_with( api_version='v1', gcp_conn_id='google_cloud_default', diff --git a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service_system.py b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service_system.py deleted file mode 100644 index 471a24a9d3173..0000000000000 --- a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service_system.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-from __future__ import annotations - -import pytest - -from airflow.providers.google.cloud.example_dags.example_cloud_storage_transfer_service_gcp import ( - GCP_PROJECT_ID, - GCP_TRANSFER_FIRST_TARGET_BUCKET, - GCP_TRANSFER_SECOND_TARGET_BUCKET, -) -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_TRANSFER_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.fixture -def helper(): - with provide_gcp_context(GCP_GCS_TRANSFER_KEY): - # Create buckets - GoogleSystemTest.create_gcs_bucket(GCP_TRANSFER_SECOND_TARGET_BUCKET, location="asia-east1") - GoogleSystemTest.create_gcs_bucket(GCP_TRANSFER_FIRST_TARGET_BUCKET) - GoogleSystemTest.upload_content_to_gcs("test_contents", GCP_TRANSFER_FIRST_TARGET_BUCKET, "test.txt") - - # Grant bucket permissions - project_number = GoogleSystemTest.get_project_number(GCP_PROJECT_ID) - account_email = f"project-{project_number}@storage-transfer-service.iam.gserviceaccount.com" - GoogleSystemTest.grant_bucket_access(GCP_TRANSFER_FIRST_TARGET_BUCKET, account_email) - GoogleSystemTest.grant_bucket_access(GCP_TRANSFER_SECOND_TARGET_BUCKET, account_email) - - yield - - # Remove buckets - GoogleSystemTest.delete_gcs_bucket(GCP_TRANSFER_SECOND_TARGET_BUCKET) - GoogleSystemTest.delete_gcs_bucket(GCP_TRANSFER_FIRST_TARGET_BUCKET) - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) -class GcpTransferExampleDagsSystemTest(GoogleSystemTest): - def setUp(self): - super().setUp() - - @pytest.mark.usefixtures("helper") - @provide_gcp_context(GCP_GCS_TRANSFER_KEY) - def test_run_example_dag_compute(self): - self.run_dag('example_gcp_transfer', CLOUD_DAG_FOLDER) - - def tearDown(self): - super().tearDown() diff --git a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py index d0a0a4c345f85..3db30455b225c 100644 --- a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py +++ b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py @@ -32,6 +32,7 @@ "bytesFoundFromSource": 512, "bytesCopiedToSink": 1024, } +JOB_NAME = "job-name/123" class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase): @@ -53,7 +54,7 @@ def test_wait_for_status_success(self, mock_tool): op = CloudDataTransferServiceJobStatusSensor( task_id='task-id', - job_name='job-name', + job_name=JOB_NAME, project_id='project-id', expected_statuses=GcpTransferOperationStatus.SUCCESS, ) @@ -62,7 +63,7 @@ def test_wait_for_status_success(self, mock_tool): result = op.poke(context) mock_tool.return_value.list_transfer_operations.assert_called_once_with( - request_filter={'project_id': 'project-id', 'job_names': ['job-name']} + request_filter={'project_id': 'project-id', 'job_names': [JOB_NAME]} ) mock_tool.operations_contain_expected_statuses.assert_called_once_with( operations=operations, expected_statuses={GcpTransferOperationStatus.SUCCESS} @@ -76,7 +77,7 @@ def test_wait_for_status_success_default_expected_status(self, mock_tool): op = CloudDataTransferServiceJobStatusSensor( task_id='task-id', - job_name='job-name', + job_name=JOB_NAME, project_id='project-id', expected_statuses=GcpTransferOperationStatus.SUCCESS, ) @@ -120,7 +121,7 @@ def test_wait_for_status_after_retry(self, mock_tool): op = CloudDataTransferServiceJobStatusSensor( task_id='task-id', - job_name='job-name', + job_name=JOB_NAME, 
project_id='project-id', expected_statuses=GcpTransferOperationStatus.SUCCESS, ) @@ -171,7 +172,7 @@ def test_wait_for_status_normalize_status(self, expected_status, received_status op = CloudDataTransferServiceJobStatusSensor( task_id='task-id', - job_name='job-name', + job_name=JOB_NAME, project_id='project-id', expected_statuses=expected_status, ) diff --git a/tests/system/providers/google/cloud/storage_transfer/__init__.py b/tests/system/providers/google/cloud/storage_transfer/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/storage_transfer/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py similarity index 64% rename from airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py rename to tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py index a075999d23397..c207cfd715dd3 100644 --- a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py +++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + """ Example Airflow DAG that demonstrates interactions with Google Cloud Transfer. 
@@ -26,10 +27,12 @@ It is also a source bucket in next step * GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied """ + from __future__ import annotations import os from datetime import datetime, timedelta +from pathlib import Path from airflow import models from airflow.models.baseoperator import chain @@ -61,31 +64,39 @@ CloudDataTransferServiceListOperationsOperator, CloudDataTransferServiceUpdateJobOperator, ) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import ( CloudDataTransferServiceJobStatusSensor, ) +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get( - "GCP_TRANSFER_FIRST_TARGET_BUCKET", "gcp-transfer-first-target" -) -GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get( - "GCP_TRANSFER_SECOND_TARGET_BUCKET", "gcp-transfer-second-target" -) +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID_TRANSFER = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "example_gcp_transfer" + +BUCKET_NAME_SRC = f"src-bucket-{DAG_ID}-{ENV_ID}" +BUCKET_NAME_DST = f"dst-bucket-{DAG_ID}-{ENV_ID}" +FILE_NAME = "file" +FILE_URI = f"gs://{BUCKET_NAME_SRC}/{FILE_NAME}" + +CURRENT_FOLDER = Path(__file__).parent +FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / FILE_NAME) # [START howto_operator_gcp_transfer_create_job_body_gcp] gcs_to_gcs_transfer_body = { DESCRIPTION: "description", STATUS: GcpTransferJobsStatus.ENABLED, - PROJECT_ID: GCP_PROJECT_ID, + PROJECT_ID: PROJECT_ID_TRANSFER, SCHEDULE: { SCHEDULE_START_DATE: datetime(2015, 1, 1).date(), SCHEDULE_END_DATE: datetime(2030, 1, 1).date(), START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(), }, TRANSFER_SPEC: { - GCS_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET}, - GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_SECOND_TARGET_BUCKET}, + GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC}, + GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST}, TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True}, }, } @@ -93,21 +104,42 @@ # [START howto_operator_gcp_transfer_update_job_body] update_body = { - PROJECT_ID: GCP_PROJECT_ID, + PROJECT_ID: PROJECT_ID_TRANSFER, TRANSFER_JOB: {DESCRIPTION: "description_updated"}, TRANSFER_JOB_FIELD_MASK: "description", } # [END howto_operator_gcp_transfer_update_job_body] with models.DAG( - "example_gcp_transfer", + DAG_ID, + schedule="@once", # Override to match your needs start_date=datetime(2021, 1, 1), catchup=False, tags=["example"], ) as dag: + create_bucket_src = GCSCreateBucketOperator( + task_id="create_bucket_src", + bucket_name=BUCKET_NAME_SRC, + project_id=PROJECT_ID_TRANSFER, + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=FILE_LOCAL_PATH, + dst=FILE_NAME, + bucket=BUCKET_NAME_SRC, + ) + + create_bucket_dst = GCSCreateBucketOperator( + task_id="create_bucket_dst", + bucket_name=BUCKET_NAME_DST, + project_id=PROJECT_ID_TRANSFER, + ) + create_transfer = CloudDataTransferServiceCreateJobOperator( - task_id="create_transfer", body=gcs_to_gcs_transfer_body + task_id="create_transfer", + body=gcs_to_gcs_transfer_body, ) # [START howto_operator_gcp_transfer_update_job] @@ -121,14 +153,14 @@ wait_for_transfer = CloudDataTransferServiceJobStatusSensor( 
task_id="wait_for_transfer", job_name="{{task_instance.xcom_pull('create_transfer')['name']}}", - project_id=GCP_PROJECT_ID, + project_id=PROJECT_ID_TRANSFER, expected_statuses={GcpTransferOperationStatus.SUCCESS}, ) list_operations = CloudDataTransferServiceListOperationsOperator( task_id="list_operations", request_filter={ - FILTER_PROJECT_ID: GCP_PROJECT_ID, + FILTER_PROJECT_ID: PROJECT_ID_TRANSFER, FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer')['name']}}"], }, ) @@ -141,14 +173,38 @@ delete_transfer = CloudDataTransferServiceDeleteJobOperator( task_id="delete_transfer_from_gcp_job", job_name="{{task_instance.xcom_pull('create_transfer')['name']}}", - project_id=GCP_PROJECT_ID, + project_id=PROJECT_ID_TRANSFER, + ) + + delete_bucket_dst = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE + ) + + delete_bucket_src = GCSDeleteBucketOperator( + task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE ) chain( + create_bucket_src, + upload_file, + create_bucket_dst, create_transfer, wait_for_transfer, update_transfer, list_operations, get_operation, delete_transfer, + delete_bucket_src, + delete_bucket_dst, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/storage_transfer/resources/__init__.py b/tests/system/providers/google/cloud/storage_transfer/resources/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/storage_transfer/resources/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/google/cloud/storage_transfer/resources/file b/tests/system/providers/google/cloud/storage_transfer/resources/file new file mode 100644 index 0000000000000..e69de29bb2d1d
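Reviewer note, not part of the patch: below is a minimal, self-contained sketch of how the URL template and operation-name parsing introduced in airflow/providers/google/cloud/links/cloud_storage_transfer.py are expected to fit together. The constants and the parsing mirror CloudStorageTransferLinkHelper.extract_parts and CLOUD_STORAGE_TRANSFER_OPERATION_LINK from the diff above; build_console_url is a hypothetical helper used only for illustration and does not exist in the provider.

from __future__ import annotations

CLOUD_STORAGE_TRANSFER_BASE_LINK = "https://console.cloud.google.com/transfer"
CLOUD_STORAGE_TRANSFER_OPERATION_LINK = (
    CLOUD_STORAGE_TRANSFER_BASE_LINK
    + "/jobs/transferJobs%2F{transfer_job}/runs/transferOperations%2F{transfer_operation}"
    + "?project={project_id}"
)


def extract_parts(operation_name: str | None) -> tuple[str, str]:
    """Mirror of CloudStorageTransferLinkHelper.extract_parts from the diff above."""
    if not operation_name:
        return "", ""
    # For "transferOperations/transferJobs-123-456" this yields
    # transfer_operation="transferJobs-123-456" and transfer_job="123".
    transfer_operation = operation_name.split("/")[1]
    transfer_job = operation_name.split("-")[1]
    return transfer_operation, transfer_job


def build_console_url(project_id: str, operation_name: str) -> str:
    """Hypothetical helper: format the operation link template with the same values
    that CloudStorageTransferDetailsLink.persist pushes to XCom."""
    transfer_operation, transfer_job = extract_parts(operation_name)
    return CLOUD_STORAGE_TRANSFER_OPERATION_LINK.format(
        project_id=project_id,
        transfer_job=transfer_job,
        transfer_operation=transfer_operation,
    )


if __name__ == "__main__":
    # Uses the OPERATION_NAME value from the updated unit tests.
    print(build_console_url("example-project", "transferOperations/transferJobs-123-456"))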