Skip to content
Permalink
Browse files

AWS DynamoDB result backend (#3736)

* Add result backend for AWS DynamoDB

* Dependencies for DynamoDB result backend

* Add DynamoDB backend in aliases

* Test cases for DynamoDB result backend

* Documentation for DynamoDB backend

* Configurable endpoint URL for DynamoDB local instance

* Enable integration tests for DynamoDB result backend

- Run before_install script only for integration environments

* Fix invalid type error for primary key in Python3

* Add Python 3.6 in Travis CI build matrix

- Instruct Travis CI to include Python 3.6 interpreter in jobs
- Optimize Travis CI build matrix

* Optimize Travis CI build matrix

* Fix endless loop in logger_isa (Python 3.6)

* Add test cases for AWS client construction

- Add/improve log messages during table initialization
- Enable skipped unit tests due to missing dependency boto3

* Use explicit hash seed value for apicheck tox environment

- Related Sphinx issue: sphinx-doc/sphinx#2324
  • Loading branch information...
georgepsarakis authored and thedrow committed Jan 23, 2017
1 parent 7d1588e commit 9c950b47eca2b4e93fd2fe52cf80f158e6cf97ad
@@ -2,31 +2,42 @@ language: python
sudo: required
cache: false
python:
- '3.5'
- '2.7'
- '3.4'
- '3.5'
- '3.6'
os:
- linux
env:
global:
PYTHONUNBUFFERED=yes
- PYTHONUNBUFFERED=yes
matrix:
- TOXENV=2.7-unit
- TOXENV=2.7-integration-rabbitmq
- TOXENV=2.7-integration-redis
- TOXENV=3.4-unit
- TOXENV=3.4-integration-rabbitmq
- TOXENV=3.4-integration-redis
- TOXENV=3.5-unit
- TOXENV=3.5-integration-rabbitmq
- TOXENV=3.5-integration-redis
- TOXENV=pypy-unit PYPY_VERSION="5.3"
- TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3"
- TOXENV=pypy-integration-redis PYPY_VERSION="5.3"
- TOXENV=flake8
- TOXENV=flakeplus
- TOXENV=apicheck
- TOXENV=configcheck
- TOXENV=pydocstyle
- MATRIX_TOXENV=unit
- MATRIX_TOXENV=integration-rabbitmq
- MATRIX_TOXENV=integration-redis
- MATRIX_TOXENV=integration-dynamodb
matrix:
include:
- python: '3.5'
env: TOXENV=pypy-unit PYPY_VERSION="5.3"
- python: '3.5'
env: TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3"
- python: '3.5'
env: TOXENV=pypy-integration-redis PYPY_VERSION="5.3"
- python: '3.5'
env: TOXENV=pypy-integration-dynamodb PYPY_VERSION="5.3"
- python: '3.5'
env: TOXENV=flake8
- python: '3.5'
env: TOXENV=flakeplus
- python: '3.5'
env: TOXENV=apicheck
- python: '3.5'
env: TOXENV=configcheck
- python: '3.5'
env: TOXENV=pydocstyle
before_install:
- if [[ -v MATRIX_TOXENV ]]; then export TOXENV=${TRAVIS_PYTHON_VERSION}-${MATRIX_TOXENV}; fi; env
- |
if [ "$TOXENV" = "pypy" ]; then
export PYENV_ROOT="$HOME/.pyenv"
@@ -39,6 +50,20 @@ before_install:
virtualenv --python="$PYENV_ROOT/versions/pypy-$PYPY_VERSION/bin/python" "$HOME/virtualenvs/pypy-$PYPY_VERSION"
source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
fi
- |
# Provision a local DynamoDB instance, but only for tox environments whose
# name ends in "dynamodb".
if [[ "$TOXENV" == *dynamodb ]]; then
# Java runtime for the DynamoDB Local jar, supervisor to run it as a service.
sudo apt-get install -y default-jre supervisor
mkdir /opt/dynamodb-local
# Download the DynamoDB Local tarball and unpack it in place.
cd /opt/dynamodb-local && curl -L http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz | tar zx
# Return to the previous working directory.
cd -
# Write the supervisor program definition: section header, command, and
# working directory (the command uses paths relative to that directory).
echo '[program:dynamodb-local]' | sudo tee /etc/supervisor/conf.d/dynamodb-local.conf
echo 'command=java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -inMemory' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf
echo 'directory=/opt/dynamodb-local' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf
# Restart supervisor so the new program definition is loaded.
sudo service supervisor stop
sudo service supervisor start
# Presumably gives the JVM time to start before probing it - TODO confirm
# that 10 seconds is always enough.
sleep 10
# Probe the local endpoint; assumes DynamoDB Local listens on port 8000
# (no explicit port is passed in the command above) - verify.
curl localhost:8000
fi
after_success:
- .tox/$TRAVIS_PYTHON_VERSION/bin/coverage xml
- .tox/$TRAVIS_PYTHON_VERSION/bin/codecov -e TOXENV
@@ -29,7 +29,8 @@
'riak': 'celery.backends.riak:RiakBackend',
'file': 'celery.backends.filesystem:FilesystemBackend',
'disabled': 'celery.backends.base:DisabledBackend',
'consul': 'celery.backends.consul:ConsulBackend'
'consul': 'celery.backends.consul:ConsulBackend',
'dynamodb': 'celery.backends.dynamodb:DynamoDBBackend',
}


@@ -0,0 +1,275 @@
# -*- coding: utf-8 -*-
"""AWS DynamoDB result store backend."""
from __future__ import absolute_import, unicode_literals
from collections import namedtuple
from time import time, sleep

from kombu.utils.url import _parse_url as parse_url
from celery.exceptions import ImproperlyConfigured
from celery.utils.log import get_logger
from celery.five import string
from .base import KeyValueStoreBackend
try:
import boto3
from botocore.exceptions import ClientError
except ImportError: # pragma: no cover
boto3 = ClientError = None # noqa

__all__ = ['DynamoDBBackend']


# Lightweight record describing one DynamoDB item attribute: the attribute
# name plus the data type code sent with it in requests (the backend below
# uses 'S', 'B' and 'N').
DynamoDBAttribute = namedtuple('DynamoDBAttribute', 'name data_type')

logger = get_logger(__name__)


class DynamoDBBackend(KeyValueStoreBackend):
    """AWS DynamoDB result backend.

    Every result is stored as one item of a single DynamoDB table. The
    item holds the result key, the value and the time it was written.
    The table is created the first time the client is built, unless it
    already exists.

    Arguments:
        url (str): Optional backend URL. Its credentials, region, port,
            table name and ``read``/``write`` query parameters override
            the class-level defaults; a region of ``localhost`` selects
            a local DynamoDB endpoint.
        table_name (str): Optional table name; a table name embedded
            in ``url`` takes precedence over it.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`boto3` is not available, or if only one of
            the access key id and the secret is given in ``url``.
    """

    #: Default DynamoDB table name.
    table_name = 'celery'

    #: Default read provisioned throughput, used when creating the table.
    read_capacity_units = 1

    #: Default write provisioned throughput, used when creating the table.
    write_capacity_units = 1

    #: AWS region passed to boto3; ``None`` unless set through the URL.
    aws_region = None

    #: Endpoint URL passed to boto3 (local DynamoDB); when ``None`` no
    #: explicit endpoint is passed.
    endpoint_url = None

    #: Port used for a ``localhost`` URL that does not specify one
    #: (8000 is the port DynamoDB Local listens on by default).
    _local_endpoint_port = 8000

    # Item attributes: result key, stored value and write timestamp.
    _key_field = DynamoDBAttribute(name='id', data_type='S')
    _value_field = DynamoDBAttribute(name='result', data_type='B')
    _timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N')
    _available_fields = None

    def __init__(self, url=None, table_name=None, *args, **kwargs):
        super(DynamoDBBackend, self).__init__(*args, **kwargs)

        self.url = url
        self.table_name = table_name or self.table_name

        if not boto3:
            raise ImproperlyConfigured(
                'You need to install the boto3 library to use the '
                'DynamoDB backend.')

        aws_credentials_given = False
        aws_access_key_id = None
        aws_secret_access_key = None

        if url is not None:
            _scheme, region, port, username, password, table, query = \
                parse_url(url)

            aws_access_key_id = username
            aws_secret_access_key = password

            access_key_given = aws_access_key_id is not None
            secret_key_given = aws_secret_access_key is not None

            # Credentials are only usable as a pair.
            if access_key_given != secret_key_given:
                raise ImproperlyConfigured(
                    'You need to specify both the Access Key ID '
                    'and Secret.')

            aws_credentials_given = access_key_given

            if region == 'localhost':
                # We are using the downloadable, local version of DynamoDB.
                # Without an explicit port the URL would otherwise end up
                # as 'http://localhost:None', so fall back to the default.
                if port is None:
                    port = self._local_endpoint_port
                self.endpoint_url = 'http://localhost:{}'.format(port)
                self.aws_region = 'us-east-1'
                logger.warning(
                    'Using local-only DynamoDB endpoint URL: %s',
                    self.endpoint_url
                )
            else:
                self.aws_region = region

            self.read_capacity_units = int(
                query.get('read', self.read_capacity_units)
            )
            self.write_capacity_units = int(
                query.get('write', self.write_capacity_units)
            )
            self.table_name = table or self.table_name

        self._available_fields = (
            self._key_field,
            self._value_field,
            self._timestamp_field
        )

        self._client = None
        # Explicit credentials are not stored on the instance, so the
        # client has to be built now, while they are still at hand.
        if aws_credentials_given:
            self._get_client(
                access_key_id=aws_access_key_id,
                secret_access_key=aws_secret_access_key
            )

    def _get_client(self, access_key_id=None, secret_access_key=None):
        """Return the boto3 DynamoDB client, building it on first use.

        Building the client also makes sure the table exists. The
        credential arguments are only used when the client is built;
        once a client is cached they are ignored.
        """
        if self._client is None:
            client_parameters = dict(
                region_name=self.aws_region
            )
            if access_key_id is not None:
                client_parameters.update(
                    aws_access_key_id=access_key_id,
                    aws_secret_access_key=secret_access_key
                )

            if self.endpoint_url is not None:
                client_parameters['endpoint_url'] = self.endpoint_url

            self._client = boto3.client(
                'dynamodb',
                **client_parameters
            )
            self._get_or_create_table()
        return self._client

    def _get_table_schema(self):
        """Get the boto3 structure describing the DynamoDB table schema."""
        return dict(
            AttributeDefinitions=[
                {
                    'AttributeName': self._key_field.name,
                    'AttributeType': self._key_field.data_type
                }
            ],
            TableName=self.table_name,
            KeySchema=[
                {
                    'AttributeName': self._key_field.name,
                    'KeyType': 'HASH'
                }
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': self.read_capacity_units,
                'WriteCapacityUnits': self.write_capacity_units
            }
        )

    def _get_or_create_table(self):
        """Create table if not exists, otherwise return the description.

        Returns:
            The ``create_table`` response when the table was created,
            otherwise the ``describe_table`` response.

        Raises:
            botocore.exceptions.ClientError: for any client error other
                than ``ResourceInUseException``.
        """
        table_schema = self._get_table_schema()
        try:
            table_description = self._client.create_table(**table_schema)
        except ClientError as exc:
            error_code = exc.response['Error'].get('Code', 'Unknown')

            # If table exists, do not fail, just return the description.
            if error_code == 'ResourceInUseException':
                return self._client.describe_table(
                    TableName=self.table_name
                )
            raise

        logger.info(
            'DynamoDB Table %s did not exist, creating.',
            self.table_name
        )
        # In case we created the table, wait until it becomes available.
        self._wait_for_table_status('ACTIVE')
        logger.info(
            'DynamoDB Table %s is now available.',
            self.table_name
        )
        return table_description

    def _wait_for_table_status(self, expected='ACTIVE'):
        """Poll once per second until the table reports ``expected``.

        Returns as soon as the status matches, without a trailing sleep.
        There is no timeout: the loop only ends on a matching status or
        when ``describe_table`` raises.
        """
        while True:
            table_description = self.client.describe_table(
                TableName=self.table_name
            )
            logger.debug(
                'Waiting for DynamoDB table %s to become %s.',
                self.table_name,
                expected
            )
            if table_description['Table']['TableStatus'] == expected:
                return
            sleep(1)

    def _prepare_get_request(self, key):
        """Construct the item retrieval request parameters."""
        return dict(
            TableName=self.table_name,
            Key={
                self._key_field.name: {
                    self._key_field.data_type: key
                }
            }
        )

    def _prepare_put_request(self, key, value):
        """Construct the item creation request parameters."""
        return dict(
            TableName=self.table_name,
            Item={
                self._key_field.name: {
                    self._key_field.data_type: key
                },
                self._value_field.name: {
                    self._value_field.data_type: value
                },
                # The current time is sent in string form.
                self._timestamp_field.name: {
                    self._timestamp_field.data_type: str(time())
                }
            }
        )

    def _item_to_dict(self, raw_response):
        """Convert get_item() response to field-value pairs.

        Returns an empty dict when the response carries no ``Item``.
        """
        if 'Item' not in raw_response:
            return {}
        return {
            field.name: raw_response['Item'][field.name][field.data_type]
            for field in self._available_fields
        }

    @property
    def client(self):
        """The boto3 DynamoDB client (built on first access)."""
        return self._get_client()

    def get(self, key):
        """Return the value stored under ``key``, or None if not found."""
        key = string(key)
        request_parameters = self._prepare_get_request(key)
        item_response = self.client.get_item(**request_parameters)
        item = self._item_to_dict(item_response)
        return item.get(self._value_field.name)

    def set(self, key, value):
        """Store ``value`` under ``key`` together with the current time."""
        key = string(key)
        request_parameters = self._prepare_put_request(key, value)
        self.client.put_item(**request_parameters)

    def mget(self, keys):
        """Return the values for ``keys``, one :meth:`get` call per key."""
        return [self.get(key) for key in keys]

    def delete(self, key):
        """Delete the item stored under ``key``."""
        key = string(key)
        # A delete request addresses the item exactly like a get request.
        request_parameters = self._prepare_get_request(key)
        self.client.delete_item(**request_parameters)
@@ -82,7 +82,7 @@ def logger_isa(l, p, max=1000):
else:
if this in seen:
raise RuntimeError(
'Logger {0!r} parents recursive'.format(l),
'Logger {0!r} parents recursive'.format(l.name),
)
seen.add(this)
this = this.parent
@@ -88,6 +88,9 @@ Transports and Backends
:``celery[riak]``:
for using Riak as a result backend.

:``celery[dynamodb]``:
for using AWS DynamoDB as a result backend.

:``celery[zookeeper]``:
for using Zookeeper as a message transport.

@@ -0,0 +1,11 @@
===========================================
``celery.backends.dynamodb``
===========================================

.. contents::
:local:
.. currentmodule:: celery.backends.dynamodb

.. automodule:: celery.backends.dynamodb
:members:
:undoc-members:

0 comments on commit 9c950b4

Please sign in to comment.
You can’t perform that action at this time.