[AIRFLOW-3346] Add hook and operator for GCP transfer service (apache…
jmcarp authored and Alice Berard committed Jan 3, 2019
1 parent 3e857a1 commit 4855ec6
Showing 6 changed files with 456 additions and 0 deletions.
107 changes: 107 additions & 0 deletions airflow/contrib/hooks/gcp_transfer_hook.py
@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import time
import datetime
from googleapiclient.discovery import build

from airflow.exceptions import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Time to sleep between active checks of the operation results
TIME_TO_SLEEP_IN_SECONDS = 1


# noinspection PyAbstractClass
class GCPTransferServiceHook(GoogleCloudBaseHook):
    """
    Hook for GCP Storage Transfer Service.
    """
    _conn = None

    def __init__(self,
                 api_version='v1',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None):
        super(GCPTransferServiceHook, self).__init__(gcp_conn_id, delegate_to)
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves the connection to the Google Storage Transfer service.

        :return: Google Storage Transfer service object
        :rtype: googleapiclient.discovery.Resource
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build('storagetransfer', self.api_version,
                               http=http_authorized, cache_discovery=False)
        return self._conn

    def create_transfer_job(self, project_id, transfer_spec, **kwargs):
        """
        Creates a transfer job scheduled to run once, starting today, and
        waits for it to complete.
        """
        conn = self.get_conn()
        now = datetime.datetime.utcnow()
        transfer_job = {
            'status': 'ENABLED',
            'projectId': project_id,
            'transferSpec': transfer_spec,
            'schedule': {
                'scheduleStartDate': {
                    'day': now.day,
                    'month': now.month,
                    'year': now.year,
                },
                'scheduleEndDate': {
                    'day': now.day,
                    'month': now.month,
                    'year': now.year,
                }
            }
        }
        transfer_job.update(kwargs)
        result = conn.transferJobs().create(body=transfer_job).execute()
        self.wait_for_transfer_job(result, conn=conn)

    def wait_for_transfer_job(self, job, conn=None):
        """
        Polls the transfer operations belonging to the job until they all
        report SUCCESS, raising if any operation fails or is aborted.
        """
        conn = conn or self.get_conn()
        while True:
            result = conn.transferOperations().list(
                name='transferOperations',
                filter=json.dumps({
                    'project_id': job['projectId'],
                    'job_names': [job['name']],
                }),
            ).execute()
            if self._check_operations_result(result):
                return True
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)

    def _check_operations_result(self, result):
        operations = result.get('operations', [])
        if len(operations) == 0:
            return False
        for operation in operations:
            if operation['metadata']['status'] in {'FAILED', 'ABORTED'}:
                raise AirflowException('Operation {} {}'.format(
                    operation['name'], operation['metadata']['status']))
            if operation['metadata']['status'] != 'SUCCESS':
                return False
        return True
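
For orientation, a minimal sketch of calling the new hook directly. The connection ID, project, bucket names, and AWS credentials below are placeholders assumed for the example, not values taken from this change:

from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook

hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')
# Creates a one-off transfer job and blocks until its operations succeed.
hook.create_transfer_job(
    project_id='my-gcp-project',
    transfer_spec={
        'awsS3DataSource': {
            'bucketName': 'my-s3-bucket',
            'awsAccessKey': {
                'accessKeyId': 'PLACEHOLDER_KEY_ID',        # assumed credentials
                'secretAccessKey': 'PLACEHOLDER_SECRET',
            },
        },
        'gcsDataSink': {'bucketName': 'my-gcs-bucket'},
    },
    description='example one-off sync',  # extra kwargs are merged into the job body
)
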
124 changes: 124 additions & 0 deletions airflow/contrib/operators/s3_to_gcs_transfer_operator.py
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.utils.decorators import apply_defaults


class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
    """
    Synchronizes an S3 bucket with a Google Cloud Storage bucket using the
    GCP Storage Transfer Service.

    :param s3_bucket: The S3 bucket where to find the objects. (templated)
    :type s3_bucket: str
    :param gcs_bucket: The destination Google Cloud Storage bucket
        where you want to store the files. (templated)
    :type gcs_bucket: str
    :param project_id: The ID of the Google Cloud Platform Console project that
        owns the job
    :type project_id: str
    :param aws_conn_id: The source S3 connection
    :type aws_conn_id: str
    :param gcp_conn_id: The destination connection ID to use
        when connecting to Google Cloud Storage.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param object_conditions: Transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type object_conditions: dict
    :param transfer_options: Transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type transfer_options: dict
    :param job_kwargs: Additional transfer job options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs
    :type job_kwargs: dict

    **Example**:

    .. code-block:: python

       s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator(
            task_id='s3_to_gcs_transfer_example',
            s3_bucket='my-s3-bucket',
            project_id='my-gcp-project',
            gcs_bucket='my-gcs-bucket',
            dag=my_dag)
    """

    template_fields = ('s3_bucket', 'gcs_bucket')
    ui_color = '#e09411'

    @apply_defaults
    def __init__(self,
                 s3_bucket,
                 gcs_bucket,
                 project_id,
                 aws_conn_id='aws_default',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 object_conditions=None,
                 transfer_options=None,
                 job_kwargs=None,
                 *args,
                 **kwargs):

        super(S3ToGoogleCloudStorageTransferOperator, self).__init__(
            *args,
            **kwargs)
        self.s3_bucket = s3_bucket
        self.gcs_bucket = gcs_bucket
        self.project_id = project_id
        self.aws_conn_id = aws_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.object_conditions = object_conditions or {}
        self.transfer_options = transfer_options or {}
        self.job_kwargs = job_kwargs or {}

    def execute(self, context):
        transfer_hook = GCPTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to)

        s3_creds = S3Hook(aws_conn_id=self.aws_conn_id).get_credentials()

        transfer_hook.create_transfer_job(
            project_id=self.project_id,
            transfer_spec={
                'awsS3DataSource': {
                    'bucketName': self.s3_bucket,
                    'awsAccessKey': {
                        'accessKeyId': s3_creds.access_key,
                        'secretAccessKey': s3_creds.secret_key,
                    }
                },
                'gcsDataSink': {
                    'bucketName': self.gcs_bucket,
                },
                'objectConditions': self.object_conditions,
                'transferOptions': self.transfer_options,
            },
            **self.job_kwargs
        )
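
A slightly fuller DAG-level sketch than the docstring example, with hypothetical bucket and project names, showing how object_conditions and transfer_options flow through to the TransferSpec:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.s3_to_gcs_transfer_operator import (
    S3ToGoogleCloudStorageTransferOperator
)

dag = DAG(
    'example_s3_to_gcs_transfer',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

# Only copy objects under a prefix that changed in the last day, and
# delete the source objects once they have been transferred.
transfer = S3ToGoogleCloudStorageTransferOperator(
    task_id='s3_to_gcs_transfer',
    s3_bucket='my-s3-bucket',
    gcs_bucket='my-gcs-bucket',
    project_id='my-gcp-project',
    object_conditions={
        'includePrefixes': ['data/'],
        'maxTimeElapsedSinceLastModification': '86400s',
    },
    transfer_options={'deleteObjectsFromSourceAfterTransfer': True},
    dag=dag,
)
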
2 changes: 2 additions & 0 deletions docs/code.rst
@@ -207,6 +207,7 @@ Operators
.. autoclass:: airflow.contrib.operators.s3_delete_objects_operator.S3DeleteObjectsOperator
.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.s3_to_gcs_transfer_operator.S3ToGoogleCloudStorageTransferOperator
.. autoclass:: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
.. autoclass:: airflow.contrib.operators.sagemaker_endpoint_operator.SageMakerEndpointOperator
.. autoclass:: airflow.contrib.operators.sagemaker_endpoint_config_operator.SageMakerEndpointConfigOperator
@@ -434,6 +435,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook
.. autoclass:: airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook
.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
.. autoclass:: airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook
.. autoclass:: airflow.contrib.hooks.imap_hook.ImapHook
.. autoclass:: airflow.contrib.hooks.jenkins_hook.JenkinsHook
.. autoclass:: airflow.contrib.hooks.jira_hook.JiraHook
14 changes: 14 additions & 0 deletions docs/integration.rst
@@ -238,6 +238,7 @@ AWS S3
- :ref:`S3FileTransformOperator` : Copies data from a source S3 location to a temporary location on the local filesystem.
- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
- :ref:`S3ToGoogleCloudStorageOperator` : Syncs an S3 location with a Google Cloud Storage bucket.
- :ref:`S3ToGoogleCloudStorageTransferOperator` : Syncs an S3 bucket with a Google Cloud Storage bucket using the GCP Storage Transfer Service.
- :ref:`S3ToHiveTransfer` : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.

.. _S3Hook:
@@ -268,6 +269,13 @@ S3ToGoogleCloudStorageOperator

.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator

.. _S3ToGoogleCloudStorageTransferOperator:

S3ToGoogleCloudStorageTransferOperator
""""""""""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.s3_to_gcs_transfer_operator.S3ToGoogleCloudStorageTransferOperator

.. _S3ToHiveTransfer:

S3ToHiveTransfer
@@ -1043,6 +1051,12 @@ GoogleCloudStorageHook
.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
:members:

GCPTransferServiceHook
""""""""""""""""""""""

.. autoclass:: airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook
:members:

Google Kubernetes Engine
''''''''''''''''''''''''

111 changes: 111 additions & 0 deletions tests/contrib/hooks/test_gcp_transfer_hook.py
@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import json
import datetime
import unittest

from airflow.exceptions import AirflowException
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.contrib.hooks.gcp_transfer_hook import TIME_TO_SLEEP_IN_SECONDS

try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None


class TestGCPTransferServiceHook(unittest.TestCase):
    def setUp(self):
        with mock.patch.object(GCPTransferServiceHook, '__init__', return_value=None):
            self.conn = mock.Mock()
            self.transfer_hook = GCPTransferServiceHook()
            self.transfer_hook._conn = self.conn

    @mock.patch('airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook.wait_for_transfer_job')
    def test_create_transfer_job(self, mock_wait):
        mock_create = self.conn.transferJobs.return_value.create
        mock_execute = mock_create.return_value.execute
        mock_execute.return_value = {
            'projectId': 'test-project',
            'name': 'transferJobs/test-job',
        }
        now = datetime.datetime.utcnow()
        transfer_spec = {
            'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
            'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
        }
        self.transfer_hook.create_transfer_job('test-project', transfer_spec)
        mock_create.assert_called_once_with(body={
            'status': 'ENABLED',
            'projectId': 'test-project',
            'transferSpec': transfer_spec,
            'schedule': {
                'scheduleStartDate': {
                    'day': now.day,
                    'month': now.month,
                    'year': now.year,
                },
                'scheduleEndDate': {
                    'day': now.day,
                    'month': now.month,
                    'year': now.year,
                }
            }
        })
        mock_wait.assert_called_once_with(mock_execute.return_value, conn=self.conn)

    @mock.patch('time.sleep')
    def test_wait_for_transfer_job(self, mock_sleep):
        mock_list = self.conn.transferOperations.return_value.list
        mock_execute = mock_list.return_value.execute
        mock_execute.side_effect = [
            {'operations': [{'metadata': {'status': 'IN_PROGRESS'}}]},
            {'operations': [{'metadata': {'status': 'SUCCESS'}}]},
        ]
        self.transfer_hook.wait_for_transfer_job({
            'projectId': 'test-project',
            'name': 'transferJobs/test-job',
        })
        self.assertTrue(mock_list.called)
        list_args, list_kwargs = mock_list.call_args_list[0]
        self.assertEqual(list_kwargs.get('name'), 'transferOperations')
        self.assertEqual(
            json.loads(list_kwargs.get('filter')),
            {
                'project_id': 'test-project',
                'job_names': ['transferJobs/test-job']
            },
        )
        mock_sleep.assert_called_once_with(TIME_TO_SLEEP_IN_SECONDS)

    def test_wait_for_transfer_job_failed(self):
        mock_list = self.conn.transferOperations.return_value.list
        mock_execute = mock_list.return_value.execute
        mock_execute.side_effect = [
            {'operations': [{'name': 'test-job', 'metadata': {'status': 'FAILED'}}]},
        ]
        with self.assertRaises(AirflowException):
            self.transfer_hook.wait_for_transfer_job({
                'projectId': 'test-project',
                'name': 'transferJobs/test-job',
            })
