Consuming multiple queues with one channel #48

Closed
smarlowucf opened this issue Dec 7, 2017 · 7 comments

@smarlowucf
Contributor

I am porting code from pika to amqpstorm for thread safety. The existing code has a single channel that consumes from multiple queues. With amqpstorm, all messages go to the callback passed to the last basic.consume call:

producer.py

from amqpstorm import Message, UriConnection


msg_properties = {
    'content_type': 'application/json',
    'delivery_mode': 2
}

connection = UriConnection(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)

channel = connection.channel()
channel.confirm_deliveries()

channel.exchange.declare(
    exchange='testing',
    exchange_type='direct',
    durable=True
)

Message.create(
    channel,
    'Test service event.',
    msg_properties
).publish(
    'service_event',
    exchange='testing',
)

Message.create(
    channel,
    'Test message event.',
    msg_properties
).publish(
    'message_event',
    exchange='testing',
)

channel.close()
connection.close()

consumer.py

from amqpstorm import UriConnection


connection = UriConnection(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)

channel = connection.channel()
channel.confirm_deliveries()


def service_event(message):
    message.ack()
    print('From service method.')


def message_event(message):
    message.ack()
    print('From message method.')


channel.exchange.declare(
    exchange='testing',
    exchange_type='direct',
    durable=True
)

s_queue = 'service_event.queue'
channel.queue.declare(s_queue)
channel.basic.consume(service_event, s_queue)
channel.queue.bind(
    exchange='testing',
    queue=s_queue,
    routing_key='service_event'
)

m_queue = 'message_event.queue'
channel.queue.declare(m_queue)
channel.basic.consume(message_event, m_queue)
channel.queue.bind(
    exchange='testing',
    queue=m_queue,
    routing_key='message_event'
)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Stop cleanly on Ctrl+C rather than swallowing every exception.
    channel.stop_consuming()

channel.close()
connection.close()

Yields:

From message method.
From message method.

But should be:

From service method.
From message method.

Am I doing something wrong here? It seems like one channel should be able to consume from multiple queues and deliver each queue's messages to its own callback.

@smarlowucf
Contributor Author

This appears to be the cause:

https://github.com/eandersson/amqpstorm/blob/master/amqpstorm/basic.py#L132

Is this intended or is it something that could be updated?
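
The output above is consistent with a channel that stores a single consumer callback, so each new basic.consume call replaces the previous one. The following is only a toy model of that idea, not amqpstorm's actual code: the class names, the `ctag-N` tag format, and the `deliver` method are all hypothetical and exist purely for illustration. It contrasts a single callback slot with a lookup keyed by consumer tag.

```python
import itertools


class SingleCallbackChannel:
    """Toy channel that remembers only one consumer callback (hypothetical)."""

    def __init__(self):
        self._callback = None
        self._tags = itertools.count(1)

    def consume(self, callback, queue):
        # Every call overwrites the callback stored by the previous call.
        self._callback = callback
        return 'ctag-%d' % next(self._tags)

    def deliver(self, consumer_tag, body):
        # The consumer tag is ignored, so the last callback always wins.
        return self._callback(body)


class MultiCallbackChannel:
    """Toy channel that keeps one callback per consumer tag (hypothetical)."""

    def __init__(self):
        self._callbacks = {}
        self._tags = itertools.count(1)

    def consume(self, callback, queue):
        consumer_tag = 'ctag-%d' % next(self._tags)
        self._callbacks[consumer_tag] = callback
        return consumer_tag

    def deliver(self, consumer_tag, body):
        # Look up the callback registered for this specific consumer.
        return self._callbacks[consumer_tag](body)


def demo(channel_cls):
    """Register two consumers, deliver one message to each, return results."""
    channel = channel_cls()
    s_tag = channel.consume(lambda body: 'service', 'service_event.queue')
    m_tag = channel.consume(lambda body: 'message', 'message_event.queue')
    return [channel.deliver(s_tag, 'a'), channel.deliver(m_tag, 'b')]


print(demo(SingleCallbackChannel))  # ['message', 'message']
print(demo(MultiCallbackChannel))   # ['service', 'message']
```

The first result mirrors the "From message method." twice symptom in the report; the second is the behaviour being asked for.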

@eandersson
Owner

eandersson commented Dec 8, 2017

This was indeed an intentional design decision. I can revisit it.

@eandersson
Owner

eandersson commented Dec 8, 2017

How about something like this as an alternative?

from amqpstorm import UriConnection


connection = UriConnection(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)

channel = connection.channel()
channel.confirm_deliveries()


def service_event(message):
    message.ack()
    print('From service method.')


def message_event(message):
    message.ack()
    print('From message method.')


channel.exchange.declare(
    exchange='testing',
    exchange_type='direct',
    durable=True
)

s_queue = 'service_event.queue'
channel.queue.declare(s_queue)
channel.basic.consume(queue=s_queue)
channel.queue.bind(
    exchange='testing',
    queue=s_queue,
    routing_key='service_event'
)

m_queue = 'message_event.queue'
channel.queue.declare(m_queue)
channel.basic.consume(queue=m_queue)
channel.queue.bind(
    exchange='testing',
    queue=m_queue,
    routing_key='message_event'
)

for message in channel.build_inbound_messages():
    routing_key = message.method.get('routing_key')
    if 'service_event' in routing_key:
        service_event(message)
    elif 'message_event' in routing_key:
        message_event(message)

channel.close()
connection.close()
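
If the number of routing keys grows, the if/elif chain in the loop above could be replaced by a lookup table. This is a minimal sketch, assuming exact routing-key matches are wanted (the loop above uses substring matching with `in`). `FakeMessage`, `HANDLERS`, and `dispatch` are hypothetical names introduced here; `FakeMessage` only imitates the `message.method.get('routing_key')` and `message.ack()` calls used above so that the sketch runs without a broker.

```python
class FakeMessage:
    """Hypothetical stand-in exposing only what the loop above uses."""

    def __init__(self, routing_key):
        self.method = {'routing_key': routing_key}
        self.acked = False

    def ack(self):
        self.acked = True


def service_event(message):
    message.ack()
    return 'From service method.'


def message_event(message):
    message.ack()
    return 'From message method.'


# Map each exact routing key to the function that should handle it.
HANDLERS = {
    'service_event': service_event,
    'message_event': message_event,
}


def dispatch(message, handlers=HANDLERS):
    """Call the handler for the message's exact routing key, if any."""
    handler = handlers.get(message.method.get('routing_key'))
    if handler is None:
        # Unknown key: leave the message untouched and let the caller decide.
        return None
    return handler(message)


print(dispatch(FakeMessage('service_event')))
print(dispatch(FakeMessage('message_event')))
```

With a real channel, the body of the `for message in channel.build_inbound_messages():` loop would shrink to a single `dispatch(message)` call.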

@smarlowucf
Contributor Author

@eandersson Thanks for the quick response!

Yep, there are other ways to handle the routing keys and pass messages to the proper callbacks. I was mostly surprised because all the other libraries I tested allow multiple consumer callbacks.

I think it would be a nice addition and would provide a clean way to handle messages intended for different callbacks. But given that this was a design decision, it isn't really a bug, so I will close the issue for now.

@eandersson eandersson reopened this Dec 9, 2017
@eandersson eandersson self-assigned this Dec 9, 2017
eandersson added a commit that referenced this issue Dec 9, 2017
@eandersson eandersson added this to the 3.X milestone Dec 9, 2017
eandersson added a commit that referenced this issue Dec 9, 2017
@eandersson
Owner

Thanks for the feedback @smarlowucf.

I'll have this implemented in the next release.

@smarlowucf
Contributor Author

Awesome, thanks @eandersson !

eandersson added a commit that referenced this issue Jan 1, 2018
eandersson added a commit that referenced this issue Jan 5, 2018
* Multi-Callback Support [#48]
* Added sanity check to api connection test and fixing broken test
@eandersson
Owner

This should be fixed in 2.4.0. I will let it bake for a few days before I push it out. Thanks for the report, @smarlowucf.
