Skip to content

Commit

Permalink
Amazon SQS transport passing functional tests. Sponsored by the good …
Browse files Browse the repository at this point in the history
…guys at Yipit.com!
  • Loading branch information
ask committed May 15, 2011
1 parent 2644950 commit 1ab629c
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 3 deletions.
22 changes: 22 additions & 0 deletions funtests/tests/test_SQS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os

from nose import SkipTest

from funtests import transport


class test_SQS(transport.TransportCase):
transport = "SQS"
prefix = "sqs"
sep = "-" # SQS queue names cannot include '.'
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.

def before_connect(self):
if "AWS_ACCESS_KEY_ID" not in os.environ:
raise SkipTest("Missing envvar AWS_ACCESS_KEY_ID")
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
raise SkipTest("Missing envvar AWS_SECRET_ACCESS_KEY")

def after_connect(self, connection):
connection.channel().client
9 changes: 8 additions & 1 deletion funtests/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def callback(message_data, message):
class TransportCase(unittest.TestCase):
transport = None
prefix = None
sep = '.'
userid = None
password = None
event_loop_max = 100
connection_options = {}

Expand Down Expand Up @@ -86,6 +89,10 @@ def purge(self, names):
map(chan.queue_purge, names)

def get_connection(self, **options):
if self.userid:
options.setdefault("userid", self.userid)
if self.password:
options.setdefault("password", self.password)
return BrokerConnection(transport=self.transport, **options)

def do_connect(self):
Expand Down Expand Up @@ -160,7 +167,7 @@ def test_produce__consume_large_messages(self, bytes=1048576, n=10,
self.purge([self.queue.name])

def P(self, rest):
return "%s.%s" % (self.prefix, rest)
return "%s%s%s" % (self.prefix, self.sep, rest)

def test_produce__consume_multiple(self):
if not self.verify_alive():
Expand Down
80 changes: 80 additions & 0 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@

"""
kombu.transport.SQS
===================
Amazon SQS transport.
:copyright: (c) 2010 - 2011 by Ask Solem
:license: BSD, see LICENSE for more details.
"""
from Queue import Empty

from anyjson import serialize, deserialize
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message

from kombu.transport import virtual
from kombu.utils import cached_property

class Channel(virtual.Channel):
_client = None

def _new_queue(self, queue, **kwargs):
return self.client.create_queue(queue, self.visibility_timeout)

def _get(self, queue):
q = self._new_queue(queue)
rs = q.get_messages(1)
if rs:
return deserialize(rs[0].get_body())
raise Empty()

def _size(self, queue):
return self._new_queue(queue).count()

def _put(self, queue, message, **kwargs):
q = self._new_queue(queue)
m = Message()
m.set_body(serialize(message))
q.write(m)

def _purge(self, queue):
q = self._new_queue(queue)
size = q.count()
q.clear()
return size

def close(self):
super(Channel, self).close()
if self._client:
try:
self._client.close()
except AttributeError, exc: # FIXME ???
if "can't set attribute" not in str(exc):
raise

def _open(self):
conninfo = self.connection.client
return SQSConnection(conninfo.userid, conninfo.password)

@property
def client(self):
if self._client is None:
self._client = self._open()
return self._client

@cached_property
def visibility_timeout(self):
options = self.connection.client.transport_options
return options.get("visibility_timeout")


class Transport(virtual.Transport):
Channel = Channel

interval = 1
default_port = None
connection_errors = ()
channel_errors = ()
2 changes: 1 addition & 1 deletion kombu/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def __inner():
You should replace %r with simply: %r
""" % (name, gtransport, this))
print("TTT: %r" % ktransport)
return ktransport

return __inner
Expand All @@ -75,6 +74,7 @@ def __inner():
"syncpika": "kombu.transport.pypika.SyncTransport",
"memory": "kombu.transport.memory.Transport",
"redis": "kombu.transport.pyredis.Transport",
"SQS": "kombu.transport.SQS.Transport",
"beanstalk": "kombu.transport.beanstalk.Transport",
"mongodb": "kombu.transport.mongodb.Transport",
"couchdb": "kombu.transport.pycouchdb.Transport",
Expand Down
3 changes: 2 additions & 1 deletion kombu/transport/pypika.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def on_data_available(self, buf):
class SyncTransport(base.Transport):
default_port = DEFAULT_PORT

connection_errors = (exceptions.ConnectionClosed,
connection_errors = (socket.error,
exceptions.ConnectionClosed,
exceptions.ChannelClosed,
exceptions.LoginError,
exceptions.NoFreeChannels,
Expand Down

0 comments on commit 1ab629c

Please sign in to comment.