channel.build_inbound_messages with break_on_empty=True randomly breaks loop #63
Comments
Thanks for the report @TomGudman. This does indeed look like a bug. I might have to update the documentation for break_on_empty, as there is no guarantee that it will actually consume until the queue is empty. If the network or RabbitMQ is delivering new messages slowly for some reason, it may end the for loop early. I added an extra sleep to help compensate for this, but ideally you probably want to check the queue length once done to verify. I'll probably need to follow up with some test adjustments before I can release this, but if you have time, please test the above patch.
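For reference, checking the queue length could look roughly like the sketch below. It assumes that a passive queue declare returns a dict with a 'message_count' key, which is what the is_queue_empty helper quoted later in this thread relies on. The function name ready_message_count is hypothetical and not part of the library.

```python
def ready_message_count(channel, queue):
    """Return how many messages are ready for delivery in `queue`.

    `channel` is assumed to be an open amqpstorm channel. A passive
    declare does not create or modify the queue; it only reports on it.
    """
    result = channel.queue.declare(queue, passive=True)
    # Assumption: the declare result is a dict exposing 'message_count'.
    return result['message_count']
```

Note that in AMQP 0-9-1 this count covers messages that are ready in the queue, not messages already delivered to a consumer and still awaiting acknowledgement, so with no_ack=False a count of zero may not mean that everything has been processed.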
So even with the bug fix this is going to be an issue. However, if you set no_ack to True you will be practically guaranteed to empty the queue, but with no_ack=True and QoS you will probably not have the same luck.
Hi Erik,
Thank you for following up. I am sorry I have not found the time yet but
I promise I will test the below and the required patch. Note: Unless I
am wrong, I could not find in your docs how to get the queue count and I
have just seen in 'is_queue_empty' how to do so. I guess that would be
great info to highlight.
Will reply soon (probably next week though).
Thanks again Erik for your help and support.
Thomas
On 22.10.2018 13:13, Erik Olof Gunnar Andersson wrote:
So even with the bug fix this is going to be an issue. However, if you set no_ack to True you will be practically guaranteed to empty the queue, but with no_ack=True and QoS you will probably not have the same luck.
A more robust solution would probably look something like this.
import logging

import amqpstorm
from amqpstorm import Connection

logging.basicConfig(level=logging.DEBUG)


def is_queue_empty(channel, queue):
    # A passive declare only inspects the queue; it does not create it.
    try:
        result = channel.queue.declare(queue, passive=True)
    except amqpstorm.exception.AMQPChannelError:
        return False
    return not result['message_count']


def start_consumer():
    with Connection('127.0.0.1', 'guest', 'guest') as connection:
        with connection.channel() as channel:
            # Declare the Queue, 'simple_queue'.
            channel.queue.declare('simple_queue')

            # Start consuming the queue 'simple_queue' and require each
            # message to be acknowledged.
            channel.basic.consume(queue='simple_queue', no_ack=False)

            # Keep draining until the broker reports the queue as empty.
            while not is_queue_empty(channel, 'simple_queue'):
                for message in channel.build_inbound_messages(
                        break_on_empty=True):
                    print("Message:", message.body)
                    message.ack()


if __name__ == '__main__':
    start_consumer()
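
An alternative worth considering, and not something proposed in this thread, is to skip the consumer entirely and poll with basic.get, which asks the broker for one message at a time. The sketch below assumes that channel.basic.get returns None when no message is available; please verify that against the AMQPStorm version you run. The names drain_with_get and handle are hypothetical.

```python
def drain_with_get(channel, queue, handle):
    """Fetch messages one at a time until the broker reports none left.

    `channel` is assumed to be an open amqpstorm channel and `handle`
    a callable that takes a single message. Returns the number of
    messages handled.
    """
    handled = 0
    while True:
        # Assumption: basic.get returns None when the queue is empty.
        message = channel.basic.get(queue, no_ack=False)
        if message is None:
            break
        handle(message)
        # Acknowledge only after the handler returned without raising.
        message.ack()
        handled += 1
    return handled
```

Inside start_consumer this could be called as drain_with_get(channel, 'simple_queue', lambda m: print("Message:", m.body)) in place of the consume call and the while loop. Polling is usually slower than consuming, but the loop ends on an explicit "empty" answer from the broker rather than on a momentarily empty local buffer.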
I would recommend this example. Pretty sure this is the best practice.
Hi,
When break_on_empty=True in channel.build_inbound_messages is set, it randomly gets between 1 and 15 messages before exiting, whereas there are still thousands of messages in the queue. The queue contains 10k messages and has no publishers; the queue is still. Each run of the below code would get a random amount of messages and then exit. Without break_on_empty, it gets the 10k messages in less than 10 sec, but then it remains connected, which I don't want.
An edited version of the code I used.
ref: https://www.amqpstorm.io/api/channel.html#amqpstorm.Channel.build_inbound_messages
Thanks for this library.