
Out of order ack-ing? #33

Closed
fake-name opened this issue Nov 19, 2016 · 49 comments

@fake-name

fake-name commented Nov 19, 2016

Whoooo, more weirdness!

.Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 5
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 6
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 7
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Disconnecting!
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
Running state: True, thread alive: True, thread id:140037440014080
Joining _inbound_thread. Runstate: %s False

Context:

I have a connection with a thread processing the received messages. I have instrumented Message.ack() with a print statement that prints the delivery tag it's acking.

It appears I'm calling [delivery tag 6].ack() and then [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is being received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error: acking 7 implicitly acks the previous tags, so 6 is no longer a valid delivery tag.
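
For reference, a rough sketch (not from the report) of what that failure mode means at the protocol level. It assumes an open amqpstorm channel with delivery tags 6 and 7 currently outstanding and un-acked:

    import amqpstorm

    # Hedged illustration only: the "implicit ack of previous tags" behaviour is
    # the AMQP multiple flag. `channel` is assumed to be an open amqpstorm
    # channel whose consumer has deliveries 6 and 7 outstanding.
    def demonstrate_multiple_ack(channel: amqpstorm.Channel):
        channel.basic.ack(delivery_tag=7, multiple=True)  # settles every outstanding tag <= 7
        channel.basic.ack(delivery_tag=6)                 # PRECONDITION_FAILED - unknown delivery tag 6
        # A plain message.ack() sends multiple=False and settles only that one tag.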

I'm working on pulling out a testable example, but it's certainly odd.


Incidentally, the new docs pages are fancypants!

@eandersson eandersson added this to the 2.1.1 milestone Nov 19, 2016
@fake-name
Author

fake-name commented Nov 19, 2016

Poking around in the transport:

Main.Connector.Container(/rpcsys).Thread-5 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7efdc5f8b5e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message properties: {'message_type': '', 'reply_to': '', 'content_type': '', 'expiration': '', 'content_encoding': 'utf-8', 'cluster_id': '', 'priority': None, 'delivery_mode': None, 'user_id': '', 'timestamp': None, 'headers': None, 'message_id': '', 'correlation_id': 'keepalive', 'app_id': ''}
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7efdc5f8b5e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _method: {'routing_key': b'nak', 'delivery_tag': 6, 'consumer_tag': b'amq.ctag-avHnCiP0g_F5QXXney8Yng', 'redelivered': False, 'exchange': b'keepalive_exchange139628461245864'}
ACK For delivery tag: 6 time: 1479537186.62291
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7efdc47685f8> 6
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\x06\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7efdc5f8b5e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message properties: {'message_type': '', 'reply_to': '', 'content_type': '', 'expiration': '', 'content_encoding': 'utf-8', 'cluster_id': '', 'priority': None, 'delivery_mode': None, 'user_id': '', 'timestamp': None, 'headers': None, 'message_id': '', 'correlation_id': 'keepalive', 'app_id': ''}
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7efdc5f8b5e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _method: {'routing_key': b'nak', 'delivery_tag': 7, 'consumer_tag': b'amq.ctag-avHnCiP0g_F5QXXney8Yng', 'redelivered': True, 'exchange': b'keepalive_exchange139628461245864'}
ACK For delivery tag: 7 time: 1479537187.0323284
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7efdc47685c0> 7
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\x07\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - Error while in rx runloop!
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -        Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - Traceback (most recent call last):
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.interface.process_rx_events()
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.check_for_errors()
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     raise exception
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -
Main.Connector.Internal(/rpcsys).Thread-5 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-5 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
Main.Connector.Internal(/rpcsys).Thread-6 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-6 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-6 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
Disconnecting!

I'm not sure /what's/ going on. It looks like everything is being sent correctly. The data going out the socket is correctly ordered.

@eandersson eandersson modified the milestones: 2.2.1, 2.1.1 Nov 19, 2016
@eandersson
Owner

Hmm, that is odd. What version of RabbitMQ is this?

@fake-name
Author

RabbitMQ 3.6.5, Erlang 19.1

@fake-name
Author

Traceback of the error, with the transmitted and received data:

hange140231282460296', 'delivery_tag': 6, 'redelivered': False}
ACK For delivery tag: 6 time: 1479537621.3057892
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7f8a206865c0> 6
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\x06\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Writing:  b'\x01\x00\x01\x00\x00\x00-\x00<\x00(\x00\x00!keepalive_exchange140231282460296\x03nak\x00\xce\x02\x00\x01\x00\x00\x00\x1e\x00<\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03D\x00\x05utf-8\tkeepalive\xce\x03\x00\x01\x00\x00\x00\x03wat\xce'
Write finished.
Received data:  b'\x01\x00\x01\x00\x00\x00S\x00<\x00<\x1famq.ctag-0bpYUAPZNbw1l93izEDVEA\x00\x00\x00\x00\x00\x00\x00\x07\x00!keepalive_exchange140231282460296\x03nak\xce\x02\x00\x01\x00\x00\x00\x1e\x00<\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03D\x00\x05utf-8\tkeepalive\xce\x03\x00\x01\x00\x00\x00\x03wat\xce'
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message properties: {'app_id': '', 'priority': None, 'content_encoding': 'utf-8', 'delivery_mode': None, 'reply_to': '', 'timestamp': None, 'message_id': '', 'cluster_id': '', 'message_type': '', 'content_type': '', 'expiration': '', 'correlation_id': 'keepalive', 'headers': None, 'user_id': ''}
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _method: {'routing_key': b'nak', 'consumer_tag': b'amq.ctag-0bpYUAPZNbw1l93izEDVEA', 'exchange': b'keepalive_exchange140231282460296', 'delivery_tag': 7, 'redelivered': False}
ACK For delivery tag: 7 time: 1479537626.4834366
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7f8a2066dcc0> 7
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\x07\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Received data:  b'\x08\x00\x00\x00\x00\x00\x00\xce'
Writing:  b'\x01\x00\x01\x00\x00\x00-\x00<\x00(\x00\x00!keepalive_exchange140231282460296\x03nak\x00\xce\x02\x00\x01\x00\x00\x00\x1e\x00<\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03D\x00\x05utf-8\tkeepalive\xce\x03\x00\x01\x00\x00\x00\x03wat\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-7 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 42.51.
Writing:  b'\x01\x00\x01\x00\x00\x00\x05\x00<\x00n\x01\xce'
Write finished.
Received data:  b'\x01\x00\x01\x00\x00\x00S\x00<\x00<\x1famq.ctag-0bpYUAPZNbw1l93izEDVEA\x00\x00\x00\x00\x00\x00\x00\x08\x00!keepalive_exchange140231282460296\x03nak\xce\x02\x00\x01\x00\x00\x00\x1e\x00<\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03D\x00\x05utf-8\tkeepalive\xce\x03\x00\x01\x00\x00\x00\x03wat\xce'
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message properties: {'app_id': '', 'priority': None, 'content_encoding': 'utf-8', 'delivery_mode': None, 'reply_to': '', 'timestamp': None, 'message_id': '', 'cluster_id': '', 'message_type': '', 'content_type': '', 'expiration': '', 'correlation_id': 'keepalive', 'headers': None, 'user_id': ''}
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _method: {'routing_key': b'nak', 'consumer_tag': b'amq.ctag-0bpYUAPZNbw1l93izEDVEA', 'exchange': b'keepalive_exchange140231282460296', 'delivery_tag': 8, 'redelivered': False}
ACK For delivery tag: 8 time: 1479537631.497056
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7f8a206865c0> 8
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\x08\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Received data:  b'\x01\x00\x01\x00\x00\x00\x04\x00<\x00o\xce'
Received data:  b'\x01\x00\x01\x00\x00\x00S\x00<\x00<\x1famq.ctag-0bpYUAPZNbw1l93izEDVEA\x00\x00\x00\x00\x00\x00\x00\t\x01!keepalive_exchange140231282460296\x03nak\xce\x02\x00\x01\x00\x00\x00\x1e\x00<\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03D\x00\x05utf-8\tkeepalive\xce\x03\x00\x01\x00\x00\x00\x03wat\xce'
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message properties: {'app_id': '', 'priority': None, 'content_encoding': 'utf-8', 'delivery_mode': None, 'reply_to': '', 'timestamp': None, 'message_id': '', 'cluster_id': '', 'message_type': '', 'content_type': '', 'expiration': '', 'correlation_id': 'keepalive', 'headers': None, 'user_id': ''}
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f8a20ea75e8>
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Message _method: {'routing_key': b'nak', 'consumer_tag': b'amq.ctag-0bpYUAPZNbw1l93izEDVEA', 'exchange': b'keepalive_exchange140231282460296', 'delivery_tag': 9, 'redelivered': True}
ACK For delivery tag: 9 time: 1479537631.5497563
Writing ack frame:  <pamqp.specification.Basic.Ack object at 0x7f8a206865c0> 9
Writing:  b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00\t\x00\xce'
Write finished.
Main.Connector.Container(/rpcsys).Thread-5 - INFO - Heartbeat packet received! wat
Received data:  b'\x01\x00\x01\x00\x00\x007\x00\x14\x00(\x01\x96,PRECONDITION_FAILED - unknown delivery tag 8\x00<\x00P\xce'
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - Error while in rx runloop!
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -        Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 8
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - Traceback (most recent call last):
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.interface.process_rx_events()
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     self.check_for_errors()
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -     raise exception
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 8
Main.Connector.Internal(/rpcsys).Thread-5 - ERROR -
Main.Connector.Internal(/rpcsys).Thread-5 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-5 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
Main.Connector.Internal(/rpcsys).Thread-6 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-6 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-6 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
Disconnecting!
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
Writing:  b'\x01\x00\x00\x00\x00\x00\x0b\x00\n\x002\x00\x00\x00\x00\x00\x00\x00\xce'
Write finished.
Running state: True, thread alive: True, thread id:140231118403328
Joining _inbound_thread. Runstate: %s False

@eandersson
Owner

How about on the RabbitMQ side?

I found some similar errors on Google, but they are pretty old (from 2-3 years ago at least).

@fake-name
Author

fake-name commented Nov 19, 2016

The only thing in the RabbitMQ logs is:


=ERROR REPORT==== 19-Nov-2016::07:41:22 ===
Channel error on connection <0.11655.1> (>snip<:56697 -> >snip<:5671, vhost: '/rpcsys', user: 'rpc_client'), channel 1:
operation basic.ack caused a channel exception precondition_failed: "unknown delivery tag 8"

=INFO REPORT==== 19-Nov-2016::07:41:24 ===
closing AMQP connection <0.11655.1> (>snip<:56697 -> >snip<:5671)

=INFO REPORT==== 19-Nov-2016::07:41:29 ===
accepting AMQP connection <0.12135.1> (>snip<:39801 -> >snip<:5671)

=ERROR REPORT==== 19-Nov-2016::07:42:13 ===
Channel error on connection <0.12135.1> (>snip<:39801 -> >snip<:5671, vhost: '/rpcsys', user: 'rpc_client'), channel 1:
operation basic.ack caused a channel exception precondition_failed: "unknown delivery tag 8"

One thing of note is that I'm on a link that is.... not great. RTT is ~170 ms, with periodic spikes (I'm on the west coast of the US; the server is in France, I think).

I'm just confused because TCP should guarantee in-order delivery, so we should be able to assume the underlying transport is reliable, if not timely, and it seems like that's not true here.

Let me see if I can convince RabbitMQ to output more diagnostics.

@eandersson
Owner

Are you using multiple threads per channel?

What I've read online suggests a threading issue, but based on the logs I can't see how that could possibly be.

@fake-name
Author

fake-name commented Nov 19, 2016

Hmmmm, I have 3 threads per channel. One for rx, one for tx, and one for management/monitoring.

The acks should be strictly ordered, though.


Actually, let me idiot check.


Ok, yeah, the sequential writes for the two acks leading up to the exception are from the same thread. Unfortunately, there isn't (or at least wasn't) a way to do ping-pong transmit/receive, so I think you always need two threads (one for tx, one for rx)?

@eandersson
Owner

eandersson commented Nov 19, 2016

The only scenario I can visualize is two threads competing for the write lock. Unlike rx, there is no outgoing queue, so unless the lock is already held, a write should always go directly to the socket.

Where are you outputting the write logs? Make sure that you log after the lock is acquired. If you are not using SSL, you could even try to remove the thread lock in write_to_socket.

@fake-name
Author

fake-name commented Nov 19, 2016

Already there - in io.py:


    def write_to_socket(self, frame_data):
        """Write data to the socket.

        :param str frame_data:
        :return:
        """
        self._lock.acquire()
        try:
            print("Writing: ", frame_data, threading.get_ident())
            total_bytes_written = 0
            bytes_to_send = len(frame_data)
            while total_bytes_written < bytes_to_send:
                try:
                    if not self.socket:
                        raise socket.error('connection/socket error')
                    bytes_written = \
                        self.socket.send(frame_data[total_bytes_written:])
                    if bytes_written == 0:
                        raise socket.error('connection/socket error')
                    total_bytes_written += bytes_written
                except socket.timeout:
                    print("Write timed out!")
                    pass
                except socket.error as why:
                    print("Socket error: ", why)
                    if why.args[0] in (EWOULDBLOCK, EAGAIN):
                        continue
                    self._exceptions.append(AMQPConnectionError(why))
                    return
            print("Write finished.")
        finally:
            self._lock.release()

I am using SSL.

@eandersson
Owner

Yea - this is making absolutely no sense. Could you capture some packets with Wireshark or tcpdump and verify the order?

@fake-name
Author

Working on something like that. Let me try to produce a sane minimal example before I go any further. It could be something really silly in my environment.

@eandersson
Owner

Sounds good - let me know what you find!

@fake-name
Author

fake-name commented Nov 19, 2016

will do


Aaand of course, my minimal example doesn't crash.

@eandersson
Owner

eandersson commented Nov 19, 2016

I wonder if this is related to a single channel consuming two queues? I think I can reproduce it with two channel.basic.consume calls defined on the same channel.

@fake-name
Author

fake-name commented Nov 19, 2016

That sounds likely, but I was testing it myself a bit, without seeing the crash.

OTOH:


    def start_consume(self, config):
        self.log.info("Bound. Triggering consume")
        self.storm_channel.basic.consume(self.handle_rx, queue=config['response_queue_name'],         no_ack=False)
        self.storm_channel.basic.consume(self.handle_rx, queue=self.keepalive_exchange_name+'.nak.q', no_ack=False)
        self.log.info("Consume triggered.")

My test harness is here, if it's worth anything: https://github.com/fake-name/ReadableWebProxy/blob/everything-is-fire/amqpstorm_test.py

I think I'm going to rip up and rewrite most of my amqp stuff tomorrow. It's been getting more and more creaky and brittle as I add new things and (don't) refactor, and it's /probably/ at least in part responsible for this weirdness.

@eandersson
Owner

Actually, that was a false alarm from my side. I was deferring the acks, but forgot to pop them from the list. So it was trying to ack the same messages over and over again.

@eandersson
Owner

eandersson commented Nov 19, 2016

Feel free to take a look at some of the rpc examples I have available - assuming any of them fit your use case. If not, feel free to provide me with specs and I'll add it as an example.
https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_rpc_server.py

@fake-name
Author

fake-name commented Nov 20, 2016

Well, I ripped out all my old overly-threaded stuff, and replaced it with a simple ping-pong tx/rx system, and the problem seems to have vanished.

So..... huh.
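
For reference, the new loop is roughly shaped like this (a sketch, not the actual AmqpConnector code; queue names and connection details are placeholders):

    import time
    import queue

    import amqpstorm

    # Single-threaded "ping-pong" tx/rx: one loop alternates between flushing an
    # outbound queue.Queue and polling for inbound deliveries, so publishes and
    # acks can never interleave across threads.
    outbound = queue.Queue()  # elsewhere: outbound.put((b'payload', 'some_queue'))

    def handle_rx(message):
        print("received message body:", message.body)
        message.ack()

    connection = amqpstorm.Connection('localhost', 'guest', 'guest')
    channel = connection.channel()
    channel.basic.consume(handle_rx, queue='response_queue', no_ack=False)

    while True:
        # tx half: flush anything the rest of the application queued up.
        while not outbound.empty():
            body, routing_key = outbound.get_nowait()
            channel.basic.publish(body=body, routing_key=routing_key, exchange='')
        # rx half: deliver any buffered inbound messages to handle_rx.
        channel.process_data_events(to_tuple=False)
        time.sleep(0.1)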

@fake-name
Author

fake-name commented Nov 20, 2016

OTOH, now stuck in:

<Thread(Thread-2, started daemon 140342175667968)>
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 367, in run_fetcher
    connection_manager.run()
  File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 328, in run
    self.__do_rx()
  File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 204, in __do_rx
    self.storm_channel.process_data_events(to_tuple=False)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    for message in self.build_inbound_messages(break_on_empty=True):
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 121, in build_inbound_messages
    message = self._build_message()
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 391, in _build_message
    body = self._build_message_body(content_header.body_size)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 436, in _build_message_body
    sleep(IDLE_WAIT)

With RabbitMQ reporting 0 unacked messages.

@eandersson
Owner

eandersson commented Nov 20, 2016

Did you pull the fix for that?
d8462da
fc62af8

@eandersson
Owner

eandersson commented Nov 20, 2016

Actually, use the final patch:

-                if self.is_closed:
-                    self.check_for_errors()
-                    break
+                self.check_for_errors()

@fake-name
Author

fake-name commented Nov 20, 2016

Derp. Will do.

@eandersson
Owner

eandersson commented Nov 20, 2016

If it's still an issue, we can add a simple timeout to that function. Under normal circumstances there is no way you should be stuck there for longer than a second or two.

Gonna write some unit-tests around this.

@fake-name
Author

Yeah, I just patched a timeout into it. Working on merging from master now.

@eandersson
Owner

A lot of changes, but mostly in the unit-tests, so hopefully it won't be too tricky to merge. I did make a change to the io.close call that should speed things up a bit, though.

@fake-name
Author

fake-name commented Nov 20, 2016

I'm basically just clobbering everything but my _die bit.

@fake-name
Author

fake-name commented Nov 20, 2016

Will auto_decode ever try to decode the message.body? Ok, yeah, it's causing bodies that went in as a string to come out as a bytestring. Which fields does it affect?

For that matter, if I put a unicode string into a message and send it, does it get auto-encoded? If the transport is binary, it might make more sense to require inputs to be bytestrings. That way, you avoid encoding ambiguities.

I'm pushing largely binary strings everywhere, and having a heuristic somewhere trying to potentially decode binary messages feels weird.

@eandersson
Owner

eandersson commented Nov 20, 2016

Yes, if you don't need that, it might be good to set it to False. It may add some extra CPU cycles to have it enabled.

body = try_utf8_decode(self._body)
https://github.com/eandersson/amqpstorm/blob/master/amqpstorm/message.py#L54
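
A minimal sketch of turning it off (placeholder queue and connection details; the auto_decode parameter on process_data_events is taken from the tracebacks earlier in this thread and may differ in other releases):

    import time

    import amqpstorm

    # With auto_decode=False the utf-8 decode heuristic is skipped, so
    # message.body stays as the raw bytes that were published.
    def handle_rx(message):
        assert isinstance(message.body, bytes)
        message.ack()

    connection = amqpstorm.Connection('localhost', 'guest', 'guest')
    channel = connection.channel()
    channel.basic.consume(handle_rx, queue='some_queue', no_ack=False)

    while True:
        channel.process_data_events(to_tuple=False, auto_decode=False)
        time.sleep(0.1)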

@eandersson
Owner

eandersson commented Nov 20, 2016

This was actually implemented to make code written for Python 2 work better under Python 3.

As an example, this would work with Python 2.7, but would require a decode in Python 3, as the message will always be received as a bytestring by default in Python 3.

Python 2

if self.correlation_id != message.correlation_id:

Python 3

if self.correlation_id != message.correlation_id.decode('utf-8'):

Python 3 with auto-decode

if self.correlation_id != message.correlation_id:

@fake-name
Author

Yeah, it gets fiddly when you're trying to bridge the two versions, definitely. When I updated my fork of amqpstorm, I defaulted to not decoding in all circumstances (by changing the function definitions).

OTOH, for py3k only, I'd say enforce that all correlation IDs must be bytestrings. That way, you avoid any possible ambiguity. Right now, you basically have the situation that py3k was designed to prevent, namely bytestring/string interaction that usually works.

@eandersson
Owner

Makes sense. Something we can address in the next major patch.

@fake-name
Author

Yeah, I'm just not sure if that's something you'd want to do. I'm a big fan of py3k's enforcement of separation, but if you're trying to support both 2 and 3 it gets really hard to do without having a different API for each version.

Personally, I've just decided to go 3k only, but then the only thing that's affected is my personal silliness.

@eandersson
Owner

eandersson commented Nov 20, 2016

I am actually looking at making a Python 3.5+ only version. Making use of some of the new threading features would be useful, but impossible to keep backwards compatible.

@fake-name
Author

Heh. Coroutines ftw.

@fake-name
Author

fake-name commented Nov 20, 2016

Ok, MOAR STUFF.

I think the issue /may/ be an interaction with basic.recover().

I wrote a minimal test-harness which simply doesn't ack() every 3rd message. I then periodically call basic.recover(), and it immediately produces a "Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 1" error.
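
Condensed, the harness does roughly this (a sketch; the queue name and connection details are placeholders, the full version is linked at the end of this comment):

    import time

    import amqpstorm

    connection = amqpstorm.Connection('localhost', 'guest', 'guest')
    channel = connection.channel()
    channel.queue.declare('recover_test_q')

    counter = 0

    def handle_rx(message):
        # Deliberately leave every third message un-acked.
        global counter
        counter += 1
        if counter % 3 == 0:
            print("Not acking!")
            return
        message.ack()

    channel.basic.consume(handle_rx, queue='recover_test_q', no_ack=False)

    last_recover = time.time()
    while True:
        channel.basic.publish(body='wat', routing_key='recover_test_q', exchange='')
        channel.process_data_events(to_tuple=False)
        if time.time() - last_recover > 10:
            # Ask the broker to redeliver the un-acked messages; shortly after
            # this, the channel dies with "unknown delivery tag".
            channel.basic.recover(requeue=True)
            last_recover = time.time()
        time.sleep(0.5)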

Is it possible there's something odd going on with the recovered messages? Could the delivery tags from the ostensibly original delivery be being used, rather than the redelivered tags?

Calling recover() seems to lead to PRECONDITION_FAILED errors in an assortment of places. I've seen it crop up in a basic.qos() call after the basic.recover() call, in the more expected process_data_events(), and even from within the recover() call itself:

r.Internal(/rpcsys).Thread-221 - INFO - Heartbeat packet received! keepalive 21, random: 0.5790250738006805 -> keepalive_21
Main.RPC-Interface.MainThread - INFO - Get job call for 'RawMirror' -> 0
Main.Connector.Internal(/rss-feeds).Thread-205 - INFO - Interface timeout thread. Ages: heartbeat -> 2.96, last message -> 150.11.
Main.RPC-Interface.MainThread - INFO - Get job call for 'RawMirror' -> 0
Main.Connector.Internal(/rss-feeds).Thread-205 - INFO - Heartbeat packet received! keepalive 29, random: 0.2219987882155925 -> keepalive_29
Main.Connector.Internal(/rss-feeds).Thread-205 - INFO - Heartbeat packet received! keepalive 29, random: 0.2219987882155925 -> keepalive_29
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR - Exception in connector! Terminating connection...
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR - Traceback (most recent call last):
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 376, in run_fetcher
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     connection_manager.run()
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 338, in run
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     self.__check_timeouts()
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector.py", line 323, in __check_timeouts
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     self.storm_channel.basic.recover(requeue=True)
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/basic.py", line 98, in recover
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     return self._channel.rpc_request(recover_frame)
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 300, in rpc_request
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     return self.rpc.get_request(uuid)
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/rpc.py", line 97, in get_request
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     self._wait_for_request(uuid)
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/rpc.py", line 128, in _wait_for_request
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     self._adapter.check_for_errors()
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 203, in check_for_errors
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -     raise exception
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 30
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR -
Main.Connector.Internal(/rss-feeds).Thread-205 - INFO - ConnectorManager shutdown called!
Main.Connector.Manager(/rss-feeds).Thread-205 - ERROR - Triggering reconnection...

INFO:Main.Connector:Recovering messages!
received message! <amqpstorm.message.Message object at 0x7f51b4fd20f8>
message body: b'test?'
Not acking!
Loopin!
received message! <amqpstorm.message.Message object at 0x7f51b4fd20f8>
message body: b'wat'
received message! <amqpstorm.message.Message object at 0x7f51b4fd2150>
message body: b'test?'
received message! <amqpstorm.message.Message object at 0x7f51b4fd20f8>
message body: b'wat'
Not acking!
received message! <amqpstorm.message.Message object at 0x7f51b4fd2150>
message body: b'test?'
received message! <amqpstorm.message.Message object at 0x7f51b4fd20f8>
message body: b'wat'
received message! <amqpstorm.message.Message object at 0x7f51b4fd2150>
message body: b'test?'
Not acking!
received message! <amqpstorm.message.Message object at 0x7f51b4fd20f8>
message body: b'wat'
received message! <amqpstorm.message.Message object at 0x7f51b4fd2150>
message body: b'wat'
Loopin!
Traceback (most recent call last):
  File "amqpstorm_test.py", line 248, in <module>
    test()
  File "amqpstorm_test.py", line 244, in test
    tester = AmqpContainer()
  File "amqpstorm_test.py", line 138, in __init__
    self.storm_channel.process_data_events(to_tuple=False)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 279, in process_data_events
    auto_decode=auto_decode):
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 120, in build_inbound_messages
    self.check_for_errors()
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 203, in check_for_errors
    raise exception
amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 10
Putting message

Loopin!
INFO:Main.Connector:Recovering messages!
received message! <amqpstorm.message.Message object at 0x7f82e35eb048>
message body: b'test?'
Traceback (most recent call last):
  File "amqpstorm_test.py", line 240, in <module>
    test()
  File "amqpstorm_test.py", line 236, in test
    tester = AmqpContainer()
  File "amqpstorm_test.py", line 137, in __init__
    self.storm_channel.process_data_events(to_tuple=False)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 288, in process_data_events
    self.consumer_callback(message)
  File "amqpstorm_test.py", line 166, in process_rx
    self.storm_channel.basic.qos(50, global_=True)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/basic.py", line 47, in qos
    return self._channel.rpc_request(qos_frame)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 300, in rpc_request
    return self.rpc.get_request(uuid)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/rpc.py", line 97, in get_request
    self._wait_for_request(uuid)
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/rpc.py", line 128, in _wait_for_request
    self._adapter.check_for_errors()
  File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 203, in check_for_errors
    raise exception
amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 1
Putting message

The good news here is I can reliably reproduce this issue, and my old setup that was having issues also sometimes used recover().

Additionally, the exception appears to happen some time after recover() is called, which could be why the message logs /looked/ fine.

Anyways, https://github.com/fake-name/ReadableWebProxy/blob/master/amqpstorm_test.py reproduces this exact issue repeatably. My initial assumption is that I'm using recover() wrong, but I can't see how it's supposed to work if just calling it and expecting redelivery isn't correct.

@fake-name fake-name reopened this Nov 20, 2016
@eandersson
Owner

Yea, honestly, I've never really used it. A possibility is that I need to clear the inbound queue whenever recover is called. Since all messages will be re-delivered, the old ones may need to be cleared first.

@eandersson
Owner

eandersson commented Nov 20, 2016

Testing the following at the moment and it is looking good.

self.storm_channel.basic.recover(requeue=True)
self.storm_channel._inbound.clear()

Good find!

@eandersson eandersson added the bug label Nov 20, 2016
@eandersson
Owner

eandersson commented Nov 20, 2016

One thing that does stick out for recover is that it's meant to be synchronous.

basic.recover-async: This method is deprecated in favour of the synchronous Recover/Recover-Ok.

@fake-name
Author

fake-name commented Nov 20, 2016

Well, calling self.storm_channel._inbound.clear() certainly seems to partially mitigate the issue.

I'm not sure if the eventual crash I saw was due to the threading context of my test or not, though. My actual application project serializes all RabbitMQ stuff (the MQ interface is owned by a separate thread, with queues for all operations), so I'm testing it there now.

@eandersson
Owner

eandersson commented Nov 20, 2016

Yea, it will still die eventually. The documentation on recover is unfortunately quite vague. My understanding of the problem is as follows:

  1. You have 100 messages in the queue that haven't been processed yet.
  2. You call basic.recover.
  3. All of those messages get re-delivered by RabbitMQ.
  4. The original copies are still in the client's inbound queue, and will be handled eventually.
  5. The messages get processed (and acked) twice, which causes an exception.

With my fix the major issue gets resolved, but as it's threaded, it's possible that a message was already being processed before the queue was cleared and it still gets processed twice.

@eandersson
Owner

I haven't found a single AMQP library that actually clears the inbound queue, so I do wonder how exactly this command is supposed to be handled.

@fake-name
Author

Wait, is basic.recover() a proxy call for basic.recover-async(), or basic.recover()? The deprecated function is basic.recover-async, while basic.recover seems to call pamqp_spec.Basic.Recover(requeue=requeue).

Anyways, assuming I can guarantee serialization (which I can in my case), it seems like it at least mostly solves the issue.


Looking around for other people using basic.recover, it seems like there are a few questions about using it from various languages, with little follow-up. I think the usual case involves just restarting the connection, rather than trying to recover anything while keeping the connection alive.

@eandersson
Owner

eandersson commented Nov 20, 2016

Yea, I think I'll ask in the RabbitMQ forums - I haven't found anything useful so far.

I never implemented support for basic.recover-async as it was deprecated, but it would be easy for you to try it out. I don't think it's simply a proxy call.

from pamqp import specification as pamqp_spec
recover_frame = pamqp_spec.Basic.RecoverAsync(requeue=True)
self.storm_channel.rpc_request(recover_frame)

@fake-name
Author

I'm kind of confused about what the difference between the two is. They sound like they basically do the same thing.

It's worth noting that there is a flag on redelivered messages. A possible workaround to acking a message multiple times would be to process redelivered messages first and maintain a log of their delivery tags. Then, when processing normal messages, if you reach a message that's already been seen as a redelivered message, just discard the original version.

I can't see a way to do it without maintaining some local state, though.

@eandersson
Owner

eandersson commented Nov 20, 2016

Yea, feels like it would add a lot of complexity, especially when trying to keep this thread-safe. I'll look into it though.

I asked the following in the RabbitMQ forum. Feel free to fill in any missing information.
https://groups.google.com/forum/#!topic/rabbitmq-users/UFz59V2M3W8

@fake-name
Author

Hahahahaha, so the deprecated version seems more fully functional. Sigh.

@eandersson
Owner

eandersson commented Nov 20, 2016

So both recover and recover-async are apparently deprecated, based on the response from that thread.


I think you've basically got two options here.

  1. Close the channel and re-open it (you don't need to close the connection, just the channel).

  2. Make sure to always acknowledge messages. You can reject and requeue if needed. If you are offloading the processing onto a different thread, maybe it would be possible to add a timer and say that if it's not done within X seconds, kill the thread and call message.reject(requeue=True). A rough sketch of that approach follows below.
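
A rough sketch of option 2 (not from this thread; process_message(), the queue name, and the 30-second deadline are made up for the example):

    import time
    import threading

    import amqpstorm

    def process_message(message):
        # ... real work happens here, then:
        message.ack()

    def handle_rx(message):
        worker = threading.Thread(target=process_message, args=(message,), daemon=True)
        worker.start()
        worker.join(timeout=30)
        if worker.is_alive():
            # The worker overran its deadline: hand the message back to the
            # broker instead of leaving an un-acked delivery tag hanging around.
            # (A real implementation would also need to stop the late worker
            # from acking it afterwards.)
            message.reject(requeue=True)

    connection = amqpstorm.Connection('localhost', 'guest', 'guest')
    channel = connection.channel()
    channel.basic.consume(handle_rx, queue='some_queue', no_ack=False)

    while True:
        channel.process_data_events(to_tuple=False)
        time.sleep(0.1)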

@eandersson eandersson added wontfix and removed bug labels Nov 20, 2016
@fake-name
Author

Yeah, I guess I'm stuck working around the issue. How annoying.

They could at least update their documentation!
