Permalink
Browse files

Channel.basic_cancel, Channel.flow, Channel.recover, + (read on)

- Use array.array to keep track of unusued channel id's.

- All ConnectionType channel methods now consistently have
  channel as the first argument.

- Fixes compiler warning in basic_qos.

- Reset frame_max, channel_max and heartbeat after connection_tune.

- Renamed _librabbitmq.connection -> Connection

- More implementation at C level.

- basic_recv now raises socket.timeout directly.

- Refactor C code.

- Connection.__repr__

- Adds read-only _librabbitmq.Connection attributes: hostname, userid,
  password, virtual_host, port, frame_max, channel_max, heartbeat,
  connected.
  • Loading branch information...
1 parent 7af09a3 commit 7c46148629d1a6f2984594abd36365fe22319dab @ask ask committed May 23, 2012
Showing with 441 additions and 199 deletions.
  1. +5 −0 Makefile
  2. +6 −1 TODO
  3. +48 −54 librabbitmq/__init__.py
  4. +211 −76 librabbitmq/_rabbitmqmodule.c
  5. +110 −52 librabbitmq/_rabbitmqmodule.h
  6. +61 −16 setup.py
View
@@ -45,3 +45,8 @@ distclean: pyclean rabbitmq-distclean
dist: distclean rabbitmq-c
+
+
+rebuild:
+ python setup.py build
+ python setup.py install
View
7 TODO
@@ -1,2 +1,7 @@
-Please see our Issue Tracker at GitHub:
+- exchange_unbind
+- heartbeats
+- publisher confirms
+- capabilities
+
+Also please see our Issue Tracker at GitHub:
http://github.com/celery/librabbitmq/issues
View
@@ -3,6 +3,8 @@
import _librabbitmq
+from array import array
+
__version__ = _librabbitmq.__version__
__author__ = _librabbitmq.__author__
__contact__ = _librabbitmq.__contact__
@@ -49,14 +51,17 @@ def __init__(self, connection, channel_id):
self._callbacks = {}
def basic_qos(self, prefetch_size=0, prefetch_count=0, _global=False):
- return self.connection._basic_qos(prefetch_size, prefetch_count,
- _global, self.channel_id)
+ return self.connection._basic_qos(self.channel_id,
+ prefetch_size, prefetch_count, _global)
+
+ def flow(self, active):
+ return self.connection._flow(self.channel_id, active)
- def flow(self, enabled):
- pass
+ def recover(self, requeue=True):
+ return self.connection._basic_recover(self.channel_id, requeue)
def basic_get(self, queue="", no_ack=False):
- frame = self.connection._basic_get(queue, no_ack, self.channel_id)
+ frame = self.connection._basic_get(self.channel_id, queue, no_ack)
if frame is not None:
return(self.Message(frame["body"],
frame["properties"],
@@ -66,11 +71,9 @@ def basic_get(self, queue="", no_ack=False):
def basic_consume(self, queue="", consumer_tag=None, no_local=False,
no_ack=False, exclusive=False, callback=None, nowait=False):
if consumer_tag is None:
- consumer_tag = str(self.next_consumer_tag())
- consumer_tag = self.connection._basic_consume(queue, consumer_tag,
- no_local, no_ack,
- exclusive,
- self.channel_id)
+ consumer_tag = self.next_consumer_tag()
+ consumer_tag = self.connection._basic_consume(self.channel_id,
+ queue, str(consumer_tag), no_local, no_ack, exclusive)
self._callbacks[consumer_tag] = callback
if no_ack:
self.no_ack_consumers.add(consumer_tag)
@@ -88,15 +91,16 @@ def _event(self, event):
raise ChannelError("Message to unknown consumer tag %r" % (tag, ))
def basic_ack(self, delivery_tag, multiple=False):
- return self.connection._basic_ack(delivery_tag, multiple,
- self.channel_id)
+ return self.connection._basic_ack(self.channel_id,
+ delivery_tag, multiple)
def basic_reject(self, delivery_tag, requeue=True):
- return self.connection._basic_reject(delivery_tag, requeue,
- self.channel_id)
+ return self.connection._basic_reject(self.channel_id,
+ delivery_tag, requeue)
def basic_cancel(self, consumer_tag, **kwargs):
self.no_ack_consumers.discard(consumer_tag)
+ self.connection._basic_cancel(self.channel_id, consumer_tag)
def basic_publish(self, message, exchange="", routing_key="",
mandatory=False, immediate=False):
@@ -105,43 +109,43 @@ def basic_publish(self, message, exchange="", routing_key="",
mandatory, immediate)
def queue_purge(self, queue, nowait=False):
- return self.connection._queue_purge(queue, nowait, self.channel_id)
+ return self.connection._queue_purge(self.channel_id, queue, nowait)
def exchange_declare(self, exchange="", type="direct",
passive=False, durable=False, auto_delete=False, arguments=None,
nowait=False):
- return self.connection._exchange_declare(exchange, type,
- self.channel_id, passive, durable, auto_delete)
+ return self.connection._exchange_declare(self.channel_id,
+ exchange, type, passive, durable, auto_delete)
def exchange_delete(self, exchange="", if_unused=False):
- return self.connection._exchange_delete(exchange,
- self.channel_id, if_unused)
+ return self.connection._exchange_delete(self.channel_id,
+ exchange, if_unused)
def queue_declare(self, queue="", passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None,
nowait=False):
- return self.connection._queue_declare(queue,
- self.channel_id, passive, durable, exclusive, auto_delete)
+ return self.connection._queue_declare(self.channel_id,
+ queue, passive, durable, exclusive, auto_delete)
def queue_bind(self, queue="", exchange="", routing_key="",
arguments=None, nowait=False):
- return self.connection._queue_bind(queue, exchange, routing_key,
- self.channel_id)
+ return self.connection._queue_bind(self.channel_id,
+ queue, exchange, routing_key)
def queue_unbind(self, queue="", exchange="", binding_key="",
nowait=False):
- return self.connection._queue_unbind(queue, exchange, binding_key,
- self.channel_id)
+ return self.connection._queue_unbind(self.channel_id,
+ queue, exchange, binding_key)
def queue_delete(self, queue="", if_unused=False, if_empty=False):
- return self.connection._queue_delete(queue, self.channel_id,
- if_unused, if_empty)
+ return self.connection._queue_delete(self.channel_id,
+ queue, if_unused, if_empty)
def close(self):
self.connection._remove_channel(self)
-class Connection(_librabbitmq.connection):
+class Connection(_librabbitmq.Connection):
"""Create a connection to the specified host, which should be
a ``'host[:port]'`` string, such as ``'localhost'``, or ``'1.2.3.4:5672'``
@@ -150,30 +154,22 @@ class Connection(_librabbitmq.connection):
"""
Channel = Channel
-
channels = {}
- channel_max = 0xffff
- frame_max = 131072
- heartbeat = 0
def __init__(self, host="localhost", userid="guest", password="guest",
- virtual_host="/", port=5672, **kwargs):
+ virtual_host="/", port=5672, channel_max=0xffff,
+ frame_max=131072, heartbeat=0, **kwargs):
if ":" in host:
host, port = host.split(":")
-
- self.hostname = host
- self.port = int(port)
- self.userid = userid
- self.password = password
- self.virtual_host = virtual_host
- super(Connection, self).__init__(hostname=host, port=self.port,
- userid=userid, password=password,
- virtual_host=virtual_host,
- channel_max=self.channel_max,
- frame_max=self.frame_max,
- heartbeat=self.heartbeat)
+ super(Connection, self).__init__(hostname=host, port=int(port),
+ userid=userid, password=password,
+ virtual_host=virtual_host,
+ channel_max=channel_max,
+ frame_max=frame_max,
+ heartbeat=heartbeat)
self._do_connect()
+ self._avail_channel_ids = array('H', xrange(self.channel_max, 0, -1))
def reconnect(self):
self.close()
@@ -188,17 +184,14 @@ def drain_events(self, timeout=None):
timeout = 0.0
else:
timeout = float(timeout)
- try:
- event = self._basic_recv(timeout=timeout)
- except _librabbitmq.TimeoutError:
- raise socket.timeout(timeout)
+ event = self._basic_recv(timeout=timeout)
if event:
self.channels[event["channel"]]._event(event)
def channel(self, channel_id=None):
if channel_id is None:
channel_id = self._get_free_channel_id()
- if channel_id in self.channels:
+ elif channel_id in self.channels:
return self.channels[channel_id]
self._channel_open(channel_id)
@@ -214,12 +207,13 @@ def _remove_channel(self, channel):
except ChannelError:
pass
self.channels.pop(channel.channel_id, None)
+ self._avail_channel_ids.append(channel.id)
def _get_free_channel_id(self):
- for i in xrange(1, self.channel_max + 1):
- if i not in self.channels:
- return i
- raise ConnectionError(
+ try:
+ return self._avail_channel_ids.pop()
+ except IndexError:
+ raise ConnectionError(
"No free channel ids, current=%d, channel_max=%d" % (
len(self.channels), self.channel_max))
Oops, something went wrong.

0 comments on commit 7c46148

Please sign in to comment.