From 97d883bad3d103a9cd703e96610484b5fa77fb5f Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Sun, 16 Dec 2018 23:05:12 -0500 Subject: [PATCH 1/2] [AIRFLOW-3531] Add gcs to gcs transfer operator. --- airflow/contrib/hooks/gcp_transfer_hook.py | 6 +- .../operators/gcs_to_gcs_transfer_operator.py | 127 +++++++++++++++++ .../operators/s3_to_gcs_transfer_operator.py | 8 +- docs/code.rst | 1 + docs/integration.rst | 8 ++ .../test_gcs_to_gcs_transfer_operator.py | 131 ++++++++++++++++++ .../test_s3_to_gcs_transfer_operator.py | 2 + 7 files changed, 276 insertions(+), 7 deletions(-) create mode 100644 airflow/contrib/operators/gcs_to_gcs_transfer_operator.py create mode 100644 tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py diff --git a/airflow/contrib/hooks/gcp_transfer_hook.py b/airflow/contrib/hooks/gcp_transfer_hook.py index 906dba786fb9c..6966ec3ae244e 100644 --- a/airflow/contrib/hooks/gcp_transfer_hook.py +++ b/airflow/contrib/hooks/gcp_transfer_hook.py @@ -26,7 +26,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook # Time to sleep between active checks of the operation results -TIME_TO_SLEEP_IN_SECONDS = 1 +TIME_TO_SLEEP_IN_SECONDS = 10 # noinspection PyAbstractClass @@ -56,10 +56,10 @@ def get_conn(self): http=http_authorized, cache_discovery=False) return self._conn - def create_transfer_job(self, project_id, description, schedule, transfer_spec): + def create_transfer_job(self, description, schedule, transfer_spec, project_id=None): transfer_job = { 'status': 'ENABLED', - 'projectId': project_id, + 'projectId': project_id or self.project_id, 'description': description, 'transferSpec': transfer_spec, 'schedule': schedule or self._schedule_once_now(), diff --git a/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py new file mode 100644 index 0000000000000..410d65821d205 --- /dev/null +++ b/airflow/contrib/operators/gcs_to_gcs_transfer_operator.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook +from airflow.utils.decorators import apply_defaults + + +class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator): + """ + Copies objects from a bucket to another using the GCP Storage Transfer + Service. + + :param source_bucket: The source Google cloud storage bucket where the + object is. (templated) + :type source_bucket: str + :param destination_bucket: The destination Google cloud storage bucket + where the object should be. (templated) + :type destination_bucket: str + :param project_id: The ID of the Google Cloud Platform Console project that + owns the job + :type project_id: str + :param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud + Storage. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: str + :param description: Optional transfer service job description + :type description: str + :param schedule: Optional transfer service schedule; see + https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs. + If not set, run transfer job once as soon as the operator runs + :type schedule: dict + :param object_conditions: Optional transfer service object conditions; see + https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions + :type object_conditions: dict + :param transfer_options: Optional transfer service transfer options; see + https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions + :type transfer_options: dict + :param wait: Wait for transfer to finish; defaults to `True` + :type wait: bool + + **Example**: + + .. code-block:: python + + gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator( + task_id='gcs_to_gcs_transfer_example', + source_bucket='my-source-bucket', + destination_bucket='my-destination-bucket', + project_id='my-gcp-project', + dag=my_dag) + """ + + template_fields = ('source_bucket', 'destination_bucket', 'description', 'object_conditions') + ui_color = '#e09411' + + @apply_defaults + def __init__(self, + source_bucket, + destination_bucket, + project_id=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + description=None, + schedule=None, + object_conditions=None, + transfer_options=None, + wait=True, + *args, + **kwargs): + + super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__( + *args, + **kwargs) + self.source_bucket = source_bucket + self.destination_bucket = destination_bucket + self.project_id = project_id + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.description = description + self.schedule = schedule + self.object_conditions = object_conditions or {} + self.transfer_options = transfer_options or {} + self.wait = wait + + def execute(self, context): + transfer_hook = GCPTransferServiceHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + + job = transfer_hook.create_transfer_job( + project_id=self.project_id, + description=self.description, + schedule=self.schedule, + transfer_spec={ + 'gcsDataSource': { + 'bucketName': self.source_bucket, + }, + 'gcsDataSink': { + 'bucketName': self.destination_bucket, + }, + 'objectConditions': self.object_conditions, + 'transferOptions': self.transfer_options, + } + ) + + if self.wait: + transfer_hook.wait_for_transfer_job(job) diff --git a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py index b0cf2ae515572..c46a9460e701c 100644 --- a/airflow/contrib/operators/s3_to_gcs_transfer_operator.py +++ b/airflow/contrib/operators/s3_to_gcs_transfer_operator.py @@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator): :param gcs_bucket: The destination Google Cloud Storage bucket where you want to store the files. (templated) :type gcs_bucket: str - :param project_id: The ID of the Google Cloud Platform Console project that + :param project_id: Optional ID of the Google Cloud Platform Console project that owns the job :type project_id: str :param aws_conn_id: The source S3 connection @@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator): https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs. If not set, run transfer job once as soon as the operator runs :type schedule: dict - :param object_conditions: Transfer service object conditions; see + :param object_conditions: Optional transfer service object conditions; see https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec :type object_conditions: dict - :param transfer_options: Transfer service transfer options; see + :param transfer_options: Optional transfer service transfer options; see https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec :type transfer_options: dict :param wait: Wait for transfer to finish @@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator): def __init__(self, s3_bucket, gcs_bucket, - project_id, + project_id=None, aws_conn_id='aws_default', gcp_conn_id='google_cloud_default', delegate_to=None, diff --git a/docs/code.rst b/docs/code.rst index e890adffec8a4..3e1322a341f82 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -181,6 +181,7 @@ Operators .. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator .. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator +.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageTransferOperator .. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator diff --git a/docs/integration.rst b/docs/integration.rst index f35c8e87ea605..3c72a2821a426 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -1184,6 +1184,7 @@ Storage Operators - :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object. - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery. - :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested. +- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from a bucket to another. - :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format. .. _FileToGoogleCloudStorageOperator: @@ -1242,6 +1243,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator +.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator: + +GoogleCloudStorageToGoogleCloudStorageTransferOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageTransferOperator + .. _MySqlToGoogleCloudStorageOperator: MySqlToGoogleCloudStorageOperator diff --git a/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py new file mode 100644 index 0000000000000..8c0cd4ebfcaa5 --- /dev/null +++ b/tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \ + GoogleCloudStorageToGoogleCloudStorageTransferOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +TASK_ID = 'test-gcs-gcs-transfer-operator' +SOURCE_BUCKET = 'test-source-bucket' +DESTINATION_BUCKET = 'test-destination-bucket' +PROJECT_ID = 'test-project' +DESCRIPTION = 'test-description' +SCHEDULE = { + 'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018}, + 'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018}, +} + + +class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase): + def test_constructor(self): + """Test GoogleCloudStorageToGoogleCloudStorageTransferOperator instance is properly initialized.""" + + operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator( + task_id=TASK_ID, + source_bucket=SOURCE_BUCKET, + destination_bucket=DESTINATION_BUCKET, + project_id=PROJECT_ID, + description=DESCRIPTION, + schedule=SCHEDULE, + ) + + self.assertEqual(operator.task_id, TASK_ID) + self.assertEqual(operator.source_bucket, SOURCE_BUCKET) + self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET) + self.assertEqual(operator.project_id, PROJECT_ID) + self.assertEqual(operator.description, DESCRIPTION) + self.assertEqual(operator.schedule, SCHEDULE) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook') + def test_execute(self, mock_transfer_hook): + """Test the execute function when the run is successful.""" + + operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator( + task_id=TASK_ID, + source_bucket=SOURCE_BUCKET, + destination_bucket=DESTINATION_BUCKET, + project_id=PROJECT_ID, + description=DESCRIPTION, + schedule=SCHEDULE, + ) + + operator.execute(None) + + mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with( + project_id=PROJECT_ID, + description=DESCRIPTION, + schedule=SCHEDULE, + transfer_spec={ + 'gcsDataSource': { + 'bucketName': SOURCE_BUCKET, + }, + 'gcsDataSink': { + 'bucketName': DESTINATION_BUCKET, + }, + 'objectConditions': {}, + 'transferOptions': {} + } + ) + + mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with( + mock_transfer_hook.return_value.create_transfer_job.return_value + ) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook') + def test_execute_skip_wait(self, mock_transfer_hook): + """Test the execute function when the run is successful.""" + + operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator( + task_id=TASK_ID, + source_bucket=SOURCE_BUCKET, + destination_bucket=DESTINATION_BUCKET, + project_id=PROJECT_ID, + description=DESCRIPTION, + wait=False, + ) + + operator.execute(None) + + mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with( + project_id=PROJECT_ID, + description=DESCRIPTION, + schedule=None, + transfer_spec={ + 'gcsDataSource': { + 'bucketName': SOURCE_BUCKET, + }, + 'gcsDataSink': { + 'bucketName': DESTINATION_BUCKET, + }, + 'objectConditions': {}, + 'transferOptions': {} + } + ) + + assert not mock_transfer_hook.return_value.wait_for_transfer_job.called diff --git a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py index 2bf51c0707577..0825364884a78 100644 --- a/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py +++ b/tests/contrib/operators/test_s3_to_gcs_transfer_operator.py @@ -59,6 +59,7 @@ def test_constructor(self): gcs_bucket=GCS_BUCKET, project_id=PROJECT_ID, description=DESCRIPTION, + schedule=SCHEDULE, ) self.assertEqual(operator.task_id, TASK_ID) @@ -66,6 +67,7 @@ def test_constructor(self): self.assertEqual(operator.gcs_bucket, GCS_BUCKET) self.assertEqual(operator.project_id, PROJECT_ID) self.assertEqual(operator.description, DESCRIPTION) + self.assertEqual(operator.schedule, SCHEDULE) @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook') @mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook') From d096240866eb1a9ba8ff9fbcfcdf9fff32ecc16c Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 6 Jan 2019 19:35:08 +0000 Subject: [PATCH 2/2] Update integration.rst --- docs/integration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integration.rst b/docs/integration.rst index 3c72a2821a426..74f5552ae557e 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -1184,7 +1184,7 @@ Storage Operators - :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object. - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery. - :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested. -- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from a bucket to another. +- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from a bucket to another using Google Transfer service. - :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format. .. _FileToGoogleCloudStorageOperator: