amqp_simple_wait_frame hangs #16

Closed
stonemaster opened this issue Jun 30, 2012 · 9 comments

Comments

@stonemaster
Contributor

Hi,

I have a problem with a call to amqp_simple_wait_frame in BasicPublish():

  • RabbitMQ messages are sent through a thread pool, which internally sends them to the RabbitMQ server using your C++ wrapper. Sometimes the application doesn't quit because the thread pool hangs in a call to Channel::BasicPublish. This comes down to the line CheckForError(amqp_simple_wait_frame(m_connection, &frame)); in ChannelImpl::GetNextFrameFromBroker.
  • The stack trace looks like this:
    #0 0x0000000000f7fb0c in recv ()
    #1 0x0000000000ea7e59 in wait_frame_inner ()
    #2 0x0000000000ea7f64 in amqp_simple_wait_frame ()
    #3 0x0000000000de2fc4 in AmqpClient::Detail::ChannelImpl::GetNextFrameFromBroker (this=0x1323d6d0, frame=..., timeout=...) at SimpleAmqpClient/ChannelImpl.cpp:344
    #4 0x0000000000de3392 in AmqpClient::Detail::ChannelImpl::GetNextFrameFromBrokerOnChannel (this=0x1323d6d0, channel=2, frame_out=..., timeout=...)
    at /SimpleAmqpClient/ChannelImpl.cpp:358
    #5 0x0000000000ddf086 in GetMethodOnChannel > (timeout=..., expected_responses=..., frame=..., channel=, this=)
    at /SimpleAmqpClient/ChannelImpl.h:106
    #6 GetMethodOnChannel > (timeout=..., expected_responses=..., frame=..., channel=, this=0x1323d6d0)
    at /SimpleAmqpClient/Channel.cpp:287
    #7 AmqpClient::Channel::BasicPublish (this=0xc6ae2a0, exchange_name=..., routing_key=..., message=..., mandatory=, immediate=)
    at /SimpleAmqpClient/Channel.cpp:311
  • It seems as if the connection died (which is possible, since the network in my case sometimes has some latency) and the recv call never returns, blocking the thread pool and thus preventing the application from quitting.

I'm quite clueless right now, so maybe you have an idea what might be causing this problem. If you need any more debug information, just tell me.

Cheers,
André

@stonemaster
Contributor Author

Further investigation has shown that the RabbitMQ server was blocking because free disk space had dropped below its configured limit. That is why amqp_simple_wait_frame hangs. Is there any way for this function to return an error code instead of hanging, which could then be translated into a C++ exception? This would allow greater flexibility on the client side.

@alanxz
Owner

alanxz commented Jul 3, 2012

Sorry it took so long to get to this:

So a bit of background:

From an AMQP protocol perspective, basic.publish is an asynchronous operation. The client transmits the message to the broker, and by default no response is required unless something goes wrong.
When something goes wrong, the broker will signal it with a basic.return, a channel.close, or a connection.close "at some point in the future".

SimpleAmqpClient turns on a RabbitMQ feature called publisher confirms, which simply asks the broker to respond with a basic.ack when the message has been successfully dealt with.
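In confirm mode the broker numbers each message published on the channel with a delivery tag starting at 1, and answers with basic.ack (or basic.nack if it could not handle the message); an ack with the `multiple` flag set also covers every earlier tag. The sketch below models only that bookkeeping, with no networking. `ConfirmTracker` is a hypothetical name, not a SimpleAmqpClient or rabbitmq-c type. A publisher that sends a message and then waits for `pending()` to drop to zero, roughly what a synchronous `BasicPublish()` has to do, waits for as long as the broker stays silent.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <set>

// Hypothetical bookkeeping for a channel in publisher-confirm mode.
// Delivery tags start at 1 and increase by one per published message;
// the broker settles them with basic.ack (or basic.nack).
class ConfirmTracker {
public:
    // Record a publish and return the delivery tag the broker will use for it.
    std::uint64_t on_publish() {
        std::uint64_t tag = next_tag_++;
        outstanding_.insert(tag);
        return tag;
    }

    // Handle a basic.ack/basic.nack. If `multiple` is set, every outstanding
    // tag up to and including `delivery_tag` is settled at once.
    // Returns the number of messages settled by this frame.
    std::size_t on_confirm(std::uint64_t delivery_tag, bool multiple) {
        assert(delivery_tag < next_tag_ && "confirm for a tag never published");
        if (!multiple) return outstanding_.erase(delivery_tag);
        auto last = outstanding_.upper_bound(delivery_tag);
        auto settled = static_cast<std::size_t>(std::distance(outstanding_.begin(), last));
        outstanding_.erase(outstanding_.begin(), last);
        return settled;
    }

    // Messages still waiting for a confirm from the broker.
    std::size_t pending() const { return outstanding_.size(); }

private:
    std::uint64_t next_tag_ = 1;
    std::set<std::uint64_t> outstanding_;
};
```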

Back to what you're seeing: it appears as if the message is being published to the broker, and the broker simply isn't dealing with it (and so never sends the client anything indicating that it has dealt with it).

So I'll need to look into what happens at the protocol level when the broker pauses clients. What I don't want is to throw an exception, only to have the message actually be delivered at some point in the future.

@stonemaster
Contributor Author

Thanks for the detailed description!

The problem in my situation is that the client doesn't know why the server doesn't accept the message. Unless the fill level of the thread pool's queue is monitored and appropriate action is taken, new messages will just be blocked, which results in weird behaviour when the application is shut down. It would be nice to have configurable behaviour that either blocks the connection or just throws an exception. In my case I would rather lose a message or retry it later than have a message queue filling up. But I suppose this is non-trivial...
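Part of the configurable behaviour asked for here can be approximated on the application side, without changing SimpleAmqpClient: put a bounded queue in front of the thread that publishes, and choose whether a full queue blocks the producer or rejects the message. A minimal sketch under those assumptions; `BoundedMessageQueue` and `OverflowPolicy` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// What push() does when the queue is full.
enum class OverflowPolicy { Block, Reject };

// Hypothetical bounded buffer between application threads and the single
// thread that calls BasicPublish(). It caps how many messages can pile up
// while the broker is not accepting data.
class BoundedMessageQueue {
public:
    BoundedMessageQueue(std::size_t capacity, OverflowPolicy policy)
        : capacity_(capacity), policy_(policy) {
        assert(capacity_ > 0 && "a zero-capacity queue would block forever");
    }

    // Returns false only under OverflowPolicy::Reject when the queue is full;
    // the caller can then drop the message or retry later.
    bool push(std::string msg) {
        std::unique_lock<std::mutex> lock(m_);
        if (q_.size() >= capacity_) {
            if (policy_ == OverflowPolicy::Reject) return false;
            not_full_.wait(lock, [this] { return q_.size() < capacity_; });
        }
        q_.push_back(std::move(msg));
        lock.unlock();
        not_empty_.notify_one();
        return true;
    }

    // Called by the publishing thread; waits until a message is available.
    std::string pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !q_.empty(); });
        std::string msg = std::move(q_.front());
        q_.pop_front();
        lock.unlock();
        not_full_.notify_one();
        return msg;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return q_.size();
    }

private:
    const std::size_t capacity_;
    const OverflowPolicy policy_;
    mutable std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::deque<std::string> q_;
};
```

This does not stop the publishing thread itself from hanging inside `BasicPublish()`; it only bounds how much work backs up behind it, so that under `Reject` the rest of the application keeps running and can decide to drop or retry.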

Cheers and thanks,
André

@stonemaster
Contributor Author

Did you make a decision regarding what to do when the broker pauses its clients? Or is there any way to detect that state through the C++ API?

Cheers,
André

@alanxz
Owner

alanxz commented Aug 21, 2012

Sorry - this temporarily fell off my radar as things got a bit too crazy. I've sent a message to the rabbitmq-discuss list to get clarification as to what happens from a protocol level.

I suspect the answer will be that the broker stops reading from the socket, which will eventually result in TCP backpressure. Then, depending on the socket buffer sizes on both the client and broker side, the client will eventually block on a write() call. I think some changes to the rabbitmq-c client would be necessary to detect this TCP backpressure immediately. I'll let you know what I find out.
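The suspected mechanism can be reproduced locally. This sketch uses a Unix-domain socket pair as a stand-in for the TCP connection (an assumption: TCP's window-based flow control differs in detail, but the visible effect on the sender is the same). One end is never read, as if the broker had paused, and the other end writes until the kernel refuses more data. Every write before that point succeeds. `bytes_buffered_before_backpressure` is a hypothetical helper name.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical demo: simulates a peer that has stopped reading by filling one
// end of a socket pair with non-blocking sends until the kernel refuses more.
// Returns the number of bytes the kernel buffered, or -1 on an unexpected
// error. Where this loop sees EAGAIN, a blocking send() would hang instead.
long bytes_buffered_before_backpressure() {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;

    char chunk[4096] = {};
    long total = 0;
    ssize_t n;
    while ((n = send(fds[0], chunk, sizeof chunk, MSG_DONTWAIT)) > 0)
        total += n;

    // Inspect errno before close(), which may overwrite it.
    bool would_block = n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
    close(fds[0]);
    close(fds[1]);
    return would_block ? total : -1;
}
```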

@stonemaster
Contributor Author

Thanks a lot!

@alanxz
Owner

alanxz commented Aug 22, 2012

Here's what I got in response to my inquiry:

The server throttles connections by no longer reading from the socket. The effect is the same as just reading more slowly, or that of a slow / disrupted network.

If the condition lasts for a short duration only then there may be no observable effects at the client at all since buffering in the network stack / network can compensate.

If the condition persists then eventually the buffers fill up. How that manifests in the APIs depends on the client. Usually it's just a longer (or, in the extreme, indefinite) execution time for the basicPublish method. Or in some async APIs it would be a delayed notification that the socket operation succeeded. Or it may be possible to check how much local buffer space there is left.

So detecting that the connection is blocked or throttled sounds impossible to do reliably from the client side, given the way I currently use rabbitmq-c. I would have to send enough data to the broker to fill up the broker's receive buffer, and then the client machine's send buffer, before I could detect the TCP backpressure. This is certainly possible if the message is large enough, but it won't work in general (and there is currently very little control over the size of the various send and receive buffers).
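How much has to be written before backpressure becomes visible is set by the kernel's socket buffers. They can be read and, within OS limits, changed through the standard `SO_SNDBUF`/`SO_RCVBUF` socket options. A minimal sketch follows; the helper names are hypothetical. Applying it to a rabbitmq-c connection would mean operating on its underlying descriptor (rabbitmq-c exposes one via `amqp_get_sockfd()`), which is an assumption about how one might hook in, not something SimpleAmqpClient does. A smaller client send buffer makes the client block sooner, but still does not tell it why; the broker's receive buffer remains outside the client's control.

```cpp
#include <cassert>
#include <sys/socket.h>

// Hypothetical helper: returns the kernel's send-buffer size for fd in
// bytes, or -1 on error.
int send_buffer_size(int fd) {
    int size = 0;
    socklen_t len = sizeof size;
    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0) return -1;
    return size;
}

// Hypothetical helper: asks the kernel for a send buffer of roughly `bytes`.
// The kernel may round or clamp the request (Linux, for example, doubles it
// to leave room for bookkeeping), so read the value back rather than
// assuming it.
bool request_send_buffer_size(int fd, int bytes) {
    return setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bytes, sizeof bytes) == 0;
}
```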

One possible, if crude, way to avoid hanging on shutdown would be to add a timeout when waiting for the basic.ack in the basicPublish method. You would need to pick a timeout long enough that you don't get false positives. Also, you wouldn't be able to tell whether the message actually got routed. E.g., the message might sit in the broker's receive buffer while the client times out; then, after the client disconnects, the broker gets a chance to read the buffer and routes the message. Also, after the client times out, SimpleAmqpClient would be in an unexpected state: the only way for the client to proceed would be to disconnect from the broker. For that reason I won't implement this as part of the SimpleAmqpClient API. However, if you would like to hack SimpleAmqpClient to do this, I'm willing to give you pointers to make it work.
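The crude timeout described here amounts to replacing the indefinite `recv()` at the top of the stack traces with a bounded wait. A minimal POSIX sketch of that primitive using `poll()`; `read_with_timeout` is a hypothetical name, and this is not how rabbitmq-c is structured internally. It only bounds waiting for incoming bytes such as the basic.ack. It does nothing about a `send()` that blocks because the buffers are full, nor about the unknown state the connection is left in after a timeout.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical bounded-wait read: waits at most timeout_ms for fd to become
// readable, then reads whatever is available.
// Returns the number of bytes read, 0 on end-of-file, or -1 with errno set;
// errno is ETIMEDOUT when no data arrived within the timeout.
ssize_t read_with_timeout(int fd, void* buf, std::size_t len, int timeout_ms) {
    pollfd p{};
    p.fd = fd;
    p.events = POLLIN;

    int rc;
    do {
        // Retrying after a signal restarts the full timeout.
        rc = poll(&p, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0) return -1;                          // poll() itself failed
    if (rc == 0) { errno = ETIMEDOUT; return -1; }  // nothing arrived in time
    // Readable, hung up, or in error: read() reports which.
    return read(fd, buf, len);
}
```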

@alanxz alanxz closed this as completed Aug 22, 2012
@asanand
Contributor

asanand commented Jan 29, 2014

We are also having a similar issue in BasicPublish; the code seems to hang:

#0 0x00007fbc67738102 in recv () from /lib/x86_64-linux-gnu/libpthread.so.0
#1 0x00000000006ff675 in wait_frame_inner (state=0x2504360, decoded_frame=0x7fff7f5c3770) at librabbitmq/amqp_socket.c:279
#2 0x00000000006ecacc in AmqpClient::Detail::ChannelImpl::GetNextFrameFromBroker(amqp_frame_t_&, boost::chrono::duration<long, boost::ratio<1l, 1000000l> >) clone .constprop.195
#3 0x00000000006ed8e8 in AmqpClient::Detail::ChannelImpl::GetNextFrameFromBrokerOnChannel(unsigned short, amqp_frame_t_&, boost::chrono::duration<long, boost::ratio<1l, 1000000l> >) ()
#4 0x00000000006e5636 in AmqpClient::Channel::BasicPublish(std::string const&, std::string const&, boost::shared_ptr<AmqpClient::BasicMessage>, bool, bool) ()

This seems to be a year-old issue; I was wondering whether any fix has been made for it. I saw that you proposed a timeout-based solution, but I don't see any timeout parameter in the BasicPublish API currently.

@alanxz
Owner

alanxz commented Jan 29, 2014

I have not put much time into adding this feature to SimpleAmqpClient.

Two new bits of information I should add to this:

  • The solution proposed above suffers from two problems:
    • If we time out waiting for a basic.ack from the broker, what state is the client left in? E.g., if we time out waiting for an ack, how should the API handle the next BasicPublish() call (or any other RPC to the broker that will block because the broker has paused reading from its socket)?
    • If the message is large enough, it will fill up both the broker's and the client's network buffers and rabbitmq-c will pause on a send(). If we time out on a send(), there's a good chance that the message was only partially sent. Recovering from this state is a good deal more complicated.
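  • The RabbitMQ broker now supports two new methods: connection.blocked and connection.unblocked. If the client reports to the broker upon connection that it supports these features, the broker will send a connection.blocked method when it starts blocking the connection (for example, because of a memory or disk alarm), and a connection.unblocked method once the block is lifted.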
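One way to honour the first problem is to treat a timeout as poisoning the connection: after it, nothing else may be sent until the caller reconnects, since a late ack or a half-written frame could otherwise be misread. A minimal single-threaded sketch of that guard; `ConnectionGuard` and its methods are hypothetical names, not SimpleAmqpClient API.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical guard for a connection that uses a publish timeout
// (not thread-safe). After a timeout the byte stream may contain a
// half-written frame, or an ack may still arrive later, so the only safe
// continuation is to reconnect.
class ConnectionGuard {
public:
    enum class State { Usable, Indeterminate };

    // Record that a publish (or the wait for its confirm) timed out.
    void on_timeout() { state_ = State::Indeterminate; }

    // Record that the caller has opened a fresh connection.
    void on_reconnected() { state_ = State::Usable; }

    // Call before every publish or RPC; throws instead of reusing a
    // connection whose protocol state is unknown.
    void ensure_usable() const {
        if (state_ == State::Indeterminate)
            throw std::runtime_error("connection state unknown after a timeout; reconnect first");
    }

    State state() const { return state_; }

private:
    State state_ = State::Usable;
};
```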

In light of my second point, it might be possible to add some functionality that more immediately rejects an attempt to do a BasicPublish to a broker that is overloaded.
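A sketch of that fail-fast idea, assuming the client has advertised the `connection.blocked` capability and that whatever code reads frames calls these hooks when it decodes connection.blocked (which carries a reason string) or connection.unblocked. `BlockedGate` and its methods are hypothetical. The gate only helps if the notification has already been read from the socket before the next publish; a publish racing with it can still end up stuck in the socket buffers.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical fail-fast gate in front of the publish path.
class BlockedGate {
public:
    // Hook for a decoded connection.blocked method.
    void on_blocked(const std::string& reason) {
        std::lock_guard<std::mutex> lock(m_);
        blocked_ = true;
        reason_ = reason;
    }

    // Hook for a decoded connection.unblocked method.
    void on_unblocked() {
        std::lock_guard<std::mutex> lock(m_);
        blocked_ = false;
        reason_.clear();
    }

    // Call at the start of a publish: throws instead of writing data the
    // broker has said it will not read for now.
    void check_can_publish() const {
        std::lock_guard<std::mutex> lock(m_);
        if (blocked_)
            throw std::runtime_error("broker has blocked publishing: " + reason_);
    }

private:
    mutable std::mutex m_;
    bool blocked_ = false;
    std::string reason_;
};
```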


3 participants