Commit
Moves AzureBlobStorageToGCSOperator from Azure to Google provider (#32306)

* moved AzureBlobStorageToGCSOperator to google provider
Adaverse committed Jul 8, 2023
1 parent 566bc1b commit 2571367
Showing 10 changed files with 211 additions and 168 deletions.
118 changes: 118 additions & 0 deletions airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py
@@ -0,0 +1,118 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import tempfile
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AzureBlobStorageToGCSOperator(BaseOperator):
    """
    Operator transfers data from Azure Blob Storage to a specified bucket in Google Cloud Storage.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToGCSOperator`

    :param wasb_conn_id: Reference to the wasb connection.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param blob_name: Name of the blob.
    :param container_name: Name of the container.
    :param bucket_name: The bucket to upload to.
    :param object_name: The object name to set when uploading the file.
    :param filename: The local file path to the file to be uploaded.
    :param gzip: Option to compress the local file or file data for upload.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with the first
        account from the list granting this role to the originating account.
    """

    def __init__(
        self,
        *,
        wasb_conn_id: str = "wasb_default",
        gcp_conn_id: str = "google_cloud_default",
        blob_name: str,
        container_name: str,
        bucket_name: str,
        object_name: str,
        filename: str,
        gzip: bool,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wasb_conn_id = wasb_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.blob_name = blob_name
        self.container_name = container_name
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.filename = filename
        self.gzip = gzip
        self.impersonation_chain = impersonation_chain

    template_fields: Sequence[str] = (
        "blob_name",
        "container_name",
        "bucket_name",
        "object_name",
        "filename",
    )

    def execute(self, context: Context) -> str:
        azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

        with tempfile.NamedTemporaryFile() as temp_file:
            self.log.info("Downloading data from blob: %s", self.blob_name)
            azure_hook.get_file(
                file_path=temp_file.name,
                container_name=self.container_name,
                blob_name=self.blob_name,
            )
            self.log.info(
                "Uploading data from blob: %s to GCS bucket: %s", self.object_name, self.bucket_name
            )
            gcs_hook.upload(
                bucket_name=self.bucket_name,
                object_name=self.object_name,
                filename=temp_file.name,
                gzip=self.gzip,
            )
            self.log.info(
                "Resources have been uploaded from blob: %s to GCS bucket: %s",
                self.blob_name,
                self.bucket_name,
            )
            return f"gs://{self.bucket_name}/{self.object_name}"
5 changes: 4 additions & 1 deletion airflow/providers/google/provider.yaml
@@ -1007,7 +1007,10 @@ transfers:
    target-integration-name: Google Cloud Storage (GCS)
    python-module: airflow.providers.google.cloud.transfers.mssql_to_gcs
    how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mssql_to_gcs.rst

  - source-integration-name: Microsoft Azure Blob Storage
    target-integration-name: Google Cloud Storage (GCS)
    python-module: airflow.providers.google.cloud.transfers.azure_blob_to_gcs
    how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst

connection-types:
  - hook-class-name: airflow.providers.google.common.hooks.base_google.GoogleBaseHook
5 changes: 4 additions & 1 deletion airflow/providers/microsoft/azure/provider.yaml
@@ -242,12 +242,15 @@ transfers:
    python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb
  - source-integration-name: Microsoft Azure Blob Storage
    target-integration-name: Google Cloud Storage (GCS)
    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
    python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
  - source-integration-name: SSH File Transfer Protocol (SFTP)
    target-integration-name: Microsoft Azure Blob Storage
    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst
    python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
  - source-integration-name: Microsoft Azure Blob Storage
    target-integration-name: Google Cloud Storage (GCS)
    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
    python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs


connection-types:
108 changes: 16 additions & 92 deletions airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py
@@ -17,102 +17,26 @@
# under the License.
from __future__ import annotations

import tempfile
from typing import TYPE_CHECKING, Sequence
import warnings

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator as AzureBlobStorageToGCSOperatorFromGoogleProvider,
)

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AzureBlobStorageToGCSOperator(BaseOperator):
class AzureBlobStorageToGCSOperator(AzureBlobStorageToGCSOperatorFromGoogleProvider):
    """
    Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToGCSOperator`

    :param wasb_conn_id: Reference to the wasb connection.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param blob_name: Name of the blob
    :param container_name: Name of the container
    :param bucket_name: The bucket to upload to
    :param object_name: The object name to set when uploading the file
    :param filename: The local file path to the file to be uploaded
    :param gzip: Option to compress local file or file data for upload
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.

    This class is deprecated.
    Please use `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.
    """

    def __init__(
        self,
        *,
        wasb_conn_id="wasb_default",
        gcp_conn_id: str = "google_cloud_default",
        blob_name: str,
        container_name: str,
        bucket_name: str,
        object_name: str,
        filename: str,
        gzip: bool,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wasb_conn_id = wasb_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.blob_name = blob_name
        self.container_name = container_name
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.filename = filename
        self.gzip = gzip
        self.impersonation_chain = impersonation_chain

    template_fields: Sequence[str] = (
        "blob_name",
        "container_name",
        "bucket_name",
        "object_name",
        "filename",
    )

    def execute(self, context: Context) -> str:
        azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
    def __init__(self, *args, **kwargs):
        warnings.warn(
            """This class is deprecated.
            Please use
            `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.""",
            AirflowProviderDeprecationWarning,
            stacklevel=2,
        )

        with tempfile.NamedTemporaryFile() as temp_file:
            self.log.info("Downloading data from blob: %s", self.blob_name)
            azure_hook.get_file(
                file_path=temp_file.name,
                container_name=self.container_name,
                blob_name=self.blob_name,
            )
            self.log.info(
                "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name
            )
            gcs_hook.upload(
                bucket_name=self.bucket_name,
                object_name=self.object_name,
                filename=temp_file.name,
                gzip=self.gzip,
            )
            self.log.info(
                "Resources have been uploaded from blob: %s to GCS bucket:%s",
                self.blob_name,
                self.bucket_name,
            )
            return f"gs://{self.bucket_name}/{self.object_name}"
        super().__init__(*args, **kwargs)
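
In practice, the shim above keeps the move import-compatible. A hedged migration sketch (not from the commit): the legacy path keeps working but warns when the class is instantiated.

# Legacy import path: still resolves, but the class now subclasses the Google
# provider's operator and emits AirflowProviderDeprecationWarning in __init__.
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator as LegacyAzureBlobStorageToGCSOperator,
)

# New canonical import path after this commit:
from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator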
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-google/connections/gcp.rst
@@ -298,4 +298,4 @@ Note that as domain-wide delegation is currently supported by most of the Google

* All of Google Cloud operators and hooks.
* Firebase hooks.
* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs`.
* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator`.
53 changes: 53 additions & 0 deletions docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst
@@ -0,0 +1,53 @@

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator
==================================================================

`Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) is used to store large amounts of data from various applications, as is `Azure Blob Storage <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`__.
This page shows how to transfer data from Azure Blob Storage to GCS.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: ../_partials/prerequisite_tasks.rst

.. _howto/operator:AzureBlobStorageToGCSOperator:

Transfer Data from Azure Blob Storage to Google Cloud Storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The operator transfers data from Azure Blob Storage to a specified bucket in Google Cloud Storage.

Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`
to transfer data from Azure Blob Storage to Google Cloud Storage.

Example usage:

.. exampleinclude:: /../../tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py
    :language: python
    :start-after: [START how_to_azure_blob_to_gcs]
    :end-before: [END how_to_azure_blob_to_gcs]

Reference
^^^^^^^^^

For further information, look at:

* `GCS Client Library Documentation <https://googleapis.dev/python/storage/latest/index.html>`__
* `GCS Product Documentation <https://cloud.google.com/storage/docs/>`__
* `Azure Blob Storage Client Library Documentation <https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python>`__
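
One detail the guide does not spell out: the operator's execute() returns the gs://<bucket>/<object> URI of the uploaded object, which Airflow stores as the task's return_value XCom. A hedged sketch of consuming it downstream (task IDs are placeholders, not from the commit):

from airflow.operators.python import PythonOperator


def report_uri(ti):
    # Pull the gs:// URI returned by the transfer task's execute().
    uri = ti.xcom_pull(task_ids="azure_blob_to_gcs")  # hypothetical upstream task_id
    print(f"Uploaded object is available at {uri}")


report = PythonOperator(task_id="report_uploaded_uri", python_callable=report_uri)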
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-microsoft-azure/redirects.txt
@@ -1,3 +1,4 @@
connections/index.rst connections/azure.rst
secrets-backends/index.rst secrets-backends/azure-key-vault-secrets-backend.rst
logging.rst logging/index.rst
transfer/azure_blob_to_gcs.rst ../../apache-airflow-providers-google/latest/operators/transfer/azure_blob_to_gcs.rst
docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
@@ -1,4 +1,3 @@

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
@@ -16,45 +15,6 @@
   specific language governing permissions and limitations
   under the License.

Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator
==================================================================

The Blob service stores text and binary data as objects in the cloud.
The Blob service offers the following three resources: the storage account, containers, and blobs.
Within your storage account, containers provide a way to organize sets of blobs.
For more information about the service, visit the `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.

Before you begin
^^^^^^^^^^^^^^^^

Before using Blob Storage within Airflow, you need to authenticate your account with a token, login, and password.
Please follow the Azure
`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
to do so.

The TOKEN should be added to the Connection in Airflow in JSON format, and the Login and Password as plain text.
You can check `how to edit such a connection <https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/index.html#editing-a-connection-with-the-ui>`_.

See the following example and set values for these fields:

.. code-block::

   Connection Id: wasb_default
   Login: Storage Account Name
   Password: KEY1
   Extra: {"sas_token": "TOKEN"}

.. _howto/operator:AzureBlobStorageToGCSOperator:

Transfer Data from Blob Storage to Google Cloud Storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The operator transfers data from Azure Blob Storage to a specified bucket in Google Cloud Storage.

To transfer data from Azure Blob Storage, use:
:class:`~airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`

Example usage:

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py
    :language: python
    :start-after: [START how_to_azure_blob_to_gcs]
    :end-before: [END how_to_azure_blob_to_gcs]
Upload data from Azure Blob Storage to Google Cloud Storage (Moved to Google Providers)
=======================================================================================
