Permalink
Browse files

Added retry_handler mechanism to allow more flexibility in handling t…

…he new retry approaches required in DynamoDB.
  • Loading branch information...
1 parent 4719049 commit a2bbf639aa3cc9f40029302a5d8b9396852a905a @garnaat garnaat committed Jan 26, 2012
Showing with 117 additions and 58 deletions.
  1. +7 −0 boto/auth.py
  2. +36 −16 boto/connection.py
  3. +48 −37 boto/dynamodb/layer1.py
  4. +17 −4 boto/sts/credentials.py
  5. +9 −1 tests/dynamodb/test_layer1.py
View
@@ -75,6 +75,9 @@ def __init__(self, host, config, provider):
if provider.access_key is None or provider.secret_key is None:
raise boto.auth_handler.NotReadyToAuthenticate()
self.host = host
+ self.update_provider(provider)
+
+ def update_provider(self, provider):
self._provider = provider
self._hmac = hmac.new(self._provider.secret_key, digestmod=sha)
if sha256:
@@ -240,6 +243,10 @@ def add_auth(self, req, **kwargs):
:type req: :class`boto.connection.HTTPRequest`
:param req: The HTTPRequest object.
"""
+ # This could be a retry. Make sure the previous
+ # authorization header is removed first.
+ if 'X-Amzn-Authorization' in req.headers:
+ del req.headers['X-Amzn-Authorization']
req.headers['X-Amz-Date'] = formatdate(usegmt=True)
req.headers['X-Amz-Security-Token'] = self._provider.security_token
string_to_sign, headers_to_sign = self.string_to_sign(req)
View
@@ -696,14 +696,16 @@ def get_proxy_auth_header(self):
auth = base64.encodestring(self.proxy_user + ':' + self.proxy_pass)
return {'Proxy-Authorization': 'Basic %s' % auth}
- def _mexe(self, request, sender=None, override_num_retries=None):
+ def _mexe(self, request, sender=None, override_num_retries=None,
+ retry_handler=None):
"""
mexe - Multi-execute inside a loop, retrying multiple times to handle
transient Internet errors by simply trying again.
Also handles redirects.
This code was inspired by the S3Utils classes posted to the boto-users
Google group by Larry Bates. Thanks!
+
"""
boto.log.debug('Method: %s' % request.method)
boto.log.debug('Path: %s' % request.path)
@@ -724,35 +726,50 @@ def _mexe(self, request, sender=None, override_num_retries=None):
next_sleep = random.random() * (2 ** i)
try:
# we now re-sign each request before it is retried
+ boto.log.debug('Token: %s' % self.provider.security_token)
request.authorize(connection=self)
if callable(sender):
response = sender(connection, request.method, request.path,
request.body, request.headers)
else:
- connection.request(request.method, request.path, request.body,
- request.headers)
+ connection.request(request.method, request.path,
+ request.body, request.headers)
response = connection.getresponse()
location = response.getheader('location')
# -- gross hack --
# httplib gets confused with chunked responses to HEAD requests
# so I have to fake it out
- if request.method == 'HEAD' and getattr(response, 'chunked', False):
+ if request.method == 'HEAD' and getattr(response,
+ 'chunked', False):
response.chunked = 0
+ if callable(retry_handler):
+ status = retry_handler(response, i, next_sleep)
+ if status:
+ msg, i, next_sleep = status
+ if msg:
+ boto.log.debug(msg)
+ continue
if response.status == 500 or response.status == 503:
- boto.log.debug('received %d response, retrying in %3.1f seconds' %
- (response.status, next_sleep))
+ msg = 'Received %d response. '
+ msg += 'Retrying in %3.1f seconds' % (response.status,
+ next_sleep)
+ boto.log.debug(msg)
body = response.read()
elif response.status < 300 or response.status >= 400 or \
not location:
- self.put_http_connection(request.host, self.is_secure, connection)
+ self.put_http_connection(request.host, self.is_secure,
+ connection)
return response
else:
- scheme, request.host, request.path, params, query, fragment = \
- urlparse.urlparse(location)
+ scheme, request.host, request.path, \
+ params, query, fragment = urlparse.urlparse(location)
if query:
request.path += '?' + query
- boto.log.debug('Redirecting: %s' % scheme + '://' + request.host + request.path)
- connection = self.get_http_connection(request.host, scheme == 'https')
+ msg = 'Redirecting: %s' % scheme + '://'
+ msg += request.host + request.path
+ boto.log.debug(msg)
+ connection = self.get_http_connection(request.host,
+ scheme == 'https')
continue
except self.http_exceptions, e:
for unretryable in self.http_unretryable_exceptions:
@@ -763,18 +780,21 @@ def _mexe(self, request, sender=None, override_num_retries=None):
raise e
boto.log.debug('encountered %s exception, reconnecting' % \
e.__class__.__name__)
- connection = self.new_http_connection(request.host, self.is_secure)
+ connection = self.new_http_connection(request.host,
+ self.is_secure)
time.sleep(next_sleep)
i += 1
- # If we made it here, it's because we have exhausted our retries and stil haven't
- # succeeded. So, if we have a response object, use it to raise an exception.
- # Otherwise, raise the exception that must have already happened.
+ # If we made it here, it's because we have exhausted our retries
+ # and stil haven't succeeded. So, if we have a response object,
+ # use it to raise an exception.
+ # Otherwise, raise the exception that must have already h#appened.
if response:
raise BotoServerError(response.status, response.reason, body)
elif e:
raise e
else:
- raise BotoClientError('Please report this exception as a Boto Issue!')
+ msg = 'Please report this exception as a Boto Issue!'
+ raise BotoClientError(msg)
def build_base_http_request(self, method, path, auth_path,
params=None, headers=None, data='', host=None):
View
@@ -1,5 +1,5 @@
-# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/
-# Copyright (c) 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved
+# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
+# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
@@ -24,6 +24,7 @@
import boto
from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
+from boto.provider import Provider
from boto.dynamodb import exceptions as dynamodb_exceptions
from boto.dynamodb.table import Table
@@ -62,7 +63,7 @@ class Layer1(AWSAuthConnection):
ThruputError = "ProvisionedThroughputExceededException"
"""The error response returned when provisioned throughput is exceeded"""
- ExpiredSessionError = 'com.amazon.coral.service#ExpiredTokenException'
+ SessionExpiredError = 'com.amazon.coral.service#ExpiredTokenException'
"""The error response returned when session token has expired"""
ResponseError = DynamoDBResponseError
@@ -72,10 +73,10 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
host=None, debug=0, session_token=None):
if not host:
host = self.DefaultHost
+ self._passed_access_key = aws_access_key_id
+ self._passed_secret_key = aws_secret_access_key
if not session_token:
- self.sts = boto.connect_sts(aws_access_key_id,
- aws_secret_access_key)
- session_token = self.sts.get_session_token()
+ session_token = self._get_session_token()
self.creds = session_token
AWSAuthConnection.__init__(self, host,
self.creds.access_key,
@@ -84,6 +85,19 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
debug=debug,
security_token=self.creds.session_token)
+ def _update_provider(self):
+ self.provider = Provider('aws',
+ self.creds.access_key,
+ self.creds.secret_key,
+ self.creds.session_token)
+ self._auth_handler.update_provider(self.provider)
+
+ def _get_session_token(self):
+ boto.log.debug('Creating new Session Token')
+ sts = boto.connect_sts(self._passed_access_key,
+ self._passed_secret_key)
+ return sts.get_session_token()
+
def _required_auth_capability(self):
return ['hmac-v3-http']
@@ -93,42 +107,39 @@ def make_request(self, action, body='', object_hook=None):
"""
headers = {'X-Amz-Target' : '%s_%s.%s' % (self.ServiceName,
self.Version, action),
- 'Content-Type' : 'application/x-amz-json-1.0',
- 'Content-Length' : str(len(body))}
- numAttempts = 0
- while numAttempts < self.num_retries:
- http_request = self.build_base_http_request('POST', '/', '/',
- {}, headers, body, None)
- response = self._mexe(http_request, sender=None,
- override_num_retries=0)
+ 'Content-Type' : 'application/x-amz-json-1.0',
+ 'Content-Length' : str(len(body))}
+ http_request = self.build_base_http_request('POST', '/', '/',
+ {}, headers, body, None)
+ response = self._mexe(http_request, sender=None,
+ retry_handler=self._retry_handler)
+ response_body = response.read()
+ boto.log.debug(response_body)
+ return json.loads(response_body, object_hook=object_hook)
+
+ def _retry_handler(self, response, i, next_sleep):
+ status = None
+ if response.status == 400:
response_body = response.read()
boto.log.debug(response_body)
- json_response = json.loads(response_body, object_hook=object_hook)
- if response.status == 200:
- return json_response
- elif response.status == 400:
- # We only handle "soft" retries of ProvisionedThroughput error
- if self.ThruputError in json_response.get('__type'):
- boto.log.debug("%s, retry attempt %s" % (self.ThruputError,
- numAttempts))
- if numAttempts < self.num_retries:
- # Sleep a fractional amount of time, which corresponds
- # to 1 second for our last attempt, and zero time for
- # our first retry.
- time.sleep((1.0/self.num_retries)*numAttempts)
- numAttempts += 1
- continue
- raise self.ResponseError(response.status, response.reason,
- json_response)
+ json_response = json.loads(response_body)
+ if self.ThruputError in json_response.get('__type'):
+ msg = "%s, retry attempt %s" % (self.ThruputError, i)
+ # Sleep a fractional amount of time, which corresponds
+ # to 1 second for our last attempt, and zero time for
+ # our first retry.
+ next_sleep = (1.0/self.num_retries)*i
+ i += 1
+ status = (msg, i, next_sleep)
+ elif self.SessionExpiredError in json_response.get('__type'):
+ msg = 'Renewing Session Token'
+ self.creds = self._get_session_token()
+ self._update_provider()
+ status = (msg, i+self.num_retries-1, next_sleep)
else:
- if self.SessionExpiredError in json_response.get('__type'):
- # TODO: Retrieve new session token here
- raise dynamodb_exceptions.DynamoDBExpiredTokenError(
- response.status, json_response.get('message'),
- json_response)
-
raise self.ResponseError(response.status, response.reason,
json_response)
+ return status
def list_tables(self, limit=None, start_table=None):
"""
View
@@ -45,6 +45,21 @@ def __init__(self, parent=None):
self.expiration = None
@classmethod
+ def from_json(cls, json_doc):
+ """
+ Create and return a new Session Token based on the contents
+ of a JSON document.
+
+ :type json_doc: str
+ :param json_doc: A string containing a JSON document with a
+ previously saved Credentials object.
+ """
+ d = json.loads(json_doc)
+ token = cls()
+ token.__dict__.update(d)
+ return token
+
+ @classmethod
def load(cls, file_path):
"""
Create and return a new Session Token based on the contents
@@ -55,11 +70,9 @@ def load(cls, file_path):
file containing the previously saved Session Token information.
"""
fp = open(file_path)
- d = json.load(fp)
+ json_doc = fp.read()
fp.close()
- token = cls()
- token.__dict__.update(d)
- return token
+ return cls.from_json(json_doc)
def startElement(self, name, attrs, connection):
return None
@@ -28,12 +28,20 @@
import time
from boto.dynamodb.exceptions import DynamoDBKeyNotFoundError
from boto.dynamodb.layer1 import Layer1
+from boto.sts.credentials import Credentials
+
+json_doc = """{"access_key": "ASIAIV7R2NUUJ6SB7GKQ", "secret_key": "eIfijGxJlejHDSQiaGr6b7U805U0GKWmllCTt2ZM", "request_id": "28c17897-4555-11e1-8bb1-2529f165f2f0", "expiration": "2012-01-23T00:59:45.617Z", "session_token": "AQoDYXdzEPn//////////wEasAGDXeGY8bx36NLRSA1v3dy2x00k3FNA2KVsMEXkQuKY08gPTtYs2tefZTBsTjgjC+O6j8ieoB1on2bPyCq872+Yq3cipls8jna+PNSEcsXtC8CJBKai/FfYNg1XUHam6EUCtRiUHvqztOVgaGqUUS1UbrBKB7kKSXzgKrJ9AT0bvqi4hZS0ayaU8969f2HIbN9psXhRBKpJyB9FUPuVYpYYZsz9NY3y2kGtK+dgfrKvxyDxxfL4BA=="}"""
class DynamoDBLayer1Test (unittest.TestCase):
def test_layer1_basic(self):
print '--- running DynamoDB Layer1 tests ---'
- c = Layer1()
+
+ # Create a Layer1 connection with an expired set of
+ # credentials to test the automatic renewal of tokens
+
+ bad_creds = Credentials.from_json(json_doc)
+ c = Layer1(session_token=bad_creds)
# First create a table
table_name = 'test-%d' % int(time.time())

3 comments on commit a2bbf63

Contributor

gtaylor replied Jan 26, 2012

Does this mean this is no longer needed?:

class DynamoDBExpiredTokenError(BotoServerError):

Wasn't sure.

Owner

garnaat replied Jan 26, 2012

Probably not. Actually, I added a DynamoDBResponseError to boto/exceptions.py. We should reconcile these. Maybe I should be using the specific errors you created.

Contributor

gtaylor replied Jan 26, 2012

It seems like it's good to fall back to the more generic error if we don't specifically see something we can raise an existing exception class for. Error handling may vary based on what exception occured, and having explicit exception classes does document a lot of the possibile errors.

But that's just my opinion, so others may not agree.

Please sign in to comment.