[AIRFLOW-1618] Add feature to create GCS bucket
- Added `create_bucket` method to `gcs_hook` and created the
corresponding operator `GoogleCloudStorageCreateBucketOperator`
- Added tests
- Added documentation

Closes #3044 from kaxil/AIRFLOW-1618
kaxil authored and Fokko Driesprong committed Feb 19, 2018
1 parent faaf0b8 commit 3fe06e9
Showing 5 changed files with 261 additions and 0 deletions.
85 changes: 85 additions & 0 deletions airflow/contrib/hooks/gcs_hook.py
@@ -354,6 +354,91 @@ def get_md5hash(self, bucket, object):
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')

    def create_bucket(self,
                      bucket_name,
                      storage_class='MULTI_REGIONAL',
                      location='US',
                      project_id=None,
                      labels=None
                      ):
        """
        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
        you can't create a bucket with a name that is already in use.

        .. seealso::
            For more information, see Bucket Naming Guidelines:
            https://cloud.google.com/storage/docs/bucketnaming.html#requirements

        :param bucket_name: The name of the bucket.
        :type bucket_name: string
        :param storage_class: This defines how objects in the bucket are stored
            and determines the SLA and the cost of storage. Values include

            - ``MULTI_REGIONAL``
            - ``REGIONAL``
            - ``STANDARD``
            - ``NEARLINE``
            - ``COLDLINE``

            If this value is not specified when the bucket is
            created, it will default to STANDARD.
        :type storage_class: string
        :param location: The location of the bucket.
            Object data for objects in the bucket resides in physical storage
            within this region. Defaults to US.

            .. seealso::
                https://developers.google.com/storage/docs/bucket-locations

        :type location: string
        :param project_id: The ID of the GCP Project.
        :type project_id: string
        :param labels: User-provided labels, in key/value pairs.
        :type labels: dict
        :return: If successful, it returns the ``id`` of the bucket.
        """

        project_id = project_id if project_id is not None else self.project_id
        storage_classes = [
            'MULTI_REGIONAL',
            'REGIONAL',
            'NEARLINE',
            'COLDLINE',
            'STANDARD',  # alias for MULTI_REGIONAL/REGIONAL, based on location
        ]

        self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s',
                      bucket_name, location, storage_class)
        assert storage_class in storage_classes, \
            'Invalid value ({}) passed to storage_class. Value should be ' \
            'one of {}'.format(storage_class, storage_classes)

        service = self.get_conn()
        bucket_resource = {
            'name': bucket_name,
            'location': location,
            'storageClass': storage_class
        }

        self.log.info('The Default Project ID is %s', self.project_id)

        if labels is not None:
            bucket_resource['labels'] = labels

        try:
            response = service.buckets().insert(
                project=project_id,
                body=bucket_resource
            ).execute()

            self.log.info('Bucket: %s created successfully.', bucket_name)

            return response['id']

        except errors.HttpError as ex:
            raise AirflowException(
                'Bucket creation failed. Error was: {}'.format(ex.content)
            )


def _parse_gcs_url(gsurl):
"""
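For reference, a minimal sketch of calling the new hook method directly; the connection ID shown is the hook's default, and the bucket name and labels are illustrative placeholders that assume a working GCP connection::

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(
        google_cloud_storage_conn_id='google_cloud_storage_default'
    )

    # Returns the bucket ``id`` on success; raises AirflowException on failure
    bucket_id = hook.create_bucket(
        bucket_name='my-example-bucket',  # placeholder name
        storage_class='MULTI_REGIONAL',
        location='EU',
        labels={'env': 'dev'}
    )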
117 changes: 117 additions & 0 deletions airflow/contrib/operators/gcs_operator.py
@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version


class GoogleCloudStorageCreateBucketOperator(BaseOperator):
"""
Creates a new bucket. Google Cloud Storage uses a flat namespace,
so you can't create a bucket with a name that is already in use.
.. seealso::
For more information, see Bucket Naming Guidelines:
https://cloud.google.com/storage/docs/bucketnaming.html#requirements
:param bucket_name: The name of the bucket.
:type bucket_name: string
:param storage_class: This defines how objects in the bucket are stored
and determines the SLA and the cost of storage. Values include
- ``MULTI_REGIONAL``
- ``REGIONAL``
- ``STANDARD``
- ``NEARLINE``
- ``COLDLINE``.
If this value is not specified when the bucket is
created, it will default to STANDARD.
:type storage_class: string
:param location: The location of the bucket.
Object data for objects in the bucket resides in physical storage
within this region. Defaults to US.
.. seealso::
https://developers.google.com/storage/docs/bucket-locations
:type location: string
:param project_id: The ID of the GCP Project.
:type project_id: string
:param labels: User-provided labels, in key/value pairs.
:type labels: dict
:param google_cloud_storage_conn_id: The connection ID to use when
connecting to Google cloud storage.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must
have domain-wide delegation enabled.
:type delegate_to: string
**Example**:
The following Operator would create a new bucket ``test-bucket``
with ``MULTI_REGIONAL`` storage class in ``EU`` region ::
CreateBucket = GoogleCloudStorageCreateBucketOperator(
task_id='CreateNewBucket',
bucket_name='test-bucket',
storage_class='MULTI_REGIONAL',
location='EU',
labels={'env': 'dev', 'team': 'airflow'},
google_cloud_storage_conn_id='airflow-service-account'
)
"""

template_fields = ('bucket_name', 'storage_class',
'location', 'project_id')
ui_color = '#f0eee4'

@apply_defaults
def __init__(self,
bucket_name,
storage_class='MULTI_REGIONAL',
location='US',
project_id=None,
labels=None,
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
super(GoogleCloudStorageCreateBucketOperator, self).__init__(*args, **kwargs)
self.bucket_name = bucket_name
self.storage_class = storage_class
self.location = location
self.project_id = project_id
self.labels = labels

self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.delegate_to = delegate_to

def execute(self, context):
if self.labels is not None:
self.labels.update(
{'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
)

hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)

hook.create_bucket(bucket_name=self.bucket_name,
storage_class=self.storage_class,
location=self.location,
project_id=self.project_id,
labels=self.labels)
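For context, a minimal sketch of wiring the new operator into a DAG; the DAG id, start date, and project id below are illustrative placeholders, not part of this change::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_operator import GoogleCloudStorageCreateBucketOperator

    dag = DAG(dag_id='example_gcs_create_bucket',  # placeholder DAG id
              start_date=datetime(2018, 2, 1),
              schedule_interval=None)

    create_bucket = GoogleCloudStorageCreateBucketOperator(
        task_id='create_bucket',
        bucket_name='test-bucket',
        storage_class='MULTI_REGIONAL',
        location='EU',
        project_id='my-gcp-project',  # placeholder project
        dag=dag
    )

Because ``bucket_name``, ``storage_class``, ``location``, and ``project_id`` are template fields, any of them could also be passed as a Jinja template string.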
1 change: 1 addition & 0 deletions docs/code.rst
@@ -126,6 +126,7 @@ Community-contributed Operators
.. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
.. autoclass:: airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator
8 changes: 8 additions & 0 deletions docs/integration.rst
@@ -687,6 +687,7 @@ Storage Operators

- :ref:`FileToGoogleCloudStorageOperator` : Uploads a file to Google Cloud Storage.
- :ref:`GoogleCloudStorageCopyOperator` : Copies objects (optionally from a directory) filtered by 'delimiter' (file extension, e.g. ``.json``) from one bucket to another, in a different directory if required.
- :ref:`GoogleCloudStorageCreateBucketOperator` : Creates a new Cloud Storage bucket.
- :ref:`GoogleCloudStorageListOperator` : Lists all objects from the bucket with the given string prefix and delimiter in name.
- :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google Cloud Storage into BigQuery.
@@ -706,6 +707,13 @@ GoogleCloudStorageCopyOperator

.. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator

.. _GoogleCloudStorageCreateBucketOperator:

GoogleCloudStorageCreateBucketOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator

.. _GoogleCloudStorageDownloadOperator:

GoogleCloudStorageDownloadOperator
50 changes: 50 additions & 0 deletions tests/contrib/operators/test_gcs_operator.py
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from airflow.contrib.operators.gcs_operator import GoogleCloudStorageCreateBucketOperator

try:  # Python 3
    from unittest import mock
except ImportError:
    try:  # Python 2
        import mock
    except ImportError:
        mock = None

TASK_ID = 'test-gcs-operator'
TEST_BUCKET = 'test-bucket'
TEST_PROJECT = 'test-project'


class GoogleCloudStorageCreateBucketTest(unittest.TestCase):

    @mock.patch('airflow.contrib.operators.gcs_operator.GoogleCloudStorageHook')
    def test_execute(self, mock_hook):
        operator = GoogleCloudStorageCreateBucketOperator(
            task_id=TASK_ID,
            bucket_name=TEST_BUCKET,
            storage_class='MULTI_REGIONAL',
            location='EU',
            labels={'env': 'prod'},
            project_id=TEST_PROJECT
        )

        operator.execute(None)
        mock_hook.return_value.create_bucket.assert_called_once_with(
            bucket_name=TEST_BUCKET, storage_class='MULTI_REGIONAL',
            location='EU', labels={'airflow-version': 'v1-10-0dev0-incubating',
                                   'env': 'prod'}, project_id=TEST_PROJECT
        )
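The hard-coded ``airflow-version`` label in the assertion mirrors what the operator's ``execute`` method derives from ``airflow.version.version``; the exact value depends on the version string of the checkout the test runs against::

    from airflow.version import version

    # execute() sanitizes the version so it is a valid GCS label value,
    # e.g. '1.10.0dev0+incubating' -> 'v1-10-0dev0-incubating'
    label_value = 'v' + version.replace('.', '-').replace('+', '-')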
