Skip to content

Commit

Permalink
Add Google Cloud Storage (GCS) backend (#8868)
Browse files Browse the repository at this point in the history
* Add Google Cloud Storage (GCS) backend

* Add extra google-cloud-storage requirements

* Add gcs backend module

* Add gcs backend to userguide configuration

* Add gcs to backends in README

* Add gcs app Namespace

* Add configuration documentation

* isort

* Cosmetic: fix documentation

* Add tests coverage for .client() method

* Add tests coverage for missing storage import

* Add tests coverage for parse_url()

* Documentation: remove incorrect configuration param.

* Remove unused options
  • Loading branch information
haimjether committed Feb 28, 2024
1 parent 582e169 commit 06e91d9
Show file tree
Hide file tree
Showing 13 changed files with 430 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ Transports and Backends
:``celery[s3]``:
for using S3 Storage as a result backend.

:``celery[gcs]``:
for using Google Cloud Storage as a result backend.

:``celery[couchbase]``:
for using Couchbase as a result backend.

Expand Down
1 change: 1 addition & 0 deletions celery/app/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'azureblockblob': 'celery.backends.azureblockblob:AzureBlockBlobBackend',
'arangodb': 'celery.backends.arangodb:ArangoDbBackend',
's3': 'celery.backends.s3:S3Backend',
'gs': 'celery.backends.gcs:GCSBackend',
}


Expand Down
6 changes: 6 additions & 0 deletions celery/app/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ def __repr__(self):
connection_timeout=Option(20, type='int'),
read_timeout=Option(120, type='int'),
),
gcs=Namespace(
bucket=Option(type='string'),
project=Option(type='string'),
base_path=Option('', type='string'),
ttl=Option(0, type='float'),
),
control=Namespace(
queue_ttl=Option(300.0, type='float'),
queue_expires=Option(10.0, type='float'),
Expand Down
141 changes: 141 additions & 0 deletions celery/backends/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Google Cloud Storage result store backend for Celery."""
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from os import getpid
from threading import RLock

from kombu.utils.encoding import bytes_to_str
from kombu.utils.functional import dictfilter
from kombu.utils.url import url_to_parts

from celery.exceptions import ImproperlyConfigured

from .base import KeyValueStoreBackend

try:
import requests
from google.cloud import storage
from google.cloud.storage import Client
from google.cloud.storage.retry import DEFAULT_RETRY
except ImportError:
storage = None

__all__ = ('GCSBackend',)


class GCSBackend(KeyValueStoreBackend):
"""Google Cloud Storage task result backend."""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._lock = RLock()
self._pid = getpid()
self._retry_policy = DEFAULT_RETRY
self._client = None

if not storage:
raise ImproperlyConfigured(
'You must install google-cloud-storage to use gcs backend'
)
conf = self.app.conf
if self.url:
url_params = self._params_from_url()
conf.update(**dictfilter(url_params))

self.bucket_name = conf.get('gcs_bucket')
if not self.bucket_name:
raise ImproperlyConfigured(
'Missing bucket name: specify gcs_bucket to use gcs backend'
)
self.project = conf.get('gcs_project')
if not self.project:
raise ImproperlyConfigured(
'Missing project:specify gcs_project to use gcs backend'
)
self.base_path = conf.get('gcs_base_path', '').strip('/')
self._threadpool_maxsize = int(conf.get('gcs_threadpool_maxsize', 10))
self.ttl = float(conf.get('gcs_ttl') or 0)
if self.ttl < 0:
raise ImproperlyConfigured(
f'Invalid ttl: {self.ttl} must be greater than or equal to 0'
)
elif self.ttl:
if not self._is_bucket_lifecycle_rule_exists():
raise ImproperlyConfigured(
f'Missing lifecycle rule to use gcs backend with ttl on '
f'bucket: {self.bucket_name}'
)

def get(self, key):
key = bytes_to_str(key)
blob = self._get_blob(key)
try:
return blob.download_as_bytes(retry=self._retry_policy)
except storage.blob.NotFound:
return None

def set(self, key, value):
key = bytes_to_str(key)
blob = self._get_blob(key)
if self.ttl:
blob.custom_time = datetime.utcnow() + timedelta(seconds=self.ttl)
blob.upload_from_string(value, retry=self._retry_policy)

def delete(self, key):
key = bytes_to_str(key)
blob = self._get_blob(key)
if blob.exists():
blob.delete(retry=self._retry_policy)

def mget(self, keys):
with ThreadPoolExecutor() as pool:
return list(pool.map(self.get, keys))

@property
def client(self):
"""Returns a storage client."""

# make sure it's thread-safe, as creating a new client is expensive
with self._lock:
if self._client and self._pid == getpid():
return self._client
# make sure each process gets its own connection after a fork
self._client = Client(project=self.project)
self._pid = getpid()

# config the number of connections to the server
adapter = requests.adapters.HTTPAdapter(
pool_connections=self._threadpool_maxsize,
pool_maxsize=self._threadpool_maxsize,
max_retries=3,
)
client_http = self._client._http
client_http.mount("https://", adapter)
client_http._auth_request.session.mount("https://", adapter)

return self._client

@property
def bucket(self):
return self.client.bucket(self.bucket_name)

def _get_blob(self, key):
key_bucket_path = f'{self.base_path}/{key}' if self.base_path else key
return self.bucket.blob(key_bucket_path)

def _is_bucket_lifecycle_rule_exists(self):
bucket = self.bucket
bucket.reload()
for rule in bucket.lifecycle_rules:
if rule['action']['type'] == 'Delete':
return True
return False

def _params_from_url(self):
url_parts = url_to_parts(self.url)

return {
'gcs_bucket': url_parts.hostname,
'gcs_base_path': url_parts.path,
**url_parts.query,
}
1 change: 1 addition & 0 deletions docs/getting-started/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ Celery is…
- MongoDB, CouchDB, Couchbase, ArangoDB
- Amazon DynamoDB, Amazon S3
- Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
- Google Cloud Storage
- File system

- **Serialization**
Expand Down
4 changes: 4 additions & 0 deletions docs/includes/installation.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ Transports and Backends
You should probably not use this in your requirements, it's here
for informational purposes only.

:``celery[gcs]``:
for using the Google Cloud Storage as a result backend (*experimental*).



.. _celery-installing-from-source:

Expand Down
11 changes: 11 additions & 0 deletions docs/internals/reference/celery.backends.gcs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
==========================================
``celery.backends.gcs``
==========================================

.. contents::
:local:
.. currentmodule:: celery.backends.gcs

.. automodule:: celery.backends.gcs
:members:
:undoc-members:
1 change: 1 addition & 0 deletions docs/internals/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
celery.backends.filesystem
celery.backends.cosmosdbsql
celery.backends.s3
celery.backends.gcs
celery.app.trace
celery.app.annotations
celery.app.routes
Expand Down
99 changes: 97 additions & 2 deletions docs/userguide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,10 @@ Can be one of the following:
Use the `S3`_ to store the results
See :ref:`conf-s3-result-backend`.

* ``gcs``
Use the `GCS`_ to store the results
See :ref:`conf-gcs-result-backend`.

.. warning:
While the AMQP result backend is very efficient, you must make sure
Expand All @@ -750,6 +754,7 @@ Can be one of the following:
.. _`Consul`: https://consul.io/
.. _`AzureBlockBlob`: https://azure.microsoft.com/en-us/services/storage/blobs/
.. _`S3`: https://aws.amazon.com/s3/
.. _`GCS`: https://cloud.google.com/storage/


.. setting:: result_backend_always_retry
Expand Down Expand Up @@ -1798,6 +1803,96 @@ Default: 120.

Timeout in seconds for reading of an azure block blob.

.. _conf-gcs-result-backend:

GCS backend settings
--------------------

.. note::

This gcs backend driver requires :pypi:`google-cloud-storage`.

To install, use :command:`gcs`:

.. code-block:: console
$ pip install celery[gcs]
See :ref:`bundles` for information on combining multiple extension
requirements.

GCS could be configured via the URL provided in :setting:`result_backend`, for example::

result_backend = 'gcs://mybucket/some-prefix?project=myproject&ttl=600'

This backend requires the following configuration directives to be set:

.. setting:: gcs_bucket

``gcs_bucket``
~~~~~~~~~~~~~~

Default: None.

The gcs bucket name. For example::

gcs_bucket = 'bucket_name'

.. setting:: gcs_project

``gcs_project``
~~~~~~~~~~~~~~~

Default: None.

The gcs project name. For example::

gcs_project = 'test-project'

.. setting:: gcs_base_path

``gcs_base_path``
~~~~~~~~~~~~~~~~~

Default: None.

A base path in the gcs bucket to use to store all result keys. For example::

gcs_base_path = '/prefix'

``gcs_ttl``
~~~~~~~~~~~

Default: 0.

The time to live in seconds for the results blobs.
Requires a GCS bucket with "Delete" Object Lifecycle Management action enabled.
Use it to automatically delete results from Cloud Storage Buckets.

For example to auto remove results after 24 hours::

gcs_ttl = 86400

``gcs_threadpool_maxsize``
~~~~~~~~~~~~~~~~~~~~~~~~~~

Default: 10.

Threadpool size for GCS operations. Same value defines the connection pool size.
Allows to control the number of concurrent operations. For example::

gcs_threadpool_maxsize = 20

Example configuration
~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python
gcs_bucket = 'mybucket'
gcs_project = 'myproject'
gcs_base_path = '/celery_result_backend'
gcs_ttl = 86400
.. _conf-elasticsearch-result-backend:

Elasticsearch backend settings
Expand Down Expand Up @@ -2821,7 +2916,7 @@ to the AMQP broker.
If this is set to :const:`None`, we'll retry forever.

``broker_channel_error_retry``
~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 5.3

Expand Down Expand Up @@ -2984,7 +3079,7 @@ prefetch count to its maximum allowable value following a connection loss to the
broker. By default, this setting is enabled.

Upon a connection loss, Celery will attempt to reconnect to the broker automatically,
provided the :setting:`broker_connection_retry_on_startup` or :setting:`broker_connection_retry`
provided the :setting:`broker_connection_retry_on_startup` or :setting:`broker_connection_retry`
is not set to False. During the period of lost connection, the message broker does not keep track
of the number of tasks already fetched. Therefore, to manage the task load effectively and prevent
overloading, Celery reduces the prefetch count based on the number of tasks that are
Expand Down
1 change: 1 addition & 0 deletions requirements/extras/gcs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
google-cloud-storage>=2.10.0
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pre-commit>=3.6.1; python_version >= '3.9'
-r extras/yaml.txt
-r extras/msgpack.txt
-r extras/mongodb.txt
-r extras/gcs.txt
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
'elasticsearch',
'eventlet',
'gevent',
'gcs',
'librabbitmq',
'memcache',
'mongodb',
Expand Down

0 comments on commit 06e91d9

Please sign in to comment.