A colleague of mine noticed that I had these peppered through some consumer daemons:
    for m in self.iterconsume():
        if self.closed:
            break
To handle signals for a graceful daemon shutdown, we close the channel asynchronously, which causes an exception either inside iterconsume or when it tries to fetch the next message. I put the flag there to prevent some wild exception from bleeding through. As a test, we modified the pyamqplib backend to raise a StopIteration when the channel is closed (sorry for the lack of a diff):
    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time."""
        for total_message_count in count():
            if limit and total_message_count >= limit:
                raise StopIteration
            if not self.channel.is_open:
                raise StopIteration
            self.channel.wait()
            yield True
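One caveat for anyone trying this on a current interpreter: since Python 3.7 (PEP 479), a StopIteration raised inside a generator body is converted to a RuntimeError, so the same idea would be written with a plain return. Below is a minimal sketch of that variant. FakeChannel is a hypothetical stand-in used only to make the example runnable; it is not the pyamqplib channel API, and consume is written as a free function here rather than as the backend method.

```python
from itertools import count


class FakeChannel:
    """Hypothetical stand-in for an AMQP channel (assumption, not pyamqplib)."""

    def __init__(self, messages_before_close):
        self.is_open = True
        self._remaining = messages_before_close

    def wait(self):
        # Pretend one message was delivered; close after the last one.
        self._remaining -= 1
        if self._remaining <= 0:
            self.is_open = False


def consume(channel, limit=None):
    """Yield once per delivered message; end cleanly when the channel closes."""
    for total_message_count in count():
        if limit and total_message_count >= limit:
            return  # ends the generator; the caller's for loop just stops
        if not channel.is_open:
            return  # channel was closed, possibly by a signal handler
        channel.wait()
        yield True


# The loop ends on its own once the channel closes or the limit is reached.
delivered = sum(1 for _ in consume(FakeChannel(messages_before_close=3)))
limited = sum(1 for _ in consume(FakeChannel(messages_before_close=10), limit=2))
```

With this shape the consumer loop no longer needs its own closed flag, because the iterator simply finishes when the channel is no longer open.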
The channel.wait call can also raise an AMQP exception, but I leave the handling of that up to you (catch AMQPException and raise StopIteration, or let the exception bleed through; I am for the latter).
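To make the two options concrete, here is a rough sketch of both behaviours. ChannelError and BrokenChannel are hypothetical names invented for this illustration; they stand in for whatever exception type and channel object the real backend uses.

```python
class ChannelError(Exception):
    """Hypothetical stand-in for the AMQP library's exception type."""


class BrokenChannel:
    """Hypothetical channel whose wait() always fails."""

    is_open = True

    def wait(self):
        raise ChannelError("connection lost")


def consume_propagating(channel):
    # Option preferred in this report: let the error reach the caller.
    while channel.is_open:
        channel.wait()
        yield True


def consume_swallowing(channel):
    # Alternative: treat the error as a normal end of iteration.
    while channel.is_open:
        try:
            channel.wait()
        except ChannelError:
            return
        yield True


# Swallowing hides the failure: the loop just ends with no messages.
swallowed = list(consume_swallowing(BrokenChannel()))

# Propagating makes the failure visible to the daemon.
try:
    list(consume_propagating(BrokenChannel()))
    propagated = False
except ChannelError:
    propagated = True
```

Letting the exception through keeps a genuine connection failure distinguishable from a deliberate shutdown, which is the reason for preferring it.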
Let me know if you need any other information on this.