Threading #1

Closed
cp2587 opened this issue Jan 14, 2015 · 3 comments

@cp2587
Contributor

cp2587 commented Jan 14, 2015

Hi,

First, let me thank you for your library. It works really well. I am using it inside Celery tasks to send log messages to an AMQP queue that is then processed by Logstash (so I need thread safety). I was just wondering why the library spawns a new thread to handle the AMQP connection; can't that be done in the main process? I would greatly appreciate it if you could explain the library's workflow from a threading point of view. I don't really understand how RabbitMQ and threading are linked.

Cheers,

@eandersson
Owner

Hey

RabbitMQ and threading aren't directly linked, but RabbitMQ expects certain messages to be answered in a timely manner. That is why I use a background thread: it makes sure that heavy workloads run by the developer do not prevent the client from receiving and responding to mission-critical messages, such as heartbeats from the server.

In the end it would probably be possible to get around this without a background thread. As an example, the AMQP library pika uses a single thread, so the responsibility for handling RabbitMQ messages falls to the developer, who has to call process_data_events periodically whenever the main thread is busy.

e.g.
channel.basic.publish(.....)
time.sleep(180)  # The internal thread keeps receiving messages while the main thread is sleeping
channel.basic.publish(.....)

An example of the issue that pika users frequently encounter is available here.
pika/pika#418
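To make the idea concrete, here is a minimal sketch of a background thread that keeps answering heartbeats while the main thread is busy. `FakeConnection`, its attributes, and the timings are hypothetical and only illustrate the pattern; this is not the actual amqp-storm code and no broker is involved.

```python
import threading
import time


class FakeConnection:
    """Hypothetical stand-in for a client connection (not amqp-storm's class)."""

    def __init__(self, heartbeat_interval=0.05):
        self.heartbeat_interval = heartbeat_interval
        self.heartbeats_answered = 0
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._io_loop, daemon=True)

    def start(self):
        self._thread.start()

    def _io_loop(self):
        # Runs in the background: pretend a heartbeat arrives once per
        # interval and answer it, whatever the main thread is doing.
        while not self._stop.wait(self.heartbeat_interval):
            self.heartbeats_answered += 1

    def close(self):
        self._stop.set()
        self._thread.join()


connection = FakeConnection()
connection.start()
time.sleep(0.5)  # the main thread is "busy"; heartbeats are still answered
connection.close()
print(connection.heartbeats_answered)
```

With a single-threaded client, nothing would have answered those heartbeats during the sleep unless the developer explicitly gave control back to the library.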


If you want to better understand the inner workflow of the thread and amqp-storm, I would recommend reviewing the code. Keep in mind, though, that the Connection and Channels all live on the main thread; the thread hosted by the Connection only handles incoming data and hands it to the appropriate Channel. Sending a message is also done from the main thread.

If you want to dig into the code it all starts here.
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/connection.py#L329

Any incoming message is handled by the thread, which passes it on to the appropriate on_frame handler depending on the channel. If it is Channel0, it is an internal message (e.g. a heartbeat); if the channel is higher than 0, it belongs to one of the user's channels.
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/channel0.py = Channel0
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/channel.py = User Channel
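The routing described above could look roughly like the following. This is a simplified sketch, not the code behind those links: the class bodies, the `dispatch` helper, and the frame strings are assumptions made for illustration; only the `on_frame` name and the channel 0 convention come from the description.

```python
class Channel0:
    """Hypothetical handler for connection-level frames such as heartbeats."""

    def __init__(self):
        self.handled = []

    def on_frame(self, frame):
        self.handled.append(frame)


class Channel:
    """Hypothetical user channel that just records what it receives."""

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self.inbound = []

    def on_frame(self, frame):
        self.inbound.append(frame)


def dispatch(channel0, channels, channel_id, frame):
    # Channel id 0 carries internal traffic; anything higher is a user channel.
    if channel_id == 0:
        channel0.on_frame(frame)
    else:
        channels[channel_id].on_frame(frame)


channel0 = Channel0()
channels = {1: Channel(1), 2: Channel(2)}
dispatch(channel0, channels, 0, "Heartbeat")
dispatch(channel0, channels, 1, "Basic.Deliver")
dispatch(channel0, channels, 2, "Basic.Deliver")
```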

I wanted the library to be as simple as possible (even though it uses multiple threads), so any incoming message frame that isn't a system message (e.g. a heartbeat) is simply appended to a list. That list is then read and processed on the main thread using the process_data_events function.
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/channel.py#L106
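As a rough illustration of that hand-off, the sketch below has a background thread append frames to a list and the main thread drain it. The `Channel` class here is hypothetical, and the explicit lock is my assumption for the sketch; the linked code may handle this differently.

```python
import threading


class Channel:
    """Hypothetical channel with an inbound list drained by the main thread."""

    def __init__(self):
        self._inbound = []
        self._lock = threading.Lock()

    def on_frame(self, frame):
        # Called from the background thread: only store the frame.
        with self._lock:
            self._inbound.append(frame)

    def process_data_events(self, callback):
        # Called from the main thread: take everything received so far,
        # then run the user's callback outside the lock.
        with self._lock:
            pending, self._inbound = self._inbound, []
        for frame in pending:
            callback(frame)
        return len(pending)


def fake_reader(channel):
    # Stands in for the connection thread receiving five messages.
    for i in range(5):
        channel.on_frame(f"message-{i}")


channel = Channel()
reader = threading.Thread(target=fake_reader, args=(channel,))
reader.start()
reader.join()

received = []
handled = channel.process_data_events(received.append)
```

The user's callback never runs on the background thread, so slow callbacks cannot block the thread that reads from the socket.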

Any message that needs to be sent in order (e.g. Open/Close Channel) goes through a simple per-channel RPC mechanism guarded by a threading lock.
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/base.py#L86
https://github.com/eandersson/amqp-storm/blob/master/amqpstorm/channel.py#L193
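A minimal sketch of such a lock-guarded RPC, assuming one request in flight per channel and a reply delivered by the background thread. The `Rpc` class, `fake_send`, and the request strings are hypothetical and do not mirror the linked implementation.

```python
import threading


class Rpc:
    """Hypothetical per-channel request/response helper."""

    def __init__(self, timeout=1.0):
        self._lock = threading.Lock()  # serialises requests on this channel
        self._response_ready = threading.Event()
        self._response = None
        self._timeout = timeout

    def call(self, send, request):
        # Called from the main thread: send the request, then block until
        # the background thread hands over the reply.
        with self._lock:
            self._response_ready.clear()
            self._response = None
            send(request)
            if not self._response_ready.wait(self._timeout):
                raise TimeoutError(f"no reply to {request!r}")
            return self._response

    def on_response(self, response):
        # Called from the background thread when the reply frame arrives.
        self._response = response
        self._response_ready.set()


rpc = Rpc()


def fake_send(request):
    # Stands in for the broker: reply shortly afterwards from another thread.
    threading.Timer(0.01, rpc.on_response, args=(request + "Ok",)).start()


replies = [rpc.call(fake_send, "Channel.Open"),
           rpc.call(fake_send, "Channel.Close")]
```

Because each call holds the lock until its reply arrives, a second request on the same channel cannot overtake the first.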

Honestly, the append-to-a-list part is something I would like to improve, but it has worked really well so far. The only danger is that if QoS is not set, memory may skyrocket, as AMQP may try to push as many as 32k messages to the client. ;)
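To show why the QoS setting matters for that list, here is a toy model of prefetch. `FakeBroker` is entirely hypothetical and only mimics the rule that a broker stops delivering once the number of unacknowledged messages reaches the prefetch count, with 0 meaning no limit.

```python
from collections import deque


class FakeBroker:
    """Hypothetical broker model; only the prefetch rule is simulated."""

    def __init__(self, queued, prefetch_count=0):
        self.queue = deque(queued)
        self.prefetch_count = prefetch_count  # 0 means "no limit"
        self.unacked = 0

    def deliver(self, client_buffer):
        # Push messages until the queue is empty or the prefetch window is full.
        while self.queue and (self.prefetch_count == 0
                              or self.unacked < self.prefetch_count):
            client_buffer.append(self.queue.popleft())
            self.unacked += 1

    def ack(self):
        # The client acknowledged one message, freeing a slot in the window.
        self.unacked -= 1


unlimited_buffer = []
FakeBroker(range(1000)).deliver(unlimited_buffer)

limited_buffer = []
broker = FakeBroker(range(1000), prefetch_count=10)
broker.deliver(limited_buffer)

print(len(unlimited_buffer), len(limited_buffer))
```

Without a limit the whole backlog ends up in the client's list at once; with a prefetch count the list only grows as fast as messages are acknowledged.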

Let me know if that was unclear, or if you have more questions! :)

eandersson self-assigned this Jan 15, 2015
@cp2587
Contributor Author

cp2587 commented Jan 19, 2015

Hi,

Thanks for the quick reply, you answered all my questions!

@eandersson
Owner

No worries! I am glad someone is using it! 😄
