forked from ask/ghettoq
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Ask Solem
authored and
Ask Solem Hoel
committed
Nov 5, 2009
1 parent
ff66b56
commit 56a2922
Showing
1 changed file
with
136 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
from carrot.backends.base import BaseBackend, BaseMessage | ||
from anyjson import serialize, deserialize | ||
from ghettoq.backends import Connection | ||
from itertools import count | ||
from ghettoq.messaging import Empty as QueueEmpty | ||
from uuid import uuid4 | ||
|
||
|
||
class Message(BaseMessage): | ||
|
||
def __init__(self, backend, payload, **kwargs): | ||
self.backend = backend | ||
|
||
payload = deserialize(payload) | ||
kwargs["body"] = payload.get("body") | ||
kwargs["delivery_tag"] = payload.get("delivery_tag") | ||
kwargs["content_type"] = payload.get("content_type") | ||
kwargs["content_encoding"] = payload.get("content_encoding") | ||
kwargs["priority"] = payload.get("priority") | ||
|
||
super(Message, self).__init__(backend, **kwargs) | ||
|
||
def ack(self): | ||
pass | ||
|
||
def reject(self): | ||
raise NotImplementedError( | ||
"The GhettoQ backend does not implement basic.reject") | ||
|
||
def requeue(self): | ||
raise NotImplementedError( | ||
"The GhettoQ backend does not implement requeue") | ||
|
||
|
||
class Backend(BaseBackend): | ||
Message = Message | ||
default_port = None | ||
type = "Redis" | ||
|
||
def __init__(self, connection, **kwargs): | ||
self.type = kwargs.get("type", self.type) | ||
self.connection = connection | ||
self._consumers = {} | ||
self._callbacks = {} | ||
|
||
def establish_connection(self): | ||
conninfo = self.connection | ||
return Connection(self.type, host=conninfo.hostname, | ||
user=conninfo.userid, | ||
password=conninfo.password, | ||
database=conninfo.virtual_host, | ||
port=conninfo.port) | ||
|
||
def close_connection(self, connection): | ||
connection.close() | ||
|
||
def queue_exists(self, queue): | ||
return True | ||
|
||
|
||
def queue_purge(self, queue, **kwargs): | ||
# TODO | ||
|
||
def declare_consumer(self, queue, no_ack, callback, consumer_tag, | ||
**kwargs): | ||
# FIXME | ||
self._consumers[consumer_tag] = queue | ||
self._callbacks[queue] = callback | ||
|
||
def consume(self, limit=None): | ||
it = self.channel.get_multi(self._consumers.values()) | ||
for total_message_count in count(): | ||
if limit and total_message_count >= limit: | ||
raise StopIteration | ||
while True: | ||
payload, queue = it.next() | ||
if payload: | ||
break | ||
|
||
if not queue or queue not in self._callbacks: | ||
continue | ||
|
||
# Process payload here (content-type etc) | ||
self._callbacks[queue](payload) | ||
|
||
yield True | ||
|
||
def queue_declare(self, queue, *args, **kwargs): | ||
pass | ||
|
||
def get(self, queue, **kwargs): | ||
try: | ||
payload = self.channel.Queue(queue).get() | ||
except QueueEmpty: | ||
return None | ||
else: | ||
return self.message_to_python(payload) | ||
|
||
def ack(self, frame): | ||
pass | ||
|
||
def message_to_python(self, raw_message): | ||
return self.Message(backend=self, frame=raw_message) | ||
|
||
def prepare_message(self, message_data, delivery_mode, priority=0, | ||
content_type=None, content_encoding=None): | ||
return {"body": message_data, | ||
"delivery_tag": str(uuid4()), | ||
"priority": priority or 0, | ||
"content-encoding": content_encoding, | ||
"content-type": content_type} | ||
|
||
def publish(self, message, exchange, routing_key, **kwargs): | ||
self.channel.Queue(exchange).put(serialize(message)) | ||
|
||
def cancel(self, consumer_tag): | ||
if not self._channel: | ||
return | ||
self._consumers.pop(consumer_tag, None) | ||
|
||
def close(self): | ||
for consumer_tag in self._consumers.keys(): | ||
self.cancel(consumer_tag) | ||
if self._channel: | ||
self._channel.close() | ||
self._channel = None | ||
|
||
@property | ||
def channel(self): | ||
if not self._channel: | ||
# Need one connection per channel (use AMQP if that is a problem) | ||
self._channel = self.establish_connection() | ||
return self._channel | ||
|
||
|
||
|