Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: feee3362f5
Fetching contributors…

Cannot retrieve contributors at this time

file 272 lines (226 sloc) 10.778 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
"""
AsyncBoto

Thomas Parslow 2011
tom@almostobsolete.net

Make Boto operate in an Asynchronous way using Tornado.

I've only tested with SimpleDB so far (and then only a little bit)
but you should be able to do the same with all the other types of
connection in Boto.

THIS IS RATHER EARLY RIGHT NOW, IT'S NOT PROPERLY TESTED AND IT MAY
JUST BE A STUPID IDEA ALTOGETHER. I JUST WANT TO GET SOME FEEDBACK.
DON'T BE STUPID AND USE THIS FOR SOMETHING IMPORTANT. THANKS :)

"""
import boto.sdb.connection
import boto.s3.connection
import tornado.httpclient
import tornado.ioloop
import urlparse
import functools
import hashlib

class AsyncCallInprogress(Exception):
    """
Raised by AsyncHttpConnection.getresponse when the response is not
cached yet: it abandons the current pass through the Boto call while
the HTTP request is in flight.
"""

class AsyncHttpResponse(object):
    """
Emulates the parts of httplib's HTTPResponse that Boto uses.

The same instance is handed back from the response cache every time
an async call is replayed, so read() is deliberately non-consuming:
each call returns the complete body. Reading in size-limited chunks
(httplib's read(amt)) is not supported.
"""
    def __init__(self, status, reason, body, headers):
        self.status = status
        self.reason = reason
        self.body = body
        self.headers = headers
        # httplib's HTTPResponse also exposes the headers as ``msg``.
        self.msg = headers

    def read(self):
        """Return the whole response body (see the class docstring)."""
        return self.body

    def getheader(self, name, default=None):
        """Return header ``name``, or ``default`` if it is absent."""
        return self.headers.get(name, default)

    def getheaders(self):
        """Return the headers as a list of (name, value) pairs, like httplib."""
        return list(self.headers.items())

class AsyncHttpConnection(object):
    """
This class replaces the httplib HTTPConnection used by boto. Only
the bits actually used by Boto are provided (at least I hope they
are!). As with httplib, a request can be described either in one go
with request() or piecemeal with putrequest()/putheader()/
endheaders()/send().

When a request is actually made (which happens in "getresponse")
the cache of previous responses is checked first. The cache belongs
to this particular connection, which lives no longer than a single
boto async call. If a matching response is found it is returned
right away.

If no response is found the request is started and an
AsyncCallInprogress exception is raised. This causes the call in
progress to be abandoned for the moment. When the response actually
comes through the Boto call is made again, but this time when it
gets to the point where it calls getresponse a response is ready
and waiting.

This cycle can happen several times during an async call into Boto,
but on the final pass all the requests will be in the cache and the
call will complete without raising AsyncCallInprogress.
"""
    # Query parameters that are regenerated every time a request is
    # signed; they must not be part of the cache key.
    _VOLATILE_QUERY_PARAMS = frozenset(["Timestamp", "Signature"])
    # Header names (compared lower-cased) that are regenerated every
    # time a request is signed.
    _VOLATILE_HEADERS = frozenset(
        ["date", "authorization", "x-amz-date", "x-amzn-authorization"])
    # Methods sent with a (possibly empty) body. Tornado's HTTPRequest
    # rejects a body of None for these and a non-None body otherwise.
    _BODY_METHODS = frozenset(["POST", "PUT", "PATCH"])

    def __init__(self, aws_connection, fn, callback, errback):
        # Owning connection; its _call_async replays fn once a
        # response arrives (see _callback).
        self.aws_connection = aws_connection

        self.fn = fn
        self.callback = callback
        self.errback = errback

        # Set by the owning connection's get_http_connection.
        self.host = None
        self.is_secure = None

        # Details of the request currently being described.
        self.method = None
        self.path = None
        self.data = None
        self.headers = {}
        self._body_chunks = []  # pieces passed to send()
        self.debuglevel = 0

        # request signature -> AsyncHttpResponse
        self.response_cache = {}

    def _get_request_sig(self):
        """
request_sig is a hash that can be used to look up a request.
It covers the request details minus the timestamp, date and
signature values (which differ if the same request is signed even
a tiny bit of time apart).

The request sig is based on the current values of the member
variables.
"""
        try:
            from urlparse import parse_qs
        except ImportError:  # Python 3
            from urllib.parse import parse_qs

        path_minus_query, _, query_string = self.path.partition("?")
        # keep_blank_values so that value-less sub-resources such as
        # "?acl" still distinguish one request from another.
        query = sorted(
            (k, tuple(v))
            for k, v in parse_qs(query_string, keep_blank_values=True).items()
            if k not in self._VOLATILE_QUERY_PARAMS)
        # Header values are kept as-is (not iterated) so non-string
        # values such as ints are accepted; dict keys are unique, so
        # sorting never has to compare the values.
        headers = sorted(
            (k, v) for k, v in (self.headers or {}).items()
            if k.lower() not in self._VOLATILE_HEADERS)

        # Hash this so that large bodies aren't kept around when not
        # needed
        key = repr((self.is_secure, self.host, self.method,
                    path_minus_query, tuple(query), self.data,
                    tuple(headers)))
        return hashlib.md5(key.encode("utf-8")).hexdigest()

    def request(self, method, path, data=None, headers=None):
        """
In the httplib version this would actually make the request,
but here we just store the params ready for use in the
getresponse method.
"""
        self.method = method
        self.path = path
        self.data = data
        self.headers = headers if headers is not None else {}
        self._body_chunks = []

    def putrequest(self, method, url, skip_host=False,
                   skip_accept_encoding=False):
        """
Start describing a request piecemeal, like httplib's putrequest.
skip_host and skip_accept_encoding are accepted for compatibility
and ignored.
"""
        self.method = method
        self.path = url
        self.data = None
        self.headers = {}
        self._body_chunks = []

    def putheader(self, header, *values):
        """Record a header; several values are joined the way httplib joins them."""
        self.headers[header] = "\r\n\t".join(str(v) for v in values)

    def endheaders(self, message_body=None):
        """Finish the headers; an optional body is handled like send()."""
        if message_body is not None:
            self.send(message_body)

    def send(self, data):
        """Buffer a chunk of the request body until getresponse."""
        self._body_chunks.append(data)

    def set_debuglevel(self, level):
        """Accepted for httplib compatibility; the level is only stored."""
        self.debuglevel = level

    def _finish_body(self):
        """Fold any chunks passed to send() into self.data."""
        if self._body_chunks:
            chunks = ([self.data] if self.data else []) + self._body_chunks
            self.data = b"".join(chunks)
            self._body_chunks = []

    def _tornado_body(self):
        """Return self.data in the form Tornado's HTTPRequest accepts."""
        if (self.method or "").upper() in self._BODY_METHODS:
            # An empty body must stay "" here, not become None.
            return self.data if self.data is not None else ""
        return self.data or None

    @staticmethod
    def _reason_phrase(code):
        """Standard reason phrase for an HTTP status code, or "???"."""
        try:
            from httplib import responses
        except ImportError:  # Python 3
            from http.client import responses
        return responses.get(code, "???")

    def _callback(self, request_sig, tornado_response):
        """
The Tornado httpclient callback. Is partially applied to the
request_sig so the response can be stuffed into response_cache,
after which the Boto call is replayed.
"""
        reason = (getattr(tornado_response, "reason", None)
                  or self._reason_phrase(tornado_response.code))
        # Tornado may report a body of None (e.g. when no HTTP response
        # was received at all); httplib's read() never returns None.
        body = tornado_response.body
        if body is None:
            body = b""
        response = AsyncHttpResponse(
            tornado_response.code, reason, body, tornado_response.headers)
        self.response_cache[request_sig] = response
        # Retry the call, with the response in place this time.
        self.aws_connection._call_async(
            self.fn, callback=self.callback, errback=self.errback,
            async_http_connection=self)

    def getresponse(self):
        """
Checks response_cache: if we already have a response for this
request return it, otherwise start the async request and raise
AsyncCallInprogress.
"""
        self._finish_body()
        request_sig = self._get_request_sig()
        if request_sig in self.response_cache:
            # We already made the request and got the response, carry
            # on!
            return self.response_cache[request_sig]
        # Ok, we need to make a request
        scheme = "https" if self.is_secure else "http"
        url = "%s://%s%s" % (scheme, self.host, self.path)
        request = tornado.httpclient.HTTPRequest(
            url, method=self.method, headers=self.headers,
            body=self._tornado_body())
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch(request, functools.partial(self._callback, request_sig))
        raise AsyncCallInprogress


class AsyncConnectionMixin(object):
    """
Mixin to replace get_http_connection and put_http_connection in a
subclass of AWSAuthConnection from Boto to create an Async version
of a connection class.

All calls to methods in the new Async version must be wrapped in
call_async calls to make them operate asynchronously. For example:

sdb_conn.call_async(
lambda : sdb_conn.get_domain("my_test_domain").get_attributes("hello"),
callback=hello)

call_async sets up AsyncHttpConnection with the callback and also
traps the AsyncCallInprogress exception.

Exceptions are trapped and sent to the "errback" callback. When no
errback is given they are re-raised instead; on a replay (after an
HTTP response has arrived) that re-raise happens inside Tornado's
fetch callback rather than in the original caller.
"""
    def call_async(self, fn, callback=lambda x : None, errback=None):
        """Start running ``fn`` asynchronously; see the class docstring."""
        return self._call_async(fn, callback, errback)

    def _call_async(self, fn, callback, errback, async_http_connection=None):
        """
Run ``fn`` once with an AsyncHttpConnection injected.

call_async calls this with no connection, so a fresh one is created;
AsyncHttpConnection calls it again with itself each time one of its
requests completes, so its response cache is reused.
"""
        self._async_http_connection = async_http_connection
        try:
            if not self._async_http_connection:
                self._async_http_connection = AsyncHttpConnection(
                    self, fn, callback, errback)
            try:
                ret = fn()
            except AsyncCallInprogress:
                # A request is in flight; AsyncHttpConnection will call
                # us again once its response arrives.
                pass
            except Exception as e:
                if errback is None:
                    raise
                # Bind the exception now: a lambda closing over ``e``
                # would fail on Python 3, which unbinds ``e`` when the
                # except clause ends.
                tornado.ioloop.IOLoop.instance().add_callback(
                    functools.partial(errback, e))
            else:
                # When a call finally succeeds without raising
                # AsyncCallInprogress we then need to pass control to the
                # callback.
                #
                # This could also happen first time if the call doesn't
                # involve any HTTP requests. But call_async would still
                # return right away and have the callback called on the
                # next iteration of the IOLoop
                tornado.ioloop.IOLoop.instance().add_callback(
                    functools.partial(callback, ret))
        finally:
            del self._async_http_connection

    def get_http_connection(self, host, is_secure):
        """
This is called to get an HTTP connection from the pool. This
is the point at which we inject our replacement http connection
"""
        if hasattr(self, "_async_http_connection"):
            self._async_http_connection.host = host
            self._async_http_connection.is_secure = is_secure
            return self._async_http_connection
        else:
            # This hasn't been called from within an async_call so
            # just allow it to do a normal synchronous call
            return super(AsyncConnectionMixin, self).get_http_connection(host,is_secure)

    def put_http_connection(self, *args, **kwargs):
        """Return a connection to Boto's pool, except for our injected async one."""
        if not hasattr(self, "_async_http_connection"):
            super(AsyncConnectionMixin, self).put_http_connection(*args, **kwargs)

# I'm defining some connection classes here but you should be able to
# use this with any boto connection object. It's just that I haven't
# tested the ones not here at all (as opposed to the literally
# *minutes* of testing that I have given these ones).
            
class AsyncSDBConnection(AsyncConnectionMixin, boto.sdb.connection.SDBConnection):
    """SDBConnection whose calls can be run asynchronously via call_async."""

class AsyncS3Connection(AsyncConnectionMixin, boto.s3.connection.S3Connection):
    """S3Connection whose calls can be run asynchronously via call_async."""


if __name__ == "__main__":
    # Demo: exercise SimpleDB and S3 through the async wrappers.
    # Credentials come from --aws_access_key_id / --aws_access_key_secret.
    from tornado.options import define, options
    import random

    define("aws_access_key_id", type=str)
    define("aws_access_key_secret", type=str)
    tornado.options.parse_command_line()

    # SimpleDB: put an item, then read it back.
    sdb_conn = AsyncSDBConnection(options.aws_access_key_id, options.aws_access_key_secret)

    def callback2(ret):
        print("Return from get was: %s" % (ret,))

    def callback1(ret):
        print("Return from put was: %s" % (ret,))
        sdb_conn.call_async(lambda : sdb_conn.get_domain("mytest").get_attributes("boom"), callback=callback2)

    sdb_conn.call_async(lambda : sdb_conn.create_domain("mytest").put_attributes("boom", {"hello": "goodbye"}), callback=callback1)

    # Test errback
    def callback3(ret):
        # raise rather than assert so the check survives python -O
        raise AssertionError("We were expecting an error!")

    def errback3(exception):
        print("Exception received: %s" % (exception,))

    sdb_conn.call_async(lambda : sdb_conn.get_domain("i_do_not_exist"), callback=callback3, errback=errback3)

    # S3: create a bucket, write a key, read it back, then clean up.
    s3_conn = AsyncS3Connection(options.aws_access_key_id, options.aws_access_key_secret)
    # Only lower-case letters, digits and hyphens: underscores are not
    # DNS-compatible, and dotted names do not match S3's wildcard
    # certificate when the bucket is addressed over HTTPS.
    bucket_name = "asyncboto-test-%d" % random.randint(0, 10 ** 12)

    def s3callback1(bucket):
        print("The Bucket: %s" % (bucket,))
        def s3callback2(key):
            print("The Key: %s" % (key,))
            def s3callback3(_ignore):
                def s3callback4(contents):
                    print("Contents: %s" % (contents,))
                    # clean up by deleting the bucket
                    s3_conn.call_async(lambda : key.delete(), lambda _ : s3_conn.call_async(lambda :bucket.delete()))
                s3_conn.call_async(lambda : key.get_contents_as_string(), callback=s3callback4)
            s3_conn.call_async(lambda : key.set_contents_from_string("hello!"), callback=s3callback3)
        s3_conn.call_async(lambda : bucket.new_key("hello"), callback=s3callback2)

    s3_conn.call_async(lambda : s3_conn.create_bucket(bucket_name), callback=s3callback1)

    print("Start loop")
    tornado.ioloop.IOLoop.instance().start()
Something went wrong with that request. Please try again.