[AIRFLOW-3531] Add gcs to gcs transfer operator. #4331

Merged · 2 commits · Jan 6, 2019
6 changes: 3 additions & 3 deletions airflow/contrib/hooks/gcp_transfer_hook.py
@@ -26,7 +26,7 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Time to sleep between active checks of the operation results
-TIME_TO_SLEEP_IN_SECONDS = 1
+TIME_TO_SLEEP_IN_SECONDS = 10


# noinspection PyAbstractClass
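Reviewer note: the polling interval goes from 1 s to 10 s, so active status checks hit the Transfer Service API an order of magnitude less often while an operation runs. As a rough illustration only — not the hook's actual code — a constant like this typically drives a wait loop of the following shape, with `get_status` standing in for whatever callable reads the operation state:

```python
import time

TIME_TO_SLEEP_IN_SECONDS = 10


def wait_until_done(get_status):
    """Hypothetical sketch: poll get_status() until the transfer
    operation reaches a terminal state, sleeping between checks."""
    while True:
        status = get_status()  # e.g. reads the operation's metadata status
        if status in ('SUCCESS', 'FAILED', 'ABORTED'):
            return status
        time.sleep(TIME_TO_SLEEP_IN_SECONDS)
```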
@@ -56,10 +56,10 @@ def get_conn(self):
                               http=http_authorized, cache_discovery=False)
        return self._conn

-    def create_transfer_job(self, project_id, description, schedule, transfer_spec):
+    def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
        transfer_job = {
            'status': 'ENABLED',
-            'projectId': project_id,
+            'projectId': project_id or self.project_id,
            'description': description,
            'transferSpec': transfer_spec,
            'schedule': schedule or self._schedule_once_now(),
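With this change, `create_transfer_job` treats `project_id` as optional and falls back to `self.project_id`, which the GCP base hook derives from the configured connection. A minimal sketch of both calling styles — bucket names, description, and project ID are placeholders:

```python
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook

hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')

transfer_spec = {
    'gcsDataSource': {'bucketName': 'my-source-bucket'},
    'gcsDataSink': {'bucketName': 'my-destination-bucket'},
}

# Explicit project, as before:
job = hook.create_transfer_job(
    description='nightly-sync',
    schedule=None,  # None falls back to a run-once-now schedule
    transfer_spec=transfer_spec,
    project_id='my-gcp-project',
)

# New: omit project_id and inherit it from the connection.
job = hook.create_transfer_job(
    description='nightly-sync',
    schedule=None,
    transfer_spec=transfer_spec,
)
```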
127 changes: 127 additions & 0 deletions airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import BaseOperator
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
    """
    Copies objects from one bucket to another using the GCP Storage
    Transfer Service.

    :param source_bucket: The source Google Cloud Storage bucket where the
        object is. (templated)
    :type source_bucket: str
    :param destination_bucket: The destination Google Cloud Storage bucket
        where the object should be. (templated)
    :type destination_bucket: str
    :param project_id: Optional ID of the Google Cloud Platform Console
        project that owns the job. If not set, falls back to the project
        defined in the GCP connection.
    :type project_id: str
    :param gcp_conn_id: Optional connection ID to use when connecting to
        Google Cloud Storage.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param description: Optional transfer service job description
    :type description: str
    :param schedule: Optional transfer service schedule; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
        If not set, run the transfer job once as soon as the operator runs
    :type schedule: dict
    :param object_conditions: Optional transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
    :type object_conditions: dict
    :param transfer_options: Optional transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
    :type transfer_options: dict
    :param wait: Wait for transfer to finish; defaults to ``True``
    :type wait: bool

    **Example**:

    .. code-block:: python

       gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
           task_id='gcs_to_gcs_transfer_example',
           source_bucket='my-source-bucket',
           destination_bucket='my-destination-bucket',
           project_id='my-gcp-project',
           dag=my_dag)
    """

    template_fields = ('source_bucket', 'destination_bucket', 'description', 'object_conditions')
    ui_color = '#e09411'

    @apply_defaults
    def __init__(self,
                 source_bucket,
                 destination_bucket,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 description=None,
                 schedule=None,
                 object_conditions=None,
                 transfer_options=None,
                 wait=True,
                 *args,
                 **kwargs):

        super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(
            *args,
            **kwargs)
        self.source_bucket = source_bucket
        self.destination_bucket = destination_bucket
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.description = description
        self.schedule = schedule
        self.object_conditions = object_conditions or {}
        self.transfer_options = transfer_options or {}
        self.wait = wait

    def execute(self, context):
        transfer_hook = GCPTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to)

        job = transfer_hook.create_transfer_job(
            project_id=self.project_id,
            description=self.description,
            schedule=self.schedule,
            transfer_spec={
                'gcsDataSource': {
                    'bucketName': self.source_bucket,
                },
                'gcsDataSink': {
                    'bucketName': self.destination_bucket,
                },
                'objectConditions': self.object_conditions,
                'transferOptions': self.transfer_options,
            }
        )

        if self.wait:
            transfer_hook.wait_for_transfer_job(job)
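A hypothetical DAG snippet exercising the optional knobs; `includePrefixes` and `overwriteObjectsAlreadyExistingInSink` come from the ObjectConditions and TransferOptions references linked in the docstring above, and all names are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
    GoogleCloudStorageToGoogleCloudStorageTransferOperator

dag = DAG('gcs_transfer_example',
          start_date=datetime(2019, 1, 1),
          schedule_interval=None)

transfer = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
    task_id='copy_exports',
    source_bucket='my-source-bucket',
    destination_bucket='my-destination-bucket',
    # project_id omitted: the hook falls back to the connection's project
    object_conditions={'includePrefixes': ['exports/']},
    transfer_options={'overwriteObjectsAlreadyExistingInSink': True},
    wait=True,
    dag=dag,
)
```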
8 changes: 4 additions & 4 deletions airflow/contrib/operators/s3_to_gcs_transfer_operator.py
@@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
    :param gcs_bucket: The destination Google Cloud Storage bucket
        where you want to store the files. (templated)
    :type gcs_bucket: str
-    :param project_id: The ID of the Google Cloud Platform Console project that
+    :param project_id: Optional ID of the Google Cloud Platform Console project that
        owns the job
    :type project_id: str
    :param aws_conn_id: The source S3 connection
@@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
        If not set, run transfer job once as soon as the operator runs
    :type schedule: dict
-    :param object_conditions: Transfer service object conditions; see
+    :param object_conditions: Optional transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type object_conditions: dict
-    :param transfer_options: Transfer service transfer options; see
+    :param transfer_options: Optional transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type transfer_options: dict
    :param wait: Wait for transfer to finish
Expand All @@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
def __init__(self,
s3_bucket,
gcs_bucket,
project_id,
project_id=None,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
delegate_to=None,
Expand Down
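Since `project_id` now defaults to `None` here as well, an S3-to-GCS task can likewise lean on the connection's project. A short sketch, assuming a `dag` object like the one in the example above:

```python
from airflow.contrib.operators.s3_to_gcs_transfer_operator import \
    S3ToGoogleCloudStorageTransferOperator

s3_transfer = S3ToGoogleCloudStorageTransferOperator(
    task_id='s3_to_gcs_transfer',
    s3_bucket='my-s3-bucket',
    gcs_bucket='my-gcs-bucket',
    # project_id omitted: inherited from the google_cloud_default connection
    dag=dag,
)
```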
1 change: 1 addition & 0 deletions docs/code.rst
@@ -181,6 +181,7 @@ Operators
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
.. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
8 changes: 8 additions & 0 deletions docs/integration.rst
@@ -1184,6 +1184,7 @@ Storage Operators
- :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
- :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested.
+- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from one bucket to another using the Google Cloud Storage Transfer Service.
- :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format.

.. _FileToGoogleCloudStorageOperator:
@@ -1242,6 +1243,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator

.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator

+.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator:
+
+GoogleCloudStorageToGoogleCloudStorageTransferOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator

.. _MySqlToGoogleCloudStorageOperator:

MySqlToGoogleCloudStorageOperator
131 changes: 131 additions & 0 deletions tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
    GoogleCloudStorageToGoogleCloudStorageTransferOperator

try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None


TASK_ID = 'test-gcs-gcs-transfer-operator'
SOURCE_BUCKET = 'test-source-bucket'
DESTINATION_BUCKET = 'test-destination-bucket'
PROJECT_ID = 'test-project'
DESCRIPTION = 'test-description'
SCHEDULE = {
    'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
    'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
}


class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
    def test_constructor(self):
        """Test GoogleCloudStorageToGoogleCloudStorageTransferOperator instance is properly initialized."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
        )

        self.assertEqual(operator.task_id, TASK_ID)
        self.assertEqual(operator.source_bucket, SOURCE_BUCKET)
        self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET)
        self.assertEqual(operator.project_id, PROJECT_ID)
        self.assertEqual(operator.description, DESCRIPTION)
        self.assertEqual(operator.schedule, SCHEDULE)

    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
    def test_execute(self, mock_transfer_hook):
        """Test the execute function when the run is successful."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
        )

        operator.execute(None)

        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
            transfer_spec={
                'gcsDataSource': {
                    'bucketName': SOURCE_BUCKET,
                },
                'gcsDataSink': {
                    'bucketName': DESTINATION_BUCKET,
                },
                'objectConditions': {},
                'transferOptions': {}
            }
        )

        mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
            mock_transfer_hook.return_value.create_transfer_job.return_value
        )

    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
    def test_execute_skip_wait(self, mock_transfer_hook):
        """Test that execute does not block on the transfer job when wait=False."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            wait=False,
        )

        operator.execute(None)

        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=None,
            transfer_spec={
                'gcsDataSource': {
                    'bucketName': SOURCE_BUCKET,
                },
                'gcsDataSink': {
                    'bucketName': DESTINATION_BUCKET,
                },
                'objectConditions': {},
                'transferOptions': {}
            }
        )

        assert not mock_transfer_hook.return_value.wait_for_transfer_job.called
2 changes: 2 additions & 0 deletions tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -59,13 +59,15 @@ def test_constructor(self):
            gcs_bucket=GCS_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
+            schedule=SCHEDULE,
        )

        self.assertEqual(operator.task_id, TASK_ID)
        self.assertEqual(operator.s3_bucket, S3_BUCKET)
        self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
        self.assertEqual(operator.project_id, PROJECT_ID)
        self.assertEqual(operator.description, DESCRIPTION)
+        self.assertEqual(operator.schedule, SCHEDULE)

    @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
    @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')