Commit

Merge c516034 into 3fd79dc
akursar committed Apr 8, 2021
2 parents 3fd79dc + c516034 commit 3c41a04
Showing 5 changed files with 215 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -35,7 +35,7 @@ jobs:
- uses: supercharge/redis-github-action@1.2.0
with:
redis-version: '6'
# - uses: rrainn/dynamodb-action@v2.0.0
- uses: rrainn/dynamodb-action@v2.0.0

# Cache packages per python version, and reuse until setup.py changes
- name: Cache pip packages
6 changes: 3 additions & 3 deletions README.md
@@ -92,10 +92,10 @@ Several backends are available. If one isn't specified, a non-persistent in-memory cache will be used.
(requires [aioredis](https://github.com/aio-libs/aioredis-py))
* `MongoDBBackend`: Uses a [MongoDB](https://www.mongodb.com/) database
(requires [motor](https://motor.readthedocs.io))

**Incomplete:**
* `DynamoDBBackend`: Uses an [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) database
(requires [boto3](https://github.com/boto/boto3))
(requires [aioboto3](https://github.com/terrycain/aioboto3))

**Incomplete:**
* `GridFSBackend`: Uses a [MongoDB GridFS](https://docs.mongodb.com/manual/core/gridfs/) database,
which enables storage of documents greater than 16MB
(requires [pymongo](https://pymongo.readthedocs.io/en/stable/))
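
Any of these backends can be passed to the session when it's created. A minimal sketch (assuming `CachedSession` accepts the backend via its `cache` argument, as in this project's other examples; the URL is a placeholder):

```python
import asyncio

from aiohttp_client_cache import CachedSession
from aiohttp_client_cache.backends.dynamodb import DynamoDBBackend

async def main():
    # Cache responses in DynamoDB, creating the cache table on first use
    async with CachedSession(cache=DynamoDBBackend(create_if_not_exists=True)) as session:
        await session.get('https://example.com')

asyncio.run(main())
```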
228 changes: 124 additions & 104 deletions aiohttp_client_cache/backends/dynamodb.py
@@ -1,9 +1,7 @@
# TODO: Use BaseCache.serialize() and deserialize()
import pickle
from typing import Dict, Iterable
from typing import AsyncIterable, Dict

import boto3
from boto3.resources.base import ServiceResource
import aioboto3
from aioboto3.session import ResourceCreatorContext
from botocore.exceptions import ClientError

from aiohttp_client_cache.backends import BaseCache, CacheBackend, ResponseOrKey
@@ -21,129 +19,151 @@ class DynamoDBBackend(CacheBackend):
"""

@extend_signature(CacheBackend.__init__)
def __init__(self, cache_name: str = 'aiohttp-cache', **kwargs):
def __init__(
self,
cache_name: str = 'aiohttp-cache',
key_attr_name: str = 'k',
val_attr_name: str = 'v',
create_if_not_exists: bool = False,
context: ResourceCreatorContext = None,
**kwargs,
):
super().__init__(cache_name=cache_name, **kwargs)
self.responses = DynamoDbCache(cache_name, 'responses', **kwargs)
if not context:
context = aioboto3.resource("dynamodb")
self.responses = DynamoDbCache(
cache_name, 'resp', key_attr_name, val_attr_name, create_if_not_exists, context
)
self.redirects = DynamoDbCache(
cache_name, 'redirects', connection=self.responses.connection
cache_name, 'redir', key_attr_name, val_attr_name, create_if_not_exists, context
)
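
# Illustration (not part of this diff): both per-table caches share a single
# aioboto3 resource context, so a custom endpoint only needs to be configured
# once. The endpoint URL below is a placeholder for a local DynamoDB instance,
# mirroring this commit's integration tests:
#
#     import aioboto3
#     from aiohttp_client_cache.backends.dynamodb import DynamoDBBackend
#
#     context = aioboto3.resource('dynamodb', endpoint_url='http://localhost:8000')
#     backend = DynamoDBBackend(create_if_not_exists=True, context=context)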


# TODO: Incomplete/untested
# TODO: Fully async implementation. Current implementation with boto3 uses blocking operations.
# Methods are currently defined as async only for compatibility with BaseCache API.
class DynamoDbCache(BaseCache):
"""An async-compatible interface for caching objects in a DynamoDB key-store
The actual key name on the dynamodb server will be ``namespace:table_name``.
In order to deal with how dynamodb stores data/keys, all values must be pickled.
The actual key name on the dynamodb server will be ``namespace:key``.
In order to deal with how dynamodb stores data/keys, all values must be serialized.
Args:
table_name: Table name to use
namespace: Name of the hash map stored in dynamodb
connection: An existing resource object to reuse instead of creating a new one
region_name: AWS region of DynamoDB database
kwargs: Additional keyword arguments for DynamoDB :py:class:`.ServiceResource`
namespace: Prefix to be prepended to each key in the DynamoDB document
key_attr_name: The name of the field to use for keys in the DynamoDB document
val_attr_name: The name of the field to use for values in the DynamoDB document
create_if_not_exists: Whether to create the DynamoDB table if it doesn't already exist
context: An existing ResourceCreatorContext (see ``aioboto3.resource()``) to reuse instead of creating a new one
"""

def __init__(
self,
table_name: str,
namespace: str = 'dynamodb_dict_data',
connection: ServiceResource = None,
region_name: str = 'us-east-1',
read_capacity_units: int = 1,
write_capacity_units: int = 1,
namespace: str,
key_attr_name: str,
val_attr_name: str,
create_if_not_exists: bool,
context: ResourceCreatorContext,
**kwargs,
):
super().__init__(**kwargs)
self.table_name = table_name
self.namespace = namespace
self.connection = connection or boto3.resource(
'dynamodb', region_name=region_name, **kwargs
self.key_attr_name = key_attr_name
self.val_attr_name = val_attr_name
self.create_if_not_exists = create_if_not_exists
self.context = context
self._table = None

async def get_table(self):
if not self._table:
# Re-use the service resource if it's already been created
if self.context.cls:
conn = self.context.cls
# Otherwise, enter the context to create one
else:
# Should we call __aexit__ later if we auto-enter here?
conn = await self.context.__aenter__()

self._table = await conn.Table(self.table_name)
if self.create_if_not_exists:
try:
await conn.create_table(
AttributeDefinitions=[
{
'AttributeName': self.key_attr_name,
'AttributeType': 'S',
},
],
TableName=self.table_name,
KeySchema=[
{
'AttributeName': self.key_attr_name,
'KeyType': 'HASH',
},
],
BillingMode="PAY_PER_REQUEST",
)
await self._table.wait_until_exists()
except ClientError as e:
if e.response["Error"]["Code"] != "ResourceInUseException":
raise

return self._table
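
# Note (illustration, not part of the diff): if the table already exists,
# create_table raises a ClientError with code ResourceInUseException, which is
# swallowed above; any other ClientError propagates. For a newly created table,
# wait_until_exists() polls asynchronously until it becomes ACTIVE.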

def _doc(self, key) -> Dict:
return {self.key_attr_name: f'{self.namespace}:{key}'}
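
# Illustration (hypothetical values, not part of the diff): with the defaults
# key_attr_name='k' and val_attr_name='v', a response cached under key 'abc123'
# in the 'resp' namespace is stored in the shared table as:
#     {'k': 'resp:abc123', 'v': <Binary: pickled response>}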

async def _scan(self) -> AsyncIterable[Dict]:
table = await self.get_table()
client = table.meta.client
paginator = client.get_paginator('scan')
iterator = paginator.paginate(
TableName=table.name,
Select='ALL_ATTRIBUTES',
FilterExpression=f'begins_with({self.key_attr_name}, :namespace)',
ExpressionAttributeValues={':namespace': f'{self.namespace}:'},
)
async for result in iterator:
for item in result['Items']:
yield item
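
# Note (illustration, not part of the diff): the scan is paginated through the
# table's client so it can stream arbitrarily many items, and the begins_with
# filter restricts results to this instance's namespace prefix; keys(), values(),
# size(), and clear() below all build on this iterator.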

# Create the table if it doesn't already exist
try:
self.connection.create_table(
AttributeDefinitions=[
{
'AttributeName': 'namespace',
'AttributeType': 'S',
},
{
'AttributeName': 'key',
'AttributeType': 'S',
},
],
TableName=table_name,
KeySchema=[
{'AttributeName': 'namespace', 'KeyType': 'HASH'},
{'AttributeName': 'key', 'KeyType': 'RANGE'},
],
ProvisionedThroughput={
'ReadCapacityUnits': read_capacity_units,
'WriteCapacityUnits': write_capacity_units,
},
)
except ClientError:
pass

self._table = self.connection.Table(table_name)
self._table.wait_until_exists()

def _scan_table(self) -> Dict:
return self._table.query(
ExpressionAttributeValues={':Namespace': self.namespace},
ExpressionAttributeNames={'#N': 'namespace'},
KeyConditionExpression='#N = :Namespace',
)
async def delete(self, key: str) -> None:
doc = self._doc(key)
table = await self.get_table()
await table.delete_item(Key=doc)

@staticmethod
def unpickle(response_item: Dict) -> ResponseOrKey:
return BaseCache.unpickle((response_item or {}).get('value'))

async def clear(self):
response = self._scan_table()
for v in response['Items']:
composite_key = {'namespace': v['namespace'], 'key': v['key']}
self._table.delete_item(Key=composite_key)
async def read(self, key: str) -> ResponseOrKey:
table = await self.get_table()
response = await table.get_item(Key=self._doc(key), ProjectionExpression=self.val_attr_name)
item = response.get("Item")
if item:
return self.deserialize(item[self.val_attr_name].value)
return None

async def write(self, key: str, item: ResponseOrKey) -> None:
table = await self.get_table()
doc = self._doc(key)
doc[self.val_attr_name] = self.serialize(item)
await table.put_item(Item=doc)

async def clear(self) -> None:
async for key in self.keys():
await self.delete(key)

# TODO
async def contains(self, key: str) -> bool:
raise NotImplementedError

async def delete(self, key: str):
composite_key = {'namespace': self.namespace, 'key': str(key)}
response = self._table.delete_item(Key=composite_key, ReturnValues='ALL_OLD')
if 'Attributes' not in response:
raise KeyError
resp = await self.read(key)
return resp is not None

# TODO
async def keys(self) -> Iterable[str]:
raise NotImplementedError

async def read(self, key: str) -> ResponseOrKey:
response = self._table.get_item(Key={'namespace': self.namespace, 'key': str(key)})
return self.unpickle(response.get('Item'))
async def keys(self) -> AsyncIterable[str]:
len_prefix = len(self.namespace) + 1
async for item in self._scan():
yield item[self.key_attr_name][len_prefix:]

async def size(self) -> int:
expression_attribute_values = {':Namespace': self.namespace}
expression_attribute_names = {'#N': 'namespace'}
key_condition_expression = '#N = :Namespace'
return self._table.query(
Select='COUNT',
ExpressionAttributeValues=expression_attribute_values,
ExpressionAttributeNames=expression_attribute_names,
KeyConditionExpression=key_condition_expression,
)['Count']

async def values(self) -> Iterable[ResponseOrKey]:
response = self._scan_table()
return [self.unpickle(item) for item in response.get('Items', [])]

async def write(self, key: str, item: ResponseOrKey):
item_meta = {
'namespace': self.namespace,
'key': str(key),
'value': pickle.dumps(item, protocol=-1),
}
self._table.put_item(Item=item_meta)
count = 0
async for item in self._scan():
count += 1
return count

async def values(self) -> AsyncIterable[ResponseOrKey]:
async for item in self._scan():
yield self.deserialize(item[self.val_attr_name].value)
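
Taken together, the rewritten cache can be driven directly through its async API; a minimal sketch (assuming reachable DynamoDB credentials or a local endpoint; the key and value are illustrative):

import asyncio

from aiohttp_client_cache.backends.dynamodb import DynamoDBBackend

async def demo():
    cache = DynamoDBBackend(create_if_not_exists=True).responses
    await cache.write('example-key', 'example-value')
    assert await cache.read('example-key') == 'example-value'
    print([key async for key in cache.keys()])  # ['example-key']

asyncio.run(demo())
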
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@
# Packages used for CI jobs
'build': ['coveralls', 'twine', 'wheel'],
# Packages for all supported backends
'backends': ['aiosqlite', 'boto3', 'motor', 'aioredis'],
'backends': ['aiosqlite', 'aioboto3', 'motor', 'aioredis'],
# Packages used for documentation builds
'docs': [
'm2r2~=0.2.5',
86 changes: 86 additions & 0 deletions test/integration/test_dynamodb_backend.py
@@ -0,0 +1,86 @@
import asyncio
from datetime import datetime

import aioboto3
import pytest

from aiohttp_client_cache.backends.dynamodb import DynamoDBBackend


def local_context():
return aioboto3.resource(
"dynamodb",
region_name='region',
aws_access_key_id='access_key_id',
aws_secret_access_key='secret_access_key',
endpoint_url="http://localhost:8000",
)
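
# The endpoint above assumes DynamoDB Local listening on port 8000; the region and
# credential values are dummies, which DynamoDB Local accepts. In CI, the
# rrainn/dynamodb-action step enabled in build.yml is expected to provide this service.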


def is_dynamodb_running():
"""Test if a DynamoDB service is running locally"""

async def check_dynamodb():
async with local_context() as resource:
client = resource.meta.client
await client.describe_limits()

try:
asyncio.run(check_dynamodb())
return True
except OSError:
return False


pytestmark = [
pytest.mark.asyncio,
pytest.mark.skipif(
not is_dynamodb_running(), reason='local DynamoDB service required for integration tests'
),
]

test_data = {'key_1': 'item_1', 'key_2': datetime.now(), 'key_3': 3.141592654}


@pytest.fixture(autouse=True, scope='function')
async def cache_client():
"""Fixture that creates a new cache client for each test function"""
backend = DynamoDBBackend(create_if_not_exists=True, context=local_context())
cache_client = backend.responses
table = await cache_client.get_table()
yield cache_client
await table.delete()


async def test_write_read(cache_client):
for k, v in test_data.items():
await cache_client.write(k, v)
assert await cache_client.read(k) == v


async def test_delete(cache_client):
for k, v in test_data.items():
await cache_client.write(k, v)

for k in test_data.keys():
await cache_client.delete(k)
assert await cache_client.read(k) is None


async def test_keys_values_size(cache_client):
for k, v in test_data.items():
await cache_client.write(k, v)

assert await cache_client.size() == len(test_data)
assert {key async for key in cache_client.keys()} == set(test_data.keys())
assert {val async for val in cache_client.values()} == set(test_data.values())


async def test_clear(cache_client):
for k, v in test_data.items():
await cache_client.write(k, v)

await cache_client.clear()
assert await cache_client.size() == 0
assert [key async for key in cache_client.keys()] == []
assert [val async for val in cache_client.values()] == []
