Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Improved error handling of errors in session token retrieval, and exp…

…iration.

Version 0.2.5
  • Loading branch information...
commit 6c4d2296fe2b8e44f34fbe70b64da9b168e53714 1 parent 20afa9c
@danielhfrank danielhfrank authored
View
3  CHANGELOG
@@ -1,3 +1,6 @@
+Version 0.2.5 - 2012-03-05
+ * Improve error handling from STS
+
Version 0.2.4 - 2012-02-23
* Fix RangeKeyAttribute bug in queries
* Add put_item method
View
4 asyncdynamo/__init__.py
@@ -28,5 +28,5 @@
except ImportError:
raise ImportError("boto library not installed. Install boto. https://github.com/boto/boto")
-version = "0.2.4"
-version_info = (0, 2, 4)
+version = "0.2.5"
+version_info = (0, 2, 5)
View
41 asyncdynamo/async_aws_sts.py
@@ -1,6 +1,6 @@
#!/bin/env python
#
-# Copyright 2010 bit.ly
+# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -26,9 +26,19 @@
import boto
from boto.sts.connection import STSConnection
from boto.sts.credentials import Credentials
+from boto.exception import BotoServerError
+
+class InvalidClientTokenIdError(BotoServerError):
+ '''
+ Error subclass to indicate that the client's token(s) is/are invalid
+ '''
+ pass
class AsyncAwsSts(STSConnection):
'''
+ Class that manages session tokens. Users of AsyncDynamoDB should not
+ need to worry about what goes on here.
+
Usage: Keep an instance of this class (though it should be cheap to
re instantiate) and periodically call get_session_token to get a new
Credentials object when, say, your session token expires
@@ -45,26 +55,35 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
proxy_user, proxy_pass, debug,
https_connection_factory, region, path, converter)
self.http_client = AsyncHTTPClient()
- # self._auth_handler *should* be set correctly by superclass.
def get_session_token(self, callback):
'''
Gets a new Credentials object with a session token, using this
- instance's aws keys. Callback should operate on the new Credentials obj
+ instance's aws keys. Callback should operate on the new Credentials obj,
+ or else a boto.exception.BotoServerError
'''
return self.get_object('GetSessionToken', {}, Credentials, verb='POST', callback=callback)
def get_object(self, action, params, cls, path="/", parent=None, verb="GET", callback=None):
+ '''
+ Get an instance of `cls` using `action`
+ '''
if not parent:
parent = self
self.make_request(action, params, path, verb,
functools.partial(self._finish_get_object, callback=callback, parent=parent, cls=cls))
- def _finish_get_object(self, response_body, callback, cls=None, parent=None):
+ def _finish_get_object(self, response_body, callback, cls=None, parent=None, error=None):
'''
- Process the body returned by STS. Expect things like network errors to have
- been handled by make_request
+ Process the body returned by STS. If an error is present, convert from a tornado error
+ to a boto error
'''
+ if error:
+ if error.code == 403:
+ error_class = InvalidClientTokenIdError
+ else:
+ error_class = BotoServerError
+ return callback(None, error=error_class(error.code, error.message, response_body))
obj = cls(parent)
h = boto.handler.XmlHandler(obj, parent)
xml.sax.parseString(response_body, h)
@@ -75,12 +94,12 @@ def make_request(self, action, params={}, path='/', verb='GET', callback=None):
Make an async request. This handles the logic of translating from boto params
to a tornado request obj, issuing the request, and passing back the body.
- The callback should operate on the body of the response
+ The callback should operate on the body of the response, and take an optional
+ error argument that will be a tornado error
'''
request = HTTPRequest('https://%s' % self.host,
method=verb)
request.params = params
- # request.path = '/' this one isn't necessary
request.auth_path = '/' # need this for auth
request.host = self.host # need this for auth
if action:
@@ -92,9 +111,5 @@ def make_request(self, action, params={}, path='/', verb='GET', callback=None):
def _finish_make_request(self, response, callback):
if response.error:
- print '!!!!!!!!!!!!!!!!!!!!!!!'
- print response.error
- print response.body
- print '!!!!!!!!!!!!!!!!!!!!!!!'
- return
+ return callback(response.body, error=response.error)
return callback(response.body)
View
245 asyncdynamo/asyncdynamo.py
@@ -1,6 +1,6 @@
#!/bin/env python
#
-# Copyright 2010 bit.ly
+# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -23,18 +23,39 @@
import simplejson as json
from tornado.httpclient import HTTPRequest
from tornado.httpclient import AsyncHTTPClient
+from tornado.ioloop import IOLoop
import functools
from collections import deque
+import time
+import logging
from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
from boto.auth import HmacAuthV3HTTPHandler
from boto.provider import Provider
-from async_aws_sts import AsyncAwsSts
+from async_aws_sts import AsyncAwsSts, InvalidClientTokenIdError
+PENDING_SESSION_TOKEN_UPDATE = "this is not your session token"
class AsyncDynamoDB(AWSAuthConnection):
+ """
+ The main class for asynchronous connections to DynamoDB.
+
+ The user should maintain one instance of this class (though more than one is ok),
+ parametrized with the user's access key and secret key. Make calls with make_request
+ or the helper methods, and AsyncDynamoDB will maintain session tokens in the background.
+
+
+ As in Boto Layer1:
+ "This is the lowest-level interface to DynamoDB. Methods at this
+ layer map directly to API requests and parameters to the methods
+ are either simple, scalar values or they are the Python equivalent
+ of the JSON input as defined in the DynamoDB Developer's Guide.
+ All responses are direct decoding of the JSON response bodies to
+ Python data structures via the json or simplejson modules."
+ """
+
DefaultHost = 'dynamodb.us-east-1.amazonaws.com'
"""The default DynamoDB API endpoint to connect to."""
@@ -56,7 +77,7 @@ class AsyncDynamoDB(AWSAuthConnection):
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
host=None, debug=0, session_token=None,
- authenticate_requests=True, validate_cert=True):
+ authenticate_requests=True, validate_cert=True, max_sts_attempts=3):
if not host:
host = self.DefaultHost
self.validate_cert = validate_cert
@@ -69,41 +90,97 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
self.http_client = AsyncHTTPClient()
self.pending_requests = deque()
self.sts = AsyncAwsSts(aws_access_key_id, aws_secret_access_key)
- if authenticate_requests and not session_token:
- self.sts.get_session_token(self._update_session_token_cb) # init the session token
+ assert (isinstance(max_sts_attempts, int) and max_sts_attempts >= 0)
+ self.max_sts_attempts = max_sts_attempts
+
+ def _init_session_token_cb(self, error=None):
+ if error:
+ logging.warn("Unable to get session token: %s" % error)
- def _required_auth_capability(self): # copied from boto layer1, looks important
+ def _required_auth_capability(self):
return ['hmac-v3-http']
- def _update_session_token_cb(self, creds, provider='aws', callback=None):
+ def _update_session_token(self, callback, attempts=0, bypass_lock=False):
+ '''
+ Begins the logic to get a new session token. Performs checks to ensure
+ that only one request goes out at a time and that backoff is respected, so
+ it can be called repeatedly with no ill effects. Set bypass_lock to True to
+ override this behavior.
+ '''
+ if self.provider.security_token == PENDING_SESSION_TOKEN_UPDATE and not bypass_lock:
+ return
+ self.provider.security_token = PENDING_SESSION_TOKEN_UPDATE # invalidate the current security token
+ return self.sts.get_session_token(
+ functools.partial(self._update_session_token_cb, callback=callback, attempts=attempts))
+
+ def _update_session_token_cb(self, creds, provider='aws', callback=None, error=None, attempts=0):
'''
Callback to use with `async_aws_sts`. The 'provider' arg is a bit misleading,
it is a relic from boto and should probably be left to its default. This will
take the new Credentials obj from `async_aws_sts.get_session_token()` and use
- it to update self.provider, and then will clear the deque of pending requests
+ it to update self.provider, and then will clear the deque of pending requests.
+
+ A callback is optional. If provided, it must be callable without any arguments,
+ but also accept an optional error argument that will be an instance of BotoServerError.
'''
- self.provider = Provider(provider,
- creds.access_key,
- creds.secret_key,
- creds.session_token)
- # force the correct auth, with the new provider
- self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider)
- while self.pending_requests:
- request = self.pending_requests.pop()
- request()
- if callable(callback):
- return callback()
+ def raise_error():
+ # get out of locked state
+ self.provider.security_token = None
+ if callable(callback):
+ return callback(error=error)
+ else:
+ logging.error(error)
+ raise error
+ if error:
+ if isinstance(error, InvalidClientTokenIdError):
+ # no need to retry if error is due to bad tokens
+ raise_error()
+ else:
+ if attempts > self.max_sts_attempts:
+ raise_error()
+ else:
+ seconds_to_wait = (0.1*(2**attempts))
+ logging.warning("Got error[ %s ] getting session token, retrying in %.02f seconds" % (error, seconds_to_wait))
+ IOLoop.instance().add_timeout(time.time() + seconds_to_wait,
+ functools.partial(self._update_session_token, attempts=attempts+1, callback=callback, bypass_lock=True))
+ return
+ else:
+ self.provider = Provider(provider,
+ creds.access_key,
+ creds.secret_key,
+ creds.session_token)
+ # force the correct auth, with the new provider
+ self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider)
+ while self.pending_requests:
+ request = self.pending_requests.pop()
+ request()
+ if callable(callback):
+ return callback()
def make_request(self, action, body='', callback=None, object_hook=None):
'''
Make an asynchronous HTTP request to DynamoDB. Callback should operate on
the decoded json response (with object hook applied, of course). It should also
- accept an error argument, that will be a boto.exception.DynamoDBResponseError
+ accept an error argument, which will be a boto.exception.DynamoDBResponseError.
+
+ If there is not a valid session token, this method will ensure that a new one is fetched
+ and cache the request when it is retrieved.
'''
this_request = functools.partial(self.make_request, action=action,
body=body, callback=callback,object_hook=object_hook)
- if self.authenticate_requests and not self.provider.security_token:
+ if self.authenticate_requests and self.provider.security_token in [None, PENDING_SESSION_TOKEN_UPDATE]:
+ # we will not be able to complete this request because we do not have a valid session token.
+ # queue it and try to get a new one. _update_session_token will ensure that only one request
+ # for a session token goes out at a time
self.pending_requests.appendleft(this_request)
+ def cb_for_update(error=None):
+ # create a callback to handle errors getting session token
+ # callback here is assumed to take a json response, and an instance of DynamoDBResponseError
+ if error:
+ return callback({}, error=DynamoDBResponseError(error.status, error.reason, error.body))
+ else:
+ return
+ self._update_session_token(cb_for_update)
return
headers = {'X-Amz-Target' : '%s_%s.%s' % (self.ServiceName,
self.Version, action),
@@ -122,27 +199,17 @@ def make_request(self, action, body='', callback=None, object_hook=None):
def _finish_make_request(self, response, callback, orig_request, token_used, object_hook=None):
'''
- Check for errors and decode the json response (in the tornado response body), then pass on to orig callback
+ Check for errors and decode the json response (in the tornado response body), then pass on to orig callback.
+ This method also contains some of the logic to handle reacquiring session tokens.
'''
json_response = json.loads(response.body, object_hook=object_hook)
if response.error:
if any((token_error in json_response.get('__type', []) \
for token_error in (self.ExpiredSessionError, self.UnrecognizedClientException))):
- if not self.provider.security_token:
- # this means that we have just asked for a new session token, but have not gotten it back yet.
- # consequently, we should add this to the list of requests to be retried when we get it back
- self.pending_requests.appendleft(orig_request)
- return
- elif token_used == self.provider.security_token:
- # This means that we used an expired token, and have not tried to get a new one yet
- # should insert logic to get a new session token and try again.
- self.provider.security_token = None # invalidate the current security token
- self.pending_requests.appendleft(orig_request) # schedule this request to be tried again
- return self.sts.get_session_token(self._update_session_token_cb)
- else:
- # the current session token is different from the one we used (ie it has been updated)
- # should just try again with the new one
- return orig_request()
+ if self.provider.security_token == token_used:
+ # the token that we used has expired. wipe it out
+ self.provider.security_token = None
+ return orig_request() # make_request will handle logic to get a new token if needed, and queue until it is fetched
else:
# because some errors are benign, include the response when an error is passed
return callback(json_response, error=DynamoDBResponseError(response.error.code,
@@ -152,8 +219,28 @@ def _finish_make_request(self, response, callback, orig_request, token_used, obj
def get_item(self, table_name, key, callback, attributes_to_get=None,
consistent_read=False, object_hook=None):
'''
- Issues an async tornado request to get an item
- '''
+ Return a set of attributes for an item that matches
+ the supplied key.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type key: dict
+ :param key: A Python version of the Key data structure
+ defined by DynamoDB.
+
+ :type attributes_to_get: list
+ :param attributes_to_get: A list of attribute names.
+ If supplied, only the specified attribute names will
+ be returned. Otherwise, all attributes will be returned.
+
+ :type consistent_read: bool
+ :param consistent_read: If True, a consistent read
+ request is issued. Otherwise, an eventually consistent
+ request is issued. '''
data = {'TableName': table_name,
'Key': key}
if attributes_to_get:
@@ -161,16 +248,53 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
if consistent_read:
data['ConsistentRead'] = True
return self.make_request('GetItem', body=json.dumps(data),
- callback=functools.partial(callback), object_hook=object_hook)
+ callback=callback, object_hook=object_hook)
def batch_get_item(self, request_items, callback):
+ """
+ Return a set of attributes for a multiple items in
+ multiple tables using their primary keys.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type request_items: dict
+ :param request_items: A Python version of the RequestItems
+ data structure defined by DynamoDB.
+ """
data = {'RequestItems' : request_items}
json_input = json.dumps(data)
self.make_request('BatchGetItem', json_input, callback)
def put_item(self, table_name, item, callback, expected=None, return_values=None, object_hook=None):
'''
- Issues an async request to create a new item or replace an old one.
+ Create a new item or replace an old item with a new
+ item (including all attributes). If an item already
+ exists in the specified table with the same primary
+ key, the new item will completely replace the old item.
+ You can perform a conditional put by specifying an
+ expected rule.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type item: dict
+ :param item: A Python version of the Item data structure
+ defined by DynamoDB.
+
+ :type expected: dict
+ :param expected: A Python version of the Expected
+ data structure defined by DynamoDB.
+
+ :type return_values: str
+ :param return_values: Controls the return of attribute
+ name-value pairs before then were changed. Possible
+ values are: None or 'ALL_OLD'. If 'ALL_OLD' is
+ specified and the item is overwritten, the content
+ of the old item is returned.
'''
data = {'TableName' : table_name,
'Item' : item}
@@ -187,7 +311,44 @@ def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
scan_index_forward=True, exclusive_start_key=None,
object_hook=None):
'''
- Issues an async request to perform a query
+ Perform a query of DynamoDB. This version is currently punting
+ and expecting you to provide a full and correct JSON body
+ which is passed as is to DynamoDB.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type hash_key_value: dict
+ :param key: A DynamoDB-style HashKeyValue.
+
+ :type range_key_conditions: dict
+ :param range_key_conditions: A Python version of the
+ RangeKeyConditions data structure.
+
+ :type attributes_to_get: list
+ :param attributes_to_get: A list of attribute names.
+ If supplied, only the specified attribute names will
+ be returned. Otherwise, all attributes will be returned.
+
+ :type limit: int
+ :param limit: The maximum number of items to return.
+
+ :type consistent_read: bool
+ :param consistent_read: If True, a consistent read
+ request is issued. Otherwise, an eventually consistent
+ request is issued.
+
+ :type scan_index_forward: bool
+ :param scan_index_forward: Specified forward or backward
+ traversal of the index. Default is forward (True).
+
+ :type exclusive_start_key: list or tuple
+ :param exclusive_start_key: Primary key of the item from
+ which to continue an earlier query. This would be
+ provided as the LastEvaluatedKey in that query.
'''
data = {'TableName': table_name,
'HashKeyValue': hash_key_value}
View
2  setup.py
@@ -2,7 +2,7 @@
from distutils.core import setup
# also update version in __init__.py
-version = '0.2.4'
+version = '0.2.5'
setup(
name="asyncdynamo",
Please sign in to comment.
Something went wrong with that request. Please try again.