AMQP Subscribe #129

veered opened this Issue · 4 comments



Presently, the AMQP patch does not start a new fiber each time a message is received. This means it is impossible to use em-synchrony inside a subscription callback. For example, suppose I receive message A on fiber Z and attempt to open a Redis connection. em-synchrony will yield Z while it waits for Redis to open its connection, but if another message arrives, AMQP will resume fiber Z with the new message, completely breaking the Redis callback. I have a monkey patch for subscribe that seems to be working:

module EventMachine::Synchrony::AMQP
  class Queue
    def subscribe(&block)
      # Wrap each delivery in its own fiber so the callback can safely yield.
      asubscribe { |headers, payload| Fiber.new { block.call(headers, payload) }.resume }
    end
  end
end
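
For example, with the patch in place a callback like the following should be safe, since each delivery gets its own fiber. This is only a rough sketch: the connection/queue setup mirrors the em-synchrony AMQP wrapper, and the queue name and Redis command are illustrative (em-hiredis assumed).

EM.synchrony do
  connection = EM::Synchrony::AMQP::Connection.new
  channel    = EM::Synchrony::AMQP::Channel.new(connection)
  queue      = EM::Synchrony::AMQP::Queue.new(channel, 'jobs', :auto_delete => true)

  queue.subscribe do |headers, payload|
    # Each delivery now runs in its own fiber, so this synchrony-style call
    # can yield without being resumed by the next AMQP message.
    redis = EM::Hiredis::Client.connect('localhost', 6379)
    EM::Synchrony.sync(redis.lpush('received', payload))
  end
end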

The description is confusing me, but the code makes sense. :-)

@calj how does that look to you? Any unexpected behaviors that we would need to guard against?


It's true that the consumer part of the lib isn't obvious to use; we should probably improve it.
I don't like the approach of creating a new Fiber each time we receive a message; it sounds like a performance killer. We should use a Fiber pool instead.

Let me show you the code I use in my consumers:

I consume messages in one Fiber only and push them into an EM::Queue:

q = EM::Queue.new
consumer = AMQP::Consumer.new(channel, queue, nil, false, true)
consumer.on_delivery do |meta, msg|
  q.push(msg) if msg
end
consumer.consume # register the consumer so deliveries start arriving

Then I create a Fiber pool that pops messages from the queue (set consumers_count to the level of parallelism you need):

(1..consumers_count).each do |n|
  f = Fiber.new do
    loop do
      q.pop { |*args| EM.next_tick { f.resume(*args) } }
      msg = Fiber.yield
      # Do what you want with the message here
    end
  end
  f.resume # start the worker so it blocks on its first q.pop
end

Finally, for the Redis connections I use EM::Synchrony::ConnectionPool to ensure there is no problem accessing them from different fibers:

redis = EM::Synchrony::ConnectionPool.new(size: 4) do
  EM::Hiredis::Client.connect('localhost', 6379)
end
What do you think about this mechanism?
We can include it in the library.


@calj I've tested the Fiber pool theory in the past, and to my own surprise, it didn't come out any faster. I was playing with this within the core of Goliath, and if anything it came out a bit slower. Just FYI. If we're going to recommend using fiber pools, we should benchmark the use case first.

The above is strictly about the performance side. There are many other good reasons why you would use a fiber pool.


I definitely agree with the benchmark approach.
I will try to run some benchmarks this week to get an answer.
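
Something along these lines would do as a first micro-benchmark (plain Ruby, no EventMachine or AMQP involved; it only measures Fiber creation and resume overhead, not the full consumer path):

require 'benchmark'

ITERATIONS = 100_000

Benchmark.bm(18) do |bm|
  # One fresh Fiber per message, resumed once and then discarded.
  bm.report('fiber per message') do
    ITERATIONS.times { |i| Fiber.new { i * 2 }.resume }
  end

  # One long-lived Fiber that is resumed once per message.
  bm.report('reused fiber') do
    worker = Fiber.new do
      loop do
        msg = Fiber.yield
        msg * 2 # stand-in for real message handling
      end
    end
    worker.resume # run up to the first Fiber.yield
    ITERATIONS.times { |i| worker.resume(i) }
  end
end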
