Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CancelledError exception is not handled inside Sender._fail_all() #710

Closed
bitphage opened this issue Jan 19, 2021 · 1 comment · Fixed by #711
Closed

CancelledError exception is not handled inside Sender._fail_all() #710

bitphage opened this issue Jan 19, 2021 · 1 comment · Fixed by #711

Comments

@bitphage
Copy link
Contributor

bitphage commented Jan 19, 2021

Describe the bug

When doing a clean shutdown of an app using asyncio.run(my_app()), aiofakfa is sometimes failing to handle asyncio.exceptions.CancelledError. Here is the example:

^C2021-01-19 01:04:31,629 - asyncio(_cancel_all_tasks:61) - ERROR: Exception in callback <bound method Sender._fail_all of <aiokafka.producer.sender.Sender object at 0x7f0ec6542b20>>
handle: <Handle Sender._fail_all>
Traceback (most recent call last):
  File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1450, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1443, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1351, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 519, in uvloop.loop.Loop._run
  File "uvloop/loop.pyx", line 436, in uvloop.loop.Loop._on_idle
  File "uvloop/cbhandles.pyx", line 90, in uvloop.loop.Handle._run
  File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
  File "./bin/order_book_fetcher.py", line 87, in fetch_ws_messages
    await fetcher.log_messages()
  File "/home/vvk/devel/hummingbot-team/exchange-data-fetcher/fetcher/ws_fetcher.py", line 123, in log_messages
    await producer.send(topic_name, item, timestamp_ms=timestamp)
  File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 439, in send
    partition = self._partition(topic, partition, key, value,
  File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 366, in _partition
    return self._partitioner(
  File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/kafka/partitioner/default.py", line 15, in __call__
    @classmethod
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
  File "/home/vvk/miniconda3/envs/fetcher/lib/python3.8/site-packages/aiokafka/producer/sender.py", line 62, in _fail_all
    if task.exception() is not None:
asyncio.exceptions.CancelledError
2021-01-19 01:04:40,965 - asyncio(__del__:282) - ERROR: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f0ec65429d0>

After looking into aiokafka code I found that logic in aiokafka.producer.Sender._fail_all() does not account for raised exceptions here:

    def _fail_all(self, task):
        """ Called when sender fails. Will fail all pending batches, as they
        will never be delivered as well as fail transaction
        """
        if task.exception() is not None:  #  <---- here
            self._message_accumulator.fail_all(task.exception())
            if self._txn_manager is not None:
                self._txn_manager.fatal_error(task.exception())

According to https://docs.python.org/3.9/library/asyncio-task.html#asyncio.Task.exception exception() method raises CancelledError or InvalidStateError.

So basically to fix the error, CancelledError exception handling should be added into _fail_all().

@bitphage
Copy link
Contributor Author

Updated a report with proper code fragment where the error is happening (copy-pasted wrong fragment at first).

bitphage added a commit to bitphage/aiokafka that referenced this issue Jan 19, 2021
@ods ods closed this as completed in #711 Aug 18, 2021
ods added a commit that referenced this issue Aug 18, 2021
* Fix asyncio.CancelledError handling in Sender._fail_all()

Closes: #710

* Fix long line

* Refactor task cancel handling

Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>

Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant