From 2772a9d45b569e41e213664d063df17097290e5c Mon Sep 17 00:00:00 2001
From: Joshua Carp
Date: Sun, 6 Jan 2019 14:35:31 -0500
Subject: [PATCH] [AIRFLOW-3531] Add gcs to gcs transfer operator. (#4331)

---
 airflow/contrib/hooks/gcp_transfer_hook.py    |   6 +-
 .../operators/gcs_to_gcs_transfer_operator.py | 127 +++++++++++++++++
 .../operators/s3_to_gcs_transfer_operator.py  |   8 +-
 docs/code.rst                                 |   1 +
 docs/integration.rst                          |   8 ++
 .../test_gcs_to_gcs_transfer_operator.py      | 131 ++++++++++++++++++
 .../test_s3_to_gcs_transfer_operator.py       |   2 +
 7 files changed, 276 insertions(+), 7 deletions(-)
 create mode 100644 airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
 create mode 100644 tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py

diff --git a/airflow/contrib/hooks/gcp_transfer_hook.py b/airflow/contrib/hooks/gcp_transfer_hook.py
index 906dba786fb9c..6966ec3ae244e 100644
--- a/airflow/contrib/hooks/gcp_transfer_hook.py
+++ b/airflow/contrib/hooks/gcp_transfer_hook.py
@@ -26,7 +26,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 # Time to sleep between active checks of the operation results
-TIME_TO_SLEEP_IN_SECONDS = 1
+TIME_TO_SLEEP_IN_SECONDS = 10
 
 
 # noinspection PyAbstractClass
@@ -56,10 +56,10 @@ def get_conn(self):
                                   http=http_authorized, cache_discovery=False)
         return self._conn
 
-    def create_transfer_job(self, project_id, description, schedule, transfer_spec):
+    def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
         transfer_job = {
             'status': 'ENABLED',
-            'projectId': project_id,
+            'projectId': project_id or self.project_id,
             'description': description,
             'transferSpec': transfer_spec,
             'schedule': schedule or self._schedule_once_now(),
diff --git a/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
new file mode 100644
index 0000000000000..410d65821d205
--- /dev/null
+++ b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
+    """
+    Copies objects from one bucket to another using the GCP Storage Transfer
+    Service.
+
+    :param source_bucket: The source Google Cloud Storage bucket where the
+        object is. (templated)
+    :type source_bucket: str
+    :param destination_bucket: The destination Google Cloud Storage bucket
+        where the object should be. (templated)
+    :type destination_bucket: str
+    :param project_id: Optional ID of the Google Cloud Platform Console project
+        that owns the job
+    :type project_id: str
+    :param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud
+        Storage.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param description: Optional transfer service job description
+    :type description: str
+    :param schedule: Optional transfer service schedule; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
+        If not set, run transfer job once as soon as the operator runs
+    :type schedule: dict
+    :param object_conditions: Optional transfer service object conditions; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
+    :type object_conditions: dict
+    :param transfer_options: Optional transfer service transfer options; see
+        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
+    :type transfer_options: dict
+    :param wait: Wait for transfer to finish; defaults to `True`
+    :type wait: bool
+
+    **Example**:
+
+    .. code-block:: python
+
+        gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id='gcs_to_gcs_transfer_example',
+            source_bucket='my-source-bucket',
+            destination_bucket='my-destination-bucket',
+            project_id='my-gcp-project',
+            dag=my_dag)
+    """
+
+    template_fields = ('source_bucket', 'destination_bucket', 'description',
+                       'object_conditions')
+    ui_color = '#e09411'
+
+    @apply_defaults
+    def __init__(self,
+                 source_bucket,
+                 destination_bucket,
+                 project_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 description=None,
+                 schedule=None,
+                 object_conditions=None,
+                 transfer_options=None,
+                 wait=True,
+                 *args,
+                 **kwargs):
+
+        super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(
+            *args,
+            **kwargs)
+        self.source_bucket = source_bucket
+        self.destination_bucket = destination_bucket
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.description = description
+        self.schedule = schedule
+        self.object_conditions = object_conditions or {}
+        self.transfer_options = transfer_options or {}
+        self.wait = wait
+
+    def execute(self, context):
+        transfer_hook = GCPTransferServiceHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to)
+
+        job = transfer_hook.create_transfer_job(
+            project_id=self.project_id,
+            description=self.description,
+            schedule=self.schedule,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': self.source_bucket,
+                },
+                'gcsDataSink': {
+                    'bucketName': self.destination_bucket,
+                },
+                'objectConditions': self.object_conditions,
+                'transferOptions': self.transfer_options,
+            }
+        )
+
+        if self.wait:
+            transfer_hook.wait_for_transfer_job(job)
diff --git a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
index b0cf2ae515572..c46a9460e701c 100644
--- a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
+++ b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py
@@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
     :param gcs_bucket: The destination Google Cloud Storage bucket
         where you want to store the files. (templated)
     :type gcs_bucket: str
-    :param project_id: The ID of the Google Cloud Platform Console project that
+    :param project_id: Optional ID of the Google Cloud Platform Console project that
         owns the job
     :type project_id: str
     :param aws_conn_id: The source S3 connection
@@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
         If not set, run transfer job once as soon as the operator runs
     :type schedule: dict
-    :param object_conditions: Transfer service object conditions; see
+    :param object_conditions: Optional transfer service object conditions; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type object_conditions: dict
-    :param transfer_options: Transfer service transfer options; see
+    :param transfer_options: Optional transfer service transfer options; see
         https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
     :type transfer_options: dict
     :param wait: Wait for transfer to finish
@@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
     def __init__(self,
                  s3_bucket,
                  gcs_bucket,
-                 project_id,
+                 project_id=None,
                  aws_conn_id='aws_default',
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None,
diff --git a/docs/code.rst b/docs/code.rst
index 6f82166b10ff9..1850bdb14b24f 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -179,6 +179,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
diff --git a/docs/integration.rst b/docs/integration.rst
index a04d71d825cb6..c12e9ac57e62d 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -1243,6 +1243,7 @@ Storage Operators
 - :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object.
 - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
 - :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested.
+- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from one bucket to another using the GCP Storage Transfer Service.
 - :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format.
 
 .. _FileToGoogleCloudStorageOperator:
@@ -1301,6 +1302,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator
 
 .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
 
+.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator:
+
+GoogleCloudStorageToGoogleCloudStorageTransferOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
+
 .. _MySqlToGoogleCloudStorageOperator:
 
 MySqlToGoogleCloudStorageOperator
diff --git a/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
new file mode 100644
index 0000000000000..8c0cd4ebfcaa5
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
+    GoogleCloudStorageToGoogleCloudStorageTransferOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-gcs-gcs-transfer-operator'
+SOURCE_BUCKET = 'test-source-bucket'
+DESTINATION_BUCKET = 'test-destination-bucket'
+PROJECT_ID = 'test-project'
+DESCRIPTION = 'test-description'
+SCHEDULE = {
+    'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
+    'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
+}
+
+
+class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
+    def test_constructor(self):
+        """Test GoogleCloudStorageToGoogleCloudStorageTransferOperator
+        instance is properly initialized."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+        )
+
+        self.assertEqual(operator.task_id, TASK_ID)
+        self.assertEqual(operator.source_bucket, SOURCE_BUCKET)
+        self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET)
+        self.assertEqual(operator.project_id, PROJECT_ID)
+        self.assertEqual(operator.description, DESCRIPTION)
+        self.assertEqual(operator.schedule, SCHEDULE)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
+    def test_execute(self, mock_transfer_hook):
+        """Test the execute function when the run is successful."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+        )
+
+        operator.execute(None)
+
+        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=SCHEDULE,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': SOURCE_BUCKET,
+                },
+                'gcsDataSink': {
+                    'bucketName': DESTINATION_BUCKET,
+                },
+                'objectConditions': {},
+                'transferOptions': {}
+            }
+        )
+
+        mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
+            mock_transfer_hook.return_value.create_transfer_job.return_value
+        )
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
+    def test_execute_skip_wait(self, mock_transfer_hook):
+        """Test the execute function when wait is False."""
+
+        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
+            task_id=TASK_ID,
+            source_bucket=SOURCE_BUCKET,
+            destination_bucket=DESTINATION_BUCKET,
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            wait=False,
+        )
+
+        operator.execute(None)
+
+        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            description=DESCRIPTION,
+            schedule=None,
+            transfer_spec={
+                'gcsDataSource': {
+                    'bucketName': SOURCE_BUCKET,
+                },
+                'gcsDataSink': {
+                    'bucketName': DESTINATION_BUCKET,
+                },
+                'objectConditions': {},
+                'transferOptions': {}
+            }
+        )
+
+        assert not mock_transfer_hook.return_value.wait_for_transfer_job.called
diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
index 2bf51c0707577..0825364884a78 100644
--- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
+++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
@@ -59,6 +59,7 @@ def test_constructor(self):
             gcs_bucket=GCS_BUCKET,
             project_id=PROJECT_ID,
             description=DESCRIPTION,
+            schedule=SCHEDULE,
         )
 
         self.assertEqual(operator.task_id, TASK_ID)
@@ -66,6 +67,7 @@ def test_constructor(self):
         self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
         self.assertEqual(operator.project_id, PROJECT_ID)
         self.assertEqual(operator.description, DESCRIPTION)
+        self.assertEqual(operator.schedule, SCHEDULE)
 
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
     @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')
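
Usage sketch (illustrative, not part of the patch): a minimal DAG that exercises the new
operator together with ``object_conditions`` and ``transfer_options``. The DAG id, bucket
names, and project id below are hypothetical, and a configured ``google_cloud_default``
connection is assumed; the two dicts are passed through verbatim as the Storage Transfer
API's ``ObjectConditions`` and ``TransferOptions`` messages.

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
        GoogleCloudStorageToGoogleCloudStorageTransferOperator

    # Hypothetical DAG; no Airflow schedule, since the transfer job itself
    # can carry its own schedule via the `schedule` parameter.
    dag = DAG(
        dag_id='example_gcs_to_gcs_transfer',
        start_date=datetime(2019, 1, 1),
        schedule_interval=None,
    )

    transfer = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
        task_id='gcs_to_gcs_transfer_example',
        source_bucket='my-source-bucket',            # hypothetical buckets
        destination_bucket='my-destination-bucket',
        project_id='my-gcp-project',                 # hypothetical project
        # Only transfer objects modified within the last day.
        object_conditions={'maxTimeElapsedSinceLastModification': '86400s'},
        # Remove source objects once they have been copied.
        transfer_options={'deleteObjectsFromSourceAfterTransfer': True},
        wait=True,  # block until the transfer job completes
        dag=dag,
    )

Because ``wait=True`` delegates to ``GCPTransferServiceHook.wait_for_transfer_job``, the task
polls every ``TIME_TO_SLEEP_IN_SECONDS`` (now 10 seconds) until the job finishes, so the
downstream tasks only start after the copy is complete.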