Skip to content

Commit

Permalink
Add missing ack to impl_qpid.
Browse files Browse the repository at this point in the history
Fix bug 1012374.

Johannes Erdfelt pointed out that impl_qpid wasn't acking messages that
it received.  This turned out to be a nasty oversight, resulting in
unbounded message queue growth inside of the python-qpid library.  This
fixes it.

Change-Id: I0370293807f0282e1dbdd59246f70be031e888a9
(cherry picked from commit d15f303)
  • Loading branch information
russellb authored and vishvananda committed Jun 14, 2012
1 parent c3376bb commit 1dc9f19
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
7 changes: 6 additions & 1 deletion nova/rpc/impl_qpid.py
Expand Up @@ -138,7 +138,12 @@ def reconnect(self, session):
def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
self.callback(message.content)
try:
self.callback(message.content)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
self.session.acknowledge(message)

def get_receiver(self):
return self.receiver
Expand Down
4 changes: 4 additions & 0 deletions nova/tests/rpc/test_qpid.py
Expand Up @@ -265,23 +265,27 @@ def _test_call(self, multi):
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)

Expand Down

0 comments on commit 1dc9f19

Please sign in to comment.