Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

add error handling for session token retrieval #3

Merged
merged 1 commit into from

2 participants

@danielhfrank

Man this was easier when sts just didn't return any errors...

async_sts will now pass back any errors it encounters to the caller. asyncdynamo has the logic to either raise the error immediately if it is a case of bad tokens, or otherwise back off for about a minute or two (in case of a network error) before raising an error.

One thing that I missed - we do have a cache of pending requests in case we are unable to make some due to the lack of a current session token. I suppose it might be useful to somehow pass back the failed requests when raising an error, but I was not quite sure how. We can take another pass at that later.

@danielhfrank

@jehiah this should now be totally ready to go.

@mreiferson mreiferson merged commit 965cc8a into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 6, 2012
  1. @danielhfrank
This page is out of date. Refresh to see the latest.
View
3  CHANGELOG
@@ -1,3 +1,6 @@
+Version 0.2.5 - 2012-03-05
+ * Improve error handling from STS
+
Version 0.2.4 - 2012-02-23
* Fix RangeKeyAttribute bug in queries
* Add put_item method
View
4 asyncdynamo/__init__.py
@@ -28,5 +28,5 @@
except ImportError:
raise ImportError("boto library not installed. Install boto. https://github.com/boto/boto")
-version = "0.2.4"
-version_info = (0, 2, 4)
+version = "0.2.5"
+version_info = (0, 2, 5)
View
41 asyncdynamo/async_aws_sts.py
@@ -1,6 +1,6 @@
#!/bin/env python
#
-# Copyright 2010 bit.ly
+# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -26,9 +26,19 @@
import boto
from boto.sts.connection import STSConnection
from boto.sts.credentials import Credentials
+from boto.exception import BotoServerError
+
+class InvalidClientTokenIdError(BotoServerError):
+ '''
+ Error subclass to indicate that the client's token(s) is/are invalid
+ '''
+ pass
class AsyncAwsSts(STSConnection):
'''
+ Class that manages session tokens. Users of AsyncDynamoDB should not
+ need to worry about what goes on here.
+
Usage: Keep an instance of this class (though it should be cheap to
re instantiate) and periodically call get_session_token to get a new
Credentials object when, say, your session token expires
@@ -45,26 +55,35 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
proxy_user, proxy_pass, debug,
https_connection_factory, region, path, converter)
self.http_client = AsyncHTTPClient()
- # self._auth_handler *should* be set correctly by superclass.
def get_session_token(self, callback):
'''
Gets a new Credentials object with a session token, using this
- instance's aws keys. Callback should operate on the new Credentials obj
+ instance's aws keys. Callback should operate on the new Credentials obj,
+ or else a boto.exception.BotoServerError
'''
return self.get_object('GetSessionToken', {}, Credentials, verb='POST', callback=callback)
def get_object(self, action, params, cls, path="/", parent=None, verb="GET", callback=None):
+ '''
+ Get an instance of `cls` using `action`
+ '''
if not parent:
parent = self
self.make_request(action, params, path, verb,
functools.partial(self._finish_get_object, callback=callback, parent=parent, cls=cls))
- def _finish_get_object(self, response_body, callback, cls=None, parent=None):
+ def _finish_get_object(self, response_body, callback, cls=None, parent=None, error=None):
'''
- Process the body returned by STS. Expect things like network errors to have
- been handled by make_request
+ Process the body returned by STS. If an error is present, convert from a tornado error
+ to a boto error
'''
+ if error:
+ if error.code == 403:
+ error_class = InvalidClientTokenIdError
+ else:
+ error_class = BotoServerError
+ return callback(None, error=error_class(error.code, error.message, response_body))
obj = cls(parent)
h = boto.handler.XmlHandler(obj, parent)
xml.sax.parseString(response_body, h)
@@ -75,12 +94,12 @@ def make_request(self, action, params={}, path='/', verb='GET', callback=None):
Make an async request. This handles the logic of translating from boto params
to a tornado request obj, issuing the request, and passing back the body.
- The callback should operate on the body of the response
+ The callback should operate on the body of the response, and take an optional
+ error argument that will be a tornado error
'''
request = HTTPRequest('https://%s' % self.host,
method=verb)
request.params = params
- # request.path = '/' this one isn't necessary
request.auth_path = '/' # need this for auth
request.host = self.host # need this for auth
if action:
@@ -92,9 +111,5 @@ def make_request(self, action, params={}, path='/', verb='GET', callback=None):
def _finish_make_request(self, response, callback):
if response.error:
- print '!!!!!!!!!!!!!!!!!!!!!!!'
- print response.error
- print response.body
- print '!!!!!!!!!!!!!!!!!!!!!!!'
- return
+ return callback(response.body, error=response.error)
return callback(response.body)
View
245 asyncdynamo/asyncdynamo.py
@@ -1,6 +1,6 @@
#!/bin/env python
#
-# Copyright 2010 bit.ly
+# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -23,18 +23,39 @@
import simplejson as json
from tornado.httpclient import HTTPRequest
from tornado.httpclient import AsyncHTTPClient
+from tornado.ioloop import IOLoop
import functools
from collections import deque
+import time
+import logging
from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
from boto.auth import HmacAuthV3HTTPHandler
from boto.provider import Provider
-from async_aws_sts import AsyncAwsSts
+from async_aws_sts import AsyncAwsSts, InvalidClientTokenIdError
+PENDING_SESSION_TOKEN_UPDATE = "this is not your session token"
class AsyncDynamoDB(AWSAuthConnection):
+ """
+ The main class for asynchronous connections to DynamoDB.
+
+ The user should maintain one instance of this class (though more than one is ok),
+ parametrized with the user's access key and secret key. Make calls with make_request
+ or the helper methods, and AsyncDynamoDB will maintain session tokens in the background.
+
+
+ As in Boto Layer1:
+ "This is the lowest-level interface to DynamoDB. Methods at this
+ layer map directly to API requests and parameters to the methods
+ are either simple, scalar values or they are the Python equivalent
+ of the JSON input as defined in the DynamoDB Developer's Guide.
+ All responses are direct decoding of the JSON response bodies to
+ Python data structures via the json or simplejson modules."
+ """
+
DefaultHost = 'dynamodb.us-east-1.amazonaws.com'
"""The default DynamoDB API endpoint to connect to."""
@@ -56,7 +77,7 @@ class AsyncDynamoDB(AWSAuthConnection):
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
host=None, debug=0, session_token=None,
- authenticate_requests=True, validate_cert=True):
+ authenticate_requests=True, validate_cert=True, max_sts_attempts=3):
if not host:
host = self.DefaultHost
self.validate_cert = validate_cert
@@ -69,41 +90,97 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
self.http_client = AsyncHTTPClient()
self.pending_requests = deque()
self.sts = AsyncAwsSts(aws_access_key_id, aws_secret_access_key)
- if authenticate_requests and not session_token:
- self.sts.get_session_token(self._update_session_token_cb) # init the session token
+ assert (isinstance(max_sts_attempts, int) and max_sts_attempts >= 0)
+ self.max_sts_attempts = max_sts_attempts
+
+ def _init_session_token_cb(self, error=None):
+ if error:
+ logging.warn("Unable to get session token: %s" % error)
- def _required_auth_capability(self): # copied from boto layer1, looks important
+ def _required_auth_capability(self):
return ['hmac-v3-http']
- def _update_session_token_cb(self, creds, provider='aws', callback=None):
+ def _update_session_token(self, callback, attempts=0, bypass_lock=False):
+ '''
+ Begins the logic to get a new session token. Performs checks to ensure
+ that only one request goes out at a time and that backoff is respected, so
+ it can be called repeatedly with no ill effects. Set bypass_lock to True to
+ override this behavior.
+ '''
+ if self.provider.security_token == PENDING_SESSION_TOKEN_UPDATE and not bypass_lock:
+ return
+ self.provider.security_token = PENDING_SESSION_TOKEN_UPDATE # invalidate the current security token
+ return self.sts.get_session_token(
+ functools.partial(self._update_session_token_cb, callback=callback, attempts=attempts))
+
+ def _update_session_token_cb(self, creds, provider='aws', callback=None, error=None, attempts=0):
'''
Callback to use with `async_aws_sts`. The 'provider' arg is a bit misleading,
it is a relic from boto and should probably be left to its default. This will
take the new Credentials obj from `async_aws_sts.get_session_token()` and use
- it to update self.provider, and then will clear the deque of pending requests
+ it to update self.provider, and then will clear the deque of pending requests.
+
+ A callback is optional. If provided, it must be callable without any arguments,
+ but also accept an optional error argument that will be an instance of BotoServerError.
'''
- self.provider = Provider(provider,
- creds.access_key,
- creds.secret_key,
- creds.session_token)
- # force the correct auth, with the new provider
- self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider)
- while self.pending_requests:
- request = self.pending_requests.pop()
- request()
- if callable(callback):
- return callback()
+ def raise_error():
+ # get out of locked state
+ self.provider.security_token = None
+ if callable(callback):
+ return callback(error=error)
+ else:
+ logging.error(error)
+ raise error
+ if error:
+ if isinstance(error, InvalidClientTokenIdError):
+ # no need to retry if error is due to bad tokens
+ raise_error()
+ else:
+ if attempts > self.max_sts_attempts:
+ raise_error()
+ else:
+ seconds_to_wait = (0.1*(2**attempts))
+ logging.warning("Got error[ %s ] getting session token, retrying in %.02f seconds" % (error, seconds_to_wait))
+ IOLoop.instance().add_timeout(time.time() + seconds_to_wait,
+ functools.partial(self._update_session_token, attempts=attempts+1, callback=callback, bypass_lock=True))
+ return
+ else:
+ self.provider = Provider(provider,
+ creds.access_key,
+ creds.secret_key,
+ creds.session_token)
+ # force the correct auth, with the new provider
+ self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider)
+ while self.pending_requests:
+ request = self.pending_requests.pop()
+ request()
+ if callable(callback):
+ return callback()
def make_request(self, action, body='', callback=None, object_hook=None):
'''
Make an asynchronous HTTP request to DynamoDB. Callback should operate on
the decoded json response (with object hook applied, of course). It should also
- accept an error argument, that will be a boto.exception.DynamoDBResponseError
+ accept an error argument, which will be a boto.exception.DynamoDBResponseError.
+
+ If there is not a valid session token, this method will ensure that a new one is fetched
+ and cache the request when it is retrieved.
'''
this_request = functools.partial(self.make_request, action=action,
body=body, callback=callback,object_hook=object_hook)
- if self.authenticate_requests and not self.provider.security_token:
+ if self.authenticate_requests and self.provider.security_token in [None, PENDING_SESSION_TOKEN_UPDATE]:
+ # we will not be able to complete this request because we do not have a valid session token.
+ # queue it and try to get a new one. _update_session_token will ensure that only one request
+ # for a session token goes out at a time
self.pending_requests.appendleft(this_request)
+ def cb_for_update(error=None):
+ # create a callback to handle errors getting session token
+ # callback here is assumed to take a json response, and an instance of DynamoDBResponseError
+ if error:
+ return callback({}, error=DynamoDBResponseError(error.status, error.reason, error.body))
+ else:
+ return
+ self._update_session_token(cb_for_update)
return
headers = {'X-Amz-Target' : '%s_%s.%s' % (self.ServiceName,
self.Version, action),
@@ -122,27 +199,17 @@ def make_request(self, action, body='', callback=None, object_hook=None):
def _finish_make_request(self, response, callback, orig_request, token_used, object_hook=None):
'''
- Check for errors and decode the json response (in the tornado response body), then pass on to orig callback
+ Check for errors and decode the json response (in the tornado response body), then pass on to orig callback.
+ This method also contains some of the logic to handle reacquiring session tokens.
'''
json_response = json.loads(response.body, object_hook=object_hook)
if response.error:
if any((token_error in json_response.get('__type', []) \
for token_error in (self.ExpiredSessionError, self.UnrecognizedClientException))):
- if not self.provider.security_token:
- # this means that we have just asked for a new session token, but have not gotten it back yet.
- # consequently, we should add this to the list of requests to be retried when we get it back
- self.pending_requests.appendleft(orig_request)
- return
- elif token_used == self.provider.security_token:
- # This means that we used an expired token, and have not tried to get a new one yet
- # should insert logic to get a new session token and try again.
- self.provider.security_token = None # invalidate the current security token
- self.pending_requests.appendleft(orig_request) # schedule this request to be tried again
- return self.sts.get_session_token(self._update_session_token_cb)
- else:
- # the current session token is different from the one we used (ie it has been updated)
- # should just try again with the new one
- return orig_request()
+ if self.provider.security_token == token_used:
+ # the token that we used has expired. wipe it out
+ self.provider.security_token = None
+ return orig_request() # make_request will handle logic to get a new token if needed, and queue until it is fetched
else:
# because some errors are benign, include the response when an error is passed
return callback(json_response, error=DynamoDBResponseError(response.error.code,
@@ -152,8 +219,28 @@ def _finish_make_request(self, response, callback, orig_request, token_used, obj
def get_item(self, table_name, key, callback, attributes_to_get=None,
consistent_read=False, object_hook=None):
'''
- Issues an async tornado request to get an item
- '''
+ Return a set of attributes for an item that matches
+ the supplied key.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type key: dict
+ :param key: A Python version of the Key data structure
+ defined by DynamoDB.
+
+ :type attributes_to_get: list
+ :param attributes_to_get: A list of attribute names.
+ If supplied, only the specified attribute names will
+ be returned. Otherwise, all attributes will be returned.
+
+ :type consistent_read: bool
+ :param consistent_read: If True, a consistent read
+ request is issued. Otherwise, an eventually consistent
+ request is issued. '''
data = {'TableName': table_name,
'Key': key}
if attributes_to_get:
@@ -161,16 +248,53 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
if consistent_read:
data['ConsistentRead'] = True
return self.make_request('GetItem', body=json.dumps(data),
- callback=functools.partial(callback), object_hook=object_hook)
+ callback=callback, object_hook=object_hook)
def batch_get_item(self, request_items, callback):
+ """
+ Return a set of attributes for a multiple items in
+ multiple tables using their primary keys.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type request_items: dict
+ :param request_items: A Python version of the RequestItems
+ data structure defined by DynamoDB.
+ """
data = {'RequestItems' : request_items}
json_input = json.dumps(data)
self.make_request('BatchGetItem', json_input, callback)
def put_item(self, table_name, item, callback, expected=None, return_values=None, object_hook=None):
'''
- Issues an async request to create a new item or replace an old one.
+ Create a new item or replace an old item with a new
+ item (including all attributes). If an item already
+ exists in the specified table with the same primary
+ key, the new item will completely replace the old item.
+ You can perform a conditional put by specifying an
+ expected rule.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type item: dict
+ :param item: A Python version of the Item data structure
+ defined by DynamoDB.
+
+ :type expected: dict
+ :param expected: A Python version of the Expected
+ data structure defined by DynamoDB.
+
+ :type return_values: str
+ :param return_values: Controls the return of attribute
+ name-value pairs before then were changed. Possible
+ values are: None or 'ALL_OLD'. If 'ALL_OLD' is
+ specified and the item is overwritten, the content
+ of the old item is returned.
'''
data = {'TableName' : table_name,
'Item' : item}
@@ -187,7 +311,44 @@ def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
scan_index_forward=True, exclusive_start_key=None,
object_hook=None):
'''
- Issues an async request to perform a query
+ Perform a query of DynamoDB. This version is currently punting
+ and expecting you to provide a full and correct JSON body
+ which is passed as is to DynamoDB.
+
+ The callback should operate on a dict representing the decoded
+ response from DynamoDB (using the object_hook, if supplied)
+
+ :type table_name: str
+ :param table_name: The name of the table to delete.
+
+ :type hash_key_value: dict
+ :param key: A DynamoDB-style HashKeyValue.
+
+ :type range_key_conditions: dict
+ :param range_key_conditions: A Python version of the
+ RangeKeyConditions data structure.
+
+ :type attributes_to_get: list
+ :param attributes_to_get: A list of attribute names.
+ If supplied, only the specified attribute names will
+ be returned. Otherwise, all attributes will be returned.
+
+ :type limit: int
+ :param limit: The maximum number of items to return.
+
+ :type consistent_read: bool
+ :param consistent_read: If True, a consistent read
+ request is issued. Otherwise, an eventually consistent
+ request is issued.
+
+ :type scan_index_forward: bool
+ :param scan_index_forward: Specified forward or backward
+ traversal of the index. Default is forward (True).
+
+ :type exclusive_start_key: list or tuple
+ :param exclusive_start_key: Primary key of the item from
+ which to continue an earlier query. This would be
+ provided as the LastEvaluatedKey in that query.
'''
data = {'TableName': table_name,
'HashKeyValue': hash_key_value}
View
2  setup.py
@@ -2,7 +2,7 @@
from distutils.core import setup
# also update version in __init__.py
-version = '0.2.4'
+version = '0.2.5'
setup(
name="asyncdynamo",
Something went wrong with that request. Please try again.