Interrupting library calls #77

Closed
adambadura opened this issue Nov 13, 2012 · 12 comments

@adambadura

I'm not sure this is the right place to ask (most likely not), but I don't know of a better one (a feature I miss on GitHub). A quick look through the code makes me think this might not be possible at all, which would make it an actual "issue".

So is there a way to interrupt library calls?

For a library user it would be useful to be able to interrupt any call that could take a long time (in particular those that block until input arrives on a socket), while keeping the library's data consistent and stable so that any action could (as far as possible) be retried without a complete reload.

@bkw

bkw commented Nov 13, 2012

Generally you could use the POSIX alarm signal (SIGALRM) for that. But since you mention sockets, it is safer not to abort the syscall at all and instead use setsockopt with the SO_RCVTIMEO option to set an input timeout on the socket.
@nark used this technique to add a timeout on consume() for the php amqp extension, which builds on top of rabbitmq-c; see php-amqp/php-amqp#13
Hope this helps.
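A minimal sketch of the SO_RCVTIMEO approach on a POSIX system, assuming you already have the connection's socket descriptor (rabbitmq-c exposes it through amqp_get_sockfd()). The helper names set_recv_timeout and recv_timed_out are hypothetical. Winsock expects a DWORD in milliseconds for SO_RCVTIMEO rather than a struct timeval, and how the library reacts when one of its internal reads fails with a timeout depends on the version, so that is worth checking before relying on it.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>

/* Give a connected POSIX socket a receive timeout. Afterwards a blocking
 * recv() that sees no data within timeout_ms fails with -1 and sets errno
 * to EAGAIN or EWOULDBLOCK instead of blocking forever.
 * Returns 0 on success, -1 on error (errno set by setsockopt). */
int set_recv_timeout(int fd, long timeout_ms)
{
    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    return setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
}

/* Call right after a failed recv(): nonzero if the failure was the
 * timeout, zero if it was a real error. */
int recv_timed_out(void)
{
    return errno == EAGAIN || errno == EWOULDBLOCK;
}
```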

@alanxz
Owner

alanxz commented Nov 13, 2012

Your analysis is correct. Currently the library doesn't directly support interrupting calls into the library.

That said, the library does support extracting the socket fd from the connection object. So with a bit of work it is possible to use the select system call, with a timeout of your choosing, to make the library behave in a non-blocking manner.

The SimpleAmqpClient library which wraps rabbitmq-c does this as an example when consuming messages.

@adambadura
Author

I'm using the library on Windows (with Visual Studio 2010) to build a service (running in the background) that waits for messages from another application through RabbitMQ and then executes some other actions.

Since I want to do the service properly, I should implement options to pause and stop it. Both of these (certainly the second) require me to interrupt any call, especially the blocking ones.

So are the POSIX features available to me, and if so, how?

Making calls with a timeout is not a reasonable option here either. A short timeout would make the code do much more work than needed, while a long timeout risks the service manager deciding that the service is no longer responsive and stable.

Is it an option to run the whole thing on a separate thread and then kill it brutally, possibly closing the socket from the main thread first (or afterwards)? Would starting a new thread and reinitializing the library (creating the connection, exchange, ...) work? (This idea just came to mind.)

@adambadura
Author

Another possibility: is alanxz/libamqp-cpp capable of such interruption? Is it mature and stable enough for production code?

@alanxz
Owner

alanxz commented Nov 13, 2012

Win32/WinSock2 does support the [select()](http://msdn.microsoft.com/en-us/library/windows/desktop/ms740141(v=vs.85\).aspx) system call.

Given your use case, I would look into using the SimpleAmqpClient library wrapper. It doesn't allow a true interrupt (e.g., using SIGALRM or similar), but with a short timeout (1-5 seconds) you could get something pretty close. That library has already been developed, so you're that much further ahead :)

A good thing to think about: how frequently are you going to be pausing/stopping the service, and how fast does it need to be at pausing?

alanxz/libamqp-cpp is far from complete. I believe I left it in a state where it can successfully connect to the broker and maybe do some RPC-style AMQP method calls (e.g., queue.declare). It was an experiment I started and never got around to finishing.

@adambadura
Author

Actually, in this specific case I don't expect any pausing. The service would (I guess) be stopped every time the computer is turned off. The time limits are then rather strict (though 1-5 seconds should be enough), since the computer could, for example, be shutting down due to low battery...

Somehow I don't like the timeout solution, since it is actually semi-active waiting when it should be passive. But it looks like this is the only way to go.

Thanks for the assistance, and please consider this feature. To me it seems important (being able to interrupt a long blocking call in a network environment is a rather natural expectation).

@alanxz
Owner

alanxz commented Nov 14, 2012

Why can't you have multiple threads in the service? One would be your event loop, which alternates between checking for a message from the broker and checking for a shutdown notification of some kind from another thread.

That's how I've seen a lot of these kinds of services work, at least on Windows.

I'm unlikely to add an interruptible interface based on signals. Their implementation across platforms is very inconsistent, and you quickly run into problems of "what happens when more than one library wants to handle a signal?"

That said an asynchronous interface is something I hope to add in the future, which I think will solve a lot of use cases, such as yours.

@alanxz alanxz closed this as completed Nov 14, 2012
@adambadura
Author

Maybe my approach is wrong, but I’m used to the idea that if a thread has nothing to do, it should do nothing rather than repeatedly check whether there is anything to do. Such checking is needless work, ideologically not far from active waiting. In fact it is active waiting with improvements.

Not to mention that you end up with a difficult dilemma: how often should you check (alternate)? If you check often to stay responsive, your waiting becomes more active. If you check rarely, you seem unresponsive. Why put yourself in such a dilemma if you can avoid the issue altogether?

As to how to implement interruption, I’m of little use here. I’m not familiar with POSIX, which the library seems to use. I don’t know much about signals, and if they are as bad as you describe, then surely they are not a solution, just a change into a different problem.

As to “interrupting” itself, maybe you understood me too strictly. As a library user I don’t care how it is implemented under the hood. I just want to be able to make a call like amqp_cancel( connection ); or something similar, which would cancel any blocking/pending I/O on the specified connection/exchange/queue/whatever (if there is any to cancel). (And I would like the library to wait for I/O rather than repeatedly peek for it in a semi-active way. That is to preemptively answer one possible solution. ;))

In the native Windows API, a call to ReadFile[Ex] (both synchronous and asynchronous), which can be used to read from sockets, can be canceled. However, I don’t know how reliable and fast this is, since I can’t recall actually doing it. And relying on the native Windows API might mean providing a separate implementation for every platform, which is an obvious drawback and obstacle.

@alanxz
Owner

alanxz commented Nov 14, 2012

The other thing I failed to mention above, which makes a cancellation interface a non-starter, is that rabbitmq-c is currently not thread-safe: instances of amqp_connection_state_t cannot be used concurrently from different threads.

I do realize that ideally you want your app to be doing NOTHING while waiting for I/O. Having a polling loop sucks down unnecessary CPU cycles, and if you're running on something with limited power, you're potentially waking up a processor core when maybe you don't have to.

Unfortunately, even if I did manage to resolve the concurrency issues above, cancelling network I/O is a minefield across platforms, since they offer different levels of support for it.

@adambadura
Author

Maybe another approach: is it possible to bypass the library's socket handling and do the I/O on my own? That way I could implement interruptible I/O the way I like and just feed the raw data to the library functions to parse it "the AMQP way".

I'm not sure how difficult it would be, even if the library (reasonably) allowed that. The idea just came to mind.

@alanxz
Owner

alanxz commented Nov 26, 2012

I have not tried this yet, but I think you could get it done with the amqp_handle_input function.

The (as yet unwritten) documentation for this function:

int amqp_handle_input(amqp_connection_state_t state,
                      amqp_bytes_t received_data,
                      amqp_frame_t *decoded_frame);

Takes a chunk of received_data from the broker and tries to convert it to an amqp_frame_t.

WARNING: this modifies the internal state of the state object, in that you're reading from the socket without the knowledge of the library. Thus it is important that you do not have any unprocessed data from the broker that the library hasn't consumed before calling into an RPC function like amqp_queue_declare.

Inputs:

  • state: the connection state object
  • received_data: the chunk of data received from the broker. Note that the len member should be set to the actual length of the data received from the broker, and not the size of the memory buffer (in case the two differ).
  • decoded_frame: a pointer to the frame decoded from received_data. decoded_frame->frame_type is set to 0 unless a complete frame is successfully decoded, in which case the frame_type will be set to AMQP_FRAME_METHOD, AMQP_FRAME_HEADER, or AMQP_FRAME_BODY.

Returns:
A positive number indicates how many bytes were used out of the received_data parameter. Any unused bytes should be shifted to the beginning of the received_data buffer before calling amqp_handle_input again. A negative number indicates that an error occurred in either allocating memory or decoding the stream. In either case the connection is likely dead at this point and should be destroyed.

Errors:

  • -ERROR_NO_MEMORY: out of memory
  • -ERROR_BAD_AMQP_DATA: there was a problem with the data packet. The AMQP protocol dictates that the client should immediately send a connection.close and close the connection.
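Following the return-value contract above, a caller's feed loop might look like the sketch below. To keep it runnable without rabbitmq-c, the decoder is a hypothetical callback (decode_fn) standing in for amqp_handle_input(); a real wrapper would pack the bytes into an amqp_bytes_t, call amqp_handle_input(), and report a frame when decoded_frame->frame_type is nonzero. fixed4_decode is a toy decoder for demonstration only. Whether the real function ever leaves bytes unconsumed depends on its internals; the loop handles both cases.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical decoder signature standing in for amqp_handle_input():
 * consumes a prefix of data, returns the bytes used (>= 0) or a negative
 * error, and sets *got_frame to 1 when a complete frame was decoded. */
typedef int (*decode_fn)(void *ctx, const unsigned char *data, size_t len,
                         int *got_frame);

/* Feed the first *len bytes of buf to decode() until it stops making
 * progress, then shift the unused bytes to the start of buf.
 * Returns the number of complete frames decoded, or the negative error. */
int feed_received(decode_fn decode, void *ctx, unsigned char *buf, size_t *len)
{
    size_t used = 0;
    int frames = 0;
    while (used < *len) {
        int got_frame = 0;
        int rc = decode(ctx, buf + used, *len - used, &got_frame);
        if (rc < 0)
            return rc; /* connection is likely dead: destroy it */
        used += (size_t)rc;
        frames += got_frame; /* a real caller would dispatch the frame here */
        if (rc == 0)
            break; /* no progress: wait for more bytes from the socket */
    }
    memmove(buf, buf + used, *len - used);
    *len -= used;
    return frames;
}

/* Toy decoder for demonstration only: every 4 bytes form one "frame". */
static int fixed4_decode(void *ctx, const unsigned char *data, size_t len,
                         int *got_frame)
{
    (void)ctx;
    (void)data;
    if (len < 4)
        return 0;
    *got_frame = 1;
    return 4;
}
```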

@w3sip

w3sip commented Jan 29, 2021

Is this still the current situation? I'm attempting to terminate a client whose polling thread is blocked in amqp_consume_message ... no luck so far.
