Skip to content

Commit

Permalink
Merge pull request #340 from terrycain/dynamodb
Browse files Browse the repository at this point in the history
DynamoDB Tests + Examples
  • Loading branch information
jettify committed Sep 6, 2017
2 parents b08b975 + 291262b commit deff05f
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Changes

* Added SQS examples and tests #336
* Changed requirements.txt structure #336
* Added DynamoDB examples and tests #340

0.4.4 (2017-08-16)
^^^^^^^^^^^^^^^^^^
Expand Down
25 changes: 25 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ Basic Example
loop.run_until_complete(go(loop))
Supported AWS Services
----------------------

This is a non-exuastive list of what tests aiobotocore runs against AWS services. Not all methods are tested but we aim to test the majority of
commonly used methods.

+----------------+-----------------------+
| Service | Status |
+================+=======================+
| S3 | Working |
+----------------+-----------------------+
| DynamoDB | Basic methods tested |
+----------------+-----------------------+
| SNS | Basic methods tested |
+----------------+-----------------------+
| SQS | Basic methods tested |
+----------------+-----------------------+
| CloudFormation | Stack creation tested |
+----------------+-----------------------+

Due to the way boto3 is implemented, its highly likely that even if services are not listed above that you can take any `boto3.client('service')` and
stick `await` infront of methods to make them async, e.g. `await client.list_named_queries()` would asynchronous list all of the named Athena queries.

If a service is not listed here and you could do with some tests or examples feel free to raise an issue.

Run Tests
---------

Expand Down
27 changes: 27 additions & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,30 @@ The consumer will read off any messages on the queue, waiting up to 2 seconds fo
.. literalinclude:: ../examples/sqs_queue_producer.py

.. literalinclude:: ../examples/sqs_queue_consumer.py

DynamoDB
--------

Table Creation
++++++++++++++

When you create a DynamoDB table, it can take quite a while (especially if you add a few secondary index's). Instead of polling `describe_table` yourself,
boto3 came up with "waiters" that will do all the polling for you. The following snippet shows how to wait for a DynamoDB table to be created in an async way.

.. literalinclude:: ../examples/dynamodb_create_table.py

Batch Insertion
+++++++++++++++

Now if you have a massive amount of data to insert into Dynamo, I would suggest using an EMR data pipeline (theres even an example for exactly this). But
if you stubborn, here is an example of inserting lots of items into Dynamo (it's not really that complicated once you've read it).

What the code does is generates items (e.g. item0, item1, item2...) and writes them to a table "test" against a primary partition key called "pk"
(with 5 read and 5 write units, no auto-scaling).

The `batch_write_item` method only takes a max of 25 items at a time, so the script computes 25 items, writes them, then does it all over again.

After Dynamo has had enough, it will start throttling you and return any items that have not been written in the response. Once the script is
being throttled, it will start sleeping for 5 seconds until the failed items have been successfully written, after that it will exit.

.. literalinclude:: ../examples/dynamodb_batch_write.py
112 changes: 112 additions & 0 deletions examples/dynamodb_batch_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Boto should get credentials from ~/.aws/credentials or the environment
import asyncio

import aiobotocore


def get_items(start_num, num_items):
"""
Generate a sequence of dynamo items
:param start_num: Start index
:type start_num: int
:param num_items: Number of items
:type num_items: int
:return: List of dictionaries
:rtype: list of dict
"""
result = []
for i in range(start_num, start_num+num_items):
result.append({'pk': {'S': 'item{0}'.format(i)}})
return result


def create_batch_write_structure(table_name, start_num, num_items):
"""
Create item structure for passing to batch_write_item
:param table_name: DynamoDB table name
:type table_name: str
:param start_num: Start index
:type start_num: int
:param num_items: Number of items
:type num_items: int
:return: dictionary of tables to write to
:rtype: dict
"""
return {
table_name: [
{'PutRequest': {'Item': item}}
for item in get_items(start_num, num_items)
]
}


@asyncio.coroutine
def go(loop):
session = aiobotocore.get_session(loop=loop)
client = session.create_client('dynamodb', region_name='us-west-2')
table_name = 'test'

print('Writing to dynamo')
start = 0
while True:
# Loop adding 25 items to dynamo at a time
request_items = create_batch_write_structure(table_name, start, 25)
response = yield from client.batch_write_item(
RequestItems=request_items
)
if len(response['UnprocessedItems']) == 0:
print('Writted 25 items to dynamo')
else:
# Hit the provisioned write limit
print('Hit write limit, backing off then retrying')
yield from asyncio.sleep(5)

# Items left over that haven't been inserted
unprocessed_items = response['UnprocessedItems']
print('Resubmitting items')
# Loop until unprocessed items are written
while len(unprocessed_items) > 0:
response = yield from client.batch_write_item(
RequestItems=unprocessed_items
)
# If any items are still left over, add them to the
# list to be written
unprocessed_items = response['UnprocessedItems']

# If there are items left over, we could do with
# sleeping some more
if len(unprocessed_items) > 0:
print('Backing off for 5 seconds')
yield from asyncio.sleep(5)

# Inserted all the unprocessed items, exit loop
print('Unprocessed items successfully inserted')
break

start += 25

# See if DynamoDB has the last item we inserted
final_item = 'item' + str(start + 24)
print('Item "{0}" should exist'.format(final_item))

response = yield from client.get_item(
TableName=table_name,
Key={'pk': {'S': final_item}}
)
print('Response: ' + str(response['Item']))

yield from client.close()


def main():
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
except KeyboardInterrupt:
pass


if __name__ == '__main__':
main()
53 changes: 53 additions & 0 deletions examples/dynamodb_create_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Boto should get credentials from ~/.aws/credentials or the environment
import uuid
import asyncio

import aiobotocore


@asyncio.coroutine
def go(loop):
session = aiobotocore.get_session(loop=loop)
client = session.create_client('dynamodb', region_name='us-west-2')
# Create random table name
table_name = 'aiobotocore-' + str(uuid.uuid4())

print('Requesting table creation...')
yield from client.create_table(
TableName=table_name,
AttributeDefinitions=[
{
'AttributeName': 'testKey',
'AttributeType': 'S'
},
],
KeySchema=[
{
'AttributeName': 'testKey',
'KeyType': 'HASH'
},
],
ProvisionedThroughput={
'ReadCapacityUnits': 10,
'WriteCapacityUnits': 10
}
)

print("Waiting for table to be created...")
waiter = client.get_waiter('table_exists')
yield from waiter.wait(TableName=table_name)
print("Table {0} created".format(table_name))

yield from client.close()


def main():
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
except KeyboardInterrupt:
pass


if __name__ == '__main__':
main()
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-e .
Flask==0.12.2
coverage==4.4.1
flake8==3.4.1
Expand All @@ -10,4 +11,4 @@ botocore==1.6.4
multidict==3.1.3
wrapt==1.10.11
dill==0.2.7.1
packaging==16.8
packaging==16.8
2 changes: 1 addition & 1 deletion tests/mock_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def start_service(service_name, host, port):
process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL)
url = "http://{host}:{port}".format(host=host, port=port)

for i in range(0, 10):
for i in range(0, 30):
if process.poll() is not None:
break

Expand Down
19 changes: 0 additions & 19 deletions tests/test_basic_dynamodb.py

This file was deleted.

0 comments on commit deff05f

Please sign in to comment.