Exception swallowing in AmqpProtocol.run #90

Open
shturman opened this issue Apr 6, 2016 · 7 comments

Comments

@shturman

shturman commented Apr 6, 2016

aioamqp 0.7.0
RabbitMQ 3.6.1

Got an exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 262, in run
    yield from self.dispatch_frame()
  File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 217, in dispatch_frame
    yield from channel.dispatch_frame(frame)
  File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 111, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 200, in coro
    res = func(*args, **kw)
  File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 665, in server_basic_cancel
    consumer_tag = frame.arguments['consumer_tag']
KeyError: 'consumer_tag'

The client stopped consuming messages after this.
I don't see any way to handle this situation, because the exception is swallowed in AmqpProtocol.run.
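
A minimal sketch of this failure mode, using only asyncio and not aioamqp itself. The names `dispatch_frame`, `run_swallowing`, `run_propagating` and the dict-based frames are hypothetical stand-ins, not the library's code. It shows that a catch-all inside the loop leaves the task with no exception to inspect, while the same loop without the catch-all ends the task with the `KeyError` attached:

```python
import asyncio
import logging

logger = logging.getLogger("swallow_demo")  # hypothetical logger name


async def dispatch_frame(frames):
    """Stand-in frame handler: consumes one frame and may raise KeyError."""
    frame = frames.pop(0)
    return frame["consumer_tag"]


async def run_swallowing(frames, seen):
    """Loop with a catch-all: errors are logged but never reach the caller."""
    while frames:
        try:
            seen.append(await dispatch_frame(frames))
        except Exception:
            logger.exception("error on dispatch")


async def run_propagating(frames, seen):
    """Same loop without the catch-all: the task ends with the exception."""
    while frames:
        seen.append(await dispatch_frame(frames))


def make_frames():
    # The middle frame lacks 'consumer_tag', like the frame in the traceback.
    return [{"consumer_tag": "a"}, {}, {"consumer_tag": "b"}]


async def demo():
    swallowed_seen, propagated_seen = [], []
    swallow_task = asyncio.ensure_future(
        run_swallowing(make_frames(), swallowed_seen))
    propagate_task = asyncio.ensure_future(
        run_propagating(make_frames(), propagated_seen))
    await asyncio.wait([swallow_task, propagate_task])
    # Task.exception() is the only hook the caller has after the fact.
    return (swallow_task.exception(), propagate_task.exception(),
            swallowed_seen, propagated_seen)
```

In this sketch the swallowing loop happens to keep going, whereas in the report above the consumer stopped. Either way the calling code has nothing to react to, because `Task.exception()` returns `None` for the swallowing variant.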

@dzen
Contributor

dzen commented Apr 6, 2016

Hello,

We clearly want to tackle the problem of exceptions raised inside the AmqpProtocol.run task and provide a clean way to find out what happened.

Could you please let us know how you initialized your queues and consumers?

@shturman
Author

shturman commented Apr 6, 2016

    transport, protocol = await aioamqp.connect(
        host='host here',
        port=5672,
        login='login',
        password='password',
        virtualhost='vhost',
        on_error=self.disconnect,
        heartbeat=15)

    channel = await protocol.channel()
    await channel.exchange('exchange', 'direct', durable=True)
    await channel.queue(
        queue_name='messages',
        durable=True,
        arguments={
            'x-dead-letter-exchange': 'dead-exchange',
            'x-message-ttl': 1200000}
    )
    await channel.basic_qos(prefetch_count=300, connection_global=False)
    await channel.queue_bind('messages', 'exchange', 'consume.messages')

    await channel.basic_consume(message_processor, queue_name='messages')

@dzen
Contributor

dzen commented Apr 6, 2016

I think we will look at this later this week. We don't really have Python 3.5 available at the moment. Do you see the same behavior when running the examples?

@shturman
Author

shturman commented Apr 6, 2016

Everything works well until an exception like the one above occurs. It is an exceptional situation, I would say.

@opedge

opedge commented Jul 12, 2016

I have the same issue (sometimes) in our production. Do you have any workaround?

@RemiCardona
Contributor

There is no workaround for now, and this is very unfortunate. I've run into this issue myself and I'll see if I can't improve things somewhat. I'll report back if/when I find something.

Cheers

RemiCardona added a commit that referenced this issue Jul 19, 2016
Though in this spot, catching all exceptions is an awful idea.

To be addressed in issue #90.
adamhooper added a commit to CJWorkbench/channels_rabbitmq that referenced this issue Dec 29, 2020
This exposed a bug in both environments involving async_to_sync()'s new
"kill the thread executor" behavior. See
Polyconseil/aioamqp#90. For now, we override the
AmqpProtocol to avoid catching CancelledError.
@adamhooper

My workaround is to subclass and pass this as the protocol_factory:

import aioamqp.protocol
from aioamqp.exceptions import AmqpClosedConnection


class AioamqpProtocolWithIssue90Solved(aioamqp.protocol.AmqpProtocol):
    async def run(self):
        # Rewrite of aioamqp.protocol.AmqpProtocol.run() to nix the exception catch-all
        while not self.stop_now.done():
            try:
                await self.dispatch_frame()
            except AmqpClosedConnection as exc:
                aioamqp.protocol.logger.info("Close connection")
                self.stop_now.set_result(None)

                self._close_channels(exception=exc)
            # except Exception:
            #     logger.exception('error on dispatch')

Why does this catch-all exception handler exist? What errors does it catch that it should catch?
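
For comparison, a hedged sketch of one alternative to a silent catch-all, written against plain asyncio rather than aioamqp. `ConnectionClosed`, `run`, `dispatch_frame` and `on_error` here are hypothetical names for illustration, not the library's API. The loop treats the close exception as the expected exit, re-raises cancellation, and hands any other exception to a callback before stopping. On Python 3.8 and later `asyncio.CancelledError` derives from `BaseException`, so the explicit re-raise mainly matters on older interpreters, where `except Exception` would catch it:

```python
import asyncio


class ConnectionClosed(Exception):
    """Hypothetical stand-in for a library's 'connection closed' error."""


async def run(dispatch_frame, on_error):
    """Dispatch until closed; report unexpected errors instead of hiding them."""
    while True:
        try:
            await dispatch_frame()
        except ConnectionClosed:
            return "closed"      # expected shutdown path
        except asyncio.CancelledError:
            raise                # never swallow cancellation
        except Exception as exc:
            on_error(exc)        # surface the failure to the caller
            return "failed"      # stop rather than loop in a broken state


async def demo():
    errors = []

    async def bad_dispatch():
        raise KeyError("consumer_tag")

    async def closing_dispatch():
        raise ConnectionClosed()

    async def slow_dispatch():
        await asyncio.sleep(3600)

    failed = await run(bad_dispatch, errors.append)
    closed = await run(closing_dispatch, errors.append)

    # Cancel a loop that is blocked in dispatch and check that it propagates.
    task = asyncio.ensure_future(run(slow_dispatch, errors.append))
    await asyncio.sleep(0)       # let the task start and block
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return failed, closed, cancelled, errors
```

Whether stopping on the first unexpected error is the right policy depends on the application. The point of the sketch is only that the caller gets to decide, through `on_error`, instead of the error ending up solely in a log line.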
