From 257136786c9a3eebbae717738637ab24fd6ab563 Mon Sep 17 00:00:00 2001 From: Akash Sharma <35839624+Adaverse@users.noreply.github.com> Date: Sat, 8 Jul 2023 10:31:20 +0530 Subject: [PATCH] Moves `AzureBlobStorageToGCSOperator` from Azure to Google provider (#32306) * moved AzureBlobStorageToGCSOperator to google provider --- .../cloud/transfers/azure_blob_to_gcs.py | 118 ++++++++++++++++++ airflow/providers/google/provider.yaml | 5 +- .../providers/microsoft/azure/provider.yaml | 5 +- .../azure/transfers/azure_blob_to_gcs.py | 108 +++------------- .../connections/gcp.rst | 2 +- .../operators/transfer/azure_blob_to_gcs.rst | 53 ++++++++ .../redirects.txt | 1 + .../transfer/azure_blob_to_gcs.rst | 44 +------ .../transfers/test_azure_blob_to_gcs.py | 8 +- .../cloud}/azure/example_azure_blob_to_gcs.py | 35 ++---- 10 files changed, 211 insertions(+), 168 deletions(-) create mode 100644 airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py create mode 100644 docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst rename tests/providers/{microsoft/azure => google/cloud}/transfers/test_azure_blob_to_gcs.py (90%) rename tests/system/providers/{microsoft => google/cloud}/azure/example_azure_blob_to_gcs.py (71%) diff --git a/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py b/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py new file mode 100644 index 0000000000000..8ba6f2d6eb079 --- /dev/null +++ b/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import tempfile +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureBlobStorageToGCSOperator(BaseOperator): + """ + Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureBlobStorageToGCSOperator` + + :param wasb_conn_id: Reference to the wasb connection. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param blob_name: Name of the blob + :param container_name: Name of the container + :param bucket_name: The bucket to upload to + :param object_name: The object name to set when uploading the file + :param filename: The local file path to the file to be uploaded + :param gzip: Option to compress local file or file data for upload + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account. + """ + + def __init__( + self, + *, + wasb_conn_id="wasb_default", + gcp_conn_id: str = "google_cloud_default", + blob_name: str, + container_name: str, + bucket_name: str, + object_name: str, + filename: str, + gzip: bool, + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.wasb_conn_id = wasb_conn_id + self.gcp_conn_id = gcp_conn_id + self.blob_name = blob_name + self.container_name = container_name + self.bucket_name = bucket_name + self.object_name = object_name + self.filename = filename + self.gzip = gzip + self.impersonation_chain = impersonation_chain + + template_fields: Sequence[str] = ( + "blob_name", + "container_name", + "bucket_name", + "object_name", + "filename", + ) + + def execute(self, context: Context) -> str: + azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + + with tempfile.NamedTemporaryFile() as temp_file: + self.log.info("Downloading data from blob: %s", self.blob_name) + azure_hook.get_file( + file_path=temp_file.name, + container_name=self.container_name, + blob_name=self.blob_name, + ) + self.log.info( + "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name + ) + gcs_hook.upload( + bucket_name=self.bucket_name, + object_name=self.object_name, + filename=temp_file.name, + gzip=self.gzip, + ) + self.log.info( + "Resources have been uploaded from blob: %s to GCS bucket:%s", + self.blob_name, + self.bucket_name, + ) + return f"gs://{self.bucket_name}/{self.object_name}" diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 02168a12c9535..c240940dc9f9b 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1007,7 +1007,10 @@ transfers: target-integration-name: Google Cloud Storage (GCS) python-module: airflow.providers.google.cloud.transfers.mssql_to_gcs how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mssql_to_gcs.rst - + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Google Cloud Storage (GCS) + python-module: airflow.providers.google.cloud.transfers.azure_blob_to_gcs + how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst connection-types: - hook-class-name: airflow.providers.google.common.hooks.base_google.GoogleBaseHook diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index a1070177bc11b..441f51a5786b1 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -242,12 +242,15 @@ transfers: python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb - source-integration-name: Microsoft Azure Blob Storage target-integration-name: Google Cloud Storage (GCS) - how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs - source-integration-name: SSH File Transfer Protocol (SFTP) target-integration-name: Microsoft Azure Blob Storage how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Google Cloud Storage (GCS) + how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst + python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs connection-types: diff --git a/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py b/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py index 8ba6f2d6eb079..20edde6e09381 100644 --- a/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py +++ b/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py @@ -17,102 +17,26 @@ # under the License. from __future__ import annotations -import tempfile -from typing import TYPE_CHECKING, Sequence +import warnings -from airflow.models import BaseOperator -from airflow.providers.google.cloud.hooks.gcs import GCSHook -from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import ( + AzureBlobStorageToGCSOperator as AzureBlobStorageToGCSOperatorFromGoogleProvider, +) -if TYPE_CHECKING: - from airflow.utils.context import Context - -class AzureBlobStorageToGCSOperator(BaseOperator): +class AzureBlobStorageToGCSOperator(AzureBlobStorageToGCSOperatorFromGoogleProvider): """ - Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:AzureBlobStorageToGCSOperator` - - :param wasb_conn_id: Reference to the wasb connection. - :param gcp_conn_id: The connection ID to use when fetching connection info. - :param blob_name: Name of the blob - :param container_name: Name of the container - :param bucket_name: The bucket to upload to - :param object_name: The object name to set when uploading the file - :param filename: The local file path to the file to be uploaded - :param gzip: Option to compress local file or file data for upload - :param impersonation_chain: Optional service account to impersonate using short-term - credentials, or chained list of accounts required to get the access_token - of the last account in the list, which will be impersonated in the request. - If set as a string, the account must grant the originating account - the Service Account Token Creator IAM role. - If set as a sequence, the identities from the list must grant - Service Account Token Creator IAM role to the directly preceding identity, with first - account from the list granting this role to the originating account. + This class is deprecated. + Please use `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`. """ - def __init__( - self, - *, - wasb_conn_id="wasb_default", - gcp_conn_id: str = "google_cloud_default", - blob_name: str, - container_name: str, - bucket_name: str, - object_name: str, - filename: str, - gzip: bool, - impersonation_chain: str | Sequence[str] | None = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.wasb_conn_id = wasb_conn_id - self.gcp_conn_id = gcp_conn_id - self.blob_name = blob_name - self.container_name = container_name - self.bucket_name = bucket_name - self.object_name = object_name - self.filename = filename - self.gzip = gzip - self.impersonation_chain = impersonation_chain - - template_fields: Sequence[str] = ( - "blob_name", - "container_name", - "bucket_name", - "object_name", - "filename", - ) - - def execute(self, context: Context) -> str: - azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - impersonation_chain=self.impersonation_chain, + def __init__(self, *args, **kwargs): + warnings.warn( + """This class is deprecated. + Please use + `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.""", + AirflowProviderDeprecationWarning, + stacklevel=2, ) - - with tempfile.NamedTemporaryFile() as temp_file: - self.log.info("Downloading data from blob: %s", self.blob_name) - azure_hook.get_file( - file_path=temp_file.name, - container_name=self.container_name, - blob_name=self.blob_name, - ) - self.log.info( - "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name - ) - gcs_hook.upload( - bucket_name=self.bucket_name, - object_name=self.object_name, - filename=temp_file.name, - gzip=self.gzip, - ) - self.log.info( - "Resources have been uploaded from blob: %s to GCS bucket:%s", - self.blob_name, - self.bucket_name, - ) - return f"gs://{self.bucket_name}/{self.object_name}" + super().__init__(*args, **kwargs) diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst index c8374d455604f..91e54d1b1cd64 100644 --- a/docs/apache-airflow-providers-google/connections/gcp.rst +++ b/docs/apache-airflow-providers-google/connections/gcp.rst @@ -298,4 +298,4 @@ Note that as domain-wide delegation is currently supported by most of the Google * All of Google Cloud operators and hooks. * Firebase hooks. -* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs`. +* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator`. diff --git a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst new file mode 100644 index 0000000000000..1568283ce196a --- /dev/null +++ b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst @@ -0,0 +1,53 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator +================================================================== +The `Google Cloud Storage `__ (GCS) is used to store large data from various applications. +This is also the same with `Azure Blob Storage `__. +This page shows how to transfer data from Azure Blob Storage to GCS. + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: ../_partials/prerequisite_tasks.rst + +.. _howto/operator:AzureBlobStorageToGCSOperator: + +Transfer Data from Azure Blob Storage to Google Cloud Storage +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage + +Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` +to transfer data from Azure Blob Storage to Google Cloud Storage. + +Example usage: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py + :language: python + :start-after: [START how_to_azure_blob_to_gcs] + :end-before: [END how_to_azure_blob_to_gcs] + +Reference +^^^^^^^^^ + +For further information, look at: + +* `GCS Client Library Documentation `__ +* `GCS Product Documentation `__ +* `Azure Blob Storage Client Library Documentation `__ diff --git a/docs/apache-airflow-providers-microsoft-azure/redirects.txt b/docs/apache-airflow-providers-microsoft-azure/redirects.txt index 3ce11766b7704..998489b9fbd2e 100644 --- a/docs/apache-airflow-providers-microsoft-azure/redirects.txt +++ b/docs/apache-airflow-providers-microsoft-azure/redirects.txt @@ -1,3 +1,4 @@ connections/index.rst connections/azure.rst secrets-backends/index.rst secrets-backends/azure-key-vault-secrets-backend.rst logging.rst logging/index.rst +transfer/azure_blob_to_gcs.rst ../../apache-airflow-providers-google/latest/operators/transfer/azure_blob_to_gcs.rst diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst index b4c4863ac5f85..df2cd0bcceea0 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst @@ -1,4 +1,3 @@ - .. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -16,45 +15,6 @@ specific language governing permissions and limitations under the License. -Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator -================================================================== -The Blob service stores text and binary data as objects in the cloud. -The Blob service offers the following three resources: the storage account, containers, and blobs. -Within your storage account, containers provide a way to organize sets of blobs. -For more information about the service visit `Azure Blob Storage API documentation `_. - -Before you begin -^^^^^^^^^^^^^^^^ -Before using Blob Storage within Airflow you need to authenticate your account with Token, Login and Password. -Please follow Azure -`instructions `_ -to do it. - -TOKEN should be added to the Connection in Airflow in JSON format, Login and Password as plain text. -You can check `how to do such connection `_. - -See following example. -Set values for these fields: - -.. code-block:: - - Connection Id: wasb_default - Login: Storage Account Name - Password: KEY1 - Extra: {"sas_token": "TOKEN"} - -.. _howto/operator:AzureBlobStorageToGCSOperator: - -Transfer Data from Blob Storage to Google Cloud Storage -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage - -To get information about jobs within a Azure Blob Storage use: -:class:`~airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` - -Example usage: -.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py - :language: python - :start-after: [START how_to_azure_blob_to_gcs] - :end-before: [END how_to_azure_blob_to_gcs] +Upload data from Azure Blob Storage to Google Cloud Storage (Moved to Google Providers) +======================================================================================= diff --git a/tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py b/tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py similarity index 90% rename from tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py rename to tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py index 4bc51895f3e4c..a0b3eae99d683 100644 --- a/tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py @@ -18,7 +18,7 @@ from unittest import mock -from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator WASB_CONN_ID = "wasb_default" GCP_CONN_ID = "google_cloud_default" @@ -57,9 +57,9 @@ def test_init(self): assert operator.impersonation_chain == IMPERSONATION_CHAIN assert operator.task_id == TASK_ID - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.WasbHook") - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.GCSHook") - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.tempfile") + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.WasbHook") + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.tempfile") def test_execute(self, mock_temp, mock_hook_gcs, mock_hook_wasb): op = AzureBlobStorageToGCSOperator( wasb_conn_id=WASB_CONN_ID, diff --git a/tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py similarity index 71% rename from tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py rename to tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py index 81fb9993b25dd..5d7649479df60 100644 --- a/tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py +++ b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py @@ -21,11 +21,10 @@ from datetime import datetime from airflow import DAG +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator from airflow.providers.microsoft.azure.sensors.wasb import ( WasbBlobSensor, - WasbPrefixSensor, ) -from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator # Ignore missing args provided by default_args # type: ignore[call-arg] @@ -36,35 +35,23 @@ GCP_BUCKET_FILE_PATH = os.environ.get("GCP_BUCKET_FILE_PATH", "file.txt") GCP_BUCKET_NAME = os.environ.get("GCP_BUCKET_NAME", "INVALID BUCKET NAME") GCP_OBJECT_NAME = os.environ.get("GCP_OBJECT_NAME", "file.txt") -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_azure_blob_to_gcs" -PREFIX_NAME = os.environ.get("AZURE_PREFIX_NAME", "20230421") -# [START how_to_azure_blob_to_gcs] with DAG( DAG_ID, schedule=None, start_date=datetime(2021, 1, 1), # Override to match your needs - default_args={ - # azure args - "container_name": AZURE_CONTAINER_NAME, - "blob_name": BLOB_NAME, - "prefix": PREFIX_NAME, - }, ) as dag: - wait_for_blob = WasbBlobSensor(task_id="wait_for_blob") - - wait_for_blob_async = WasbBlobSensor(task_id="wait_for_blob_async", deferrable=True) - - wait_for_blob_prefix = WasbPrefixSensor(task_id="wait_for_blob_prefix") - - wait_for_blob_prefix_async = WasbPrefixSensor( - task_id="wait_for_blob_prefix_async", - deferrable=True, + wait_for_blob = WasbBlobSensor( + task_id="wait_for_blob", container_name=AZURE_CONTAINER_NAME, blob_name=BLOB_NAME ) + # [START how_to_azure_blob_to_gcs] transfer_files_to_gcs = AzureBlobStorageToGCSOperator( task_id="transfer_files_to_gcs", + # azure args + container_name=AZURE_CONTAINER_NAME, + blob_name=BLOB_NAME, # GCP args bucket_name=GCP_BUCKET_NAME, object_name=GCP_OBJECT_NAME, @@ -74,13 +61,7 @@ ) # [END how_to_azure_blob_to_gcs] - ( - wait_for_blob - >> wait_for_blob_async - >> wait_for_blob_prefix - >> wait_for_blob_prefix_async - >> transfer_files_to_gcs - ) + (wait_for_blob >> transfer_files_to_gcs) from tests.system.utils.watcher import watcher