
An exception occurs when the ConsumerCoordinator object is being deleted #2409

Open
tatnguyennguyen opened this issue Nov 8, 2023 · 1 comment


@tatnguyennguyen

tatnguyennguyen commented Nov 8, 2023

Hi, I am using kafka-python version 2.0.2 on Ubuntu 22.04 with Python 3.10.
My application uses multithreading, and I sometimes see this exception:

Exception ignored in: <function ConsumerCoordinator.__del__ at 0x7fc077562ef0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/consumer.py", line 132, in __del__
    super(ConsumerCoordinator, self).__del__()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 756, in __del__
    self._close_heartbeat_thread()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 750, in _close_heartbeat_thread
    self._heartbeat_thread.close()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 927, in close
    self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
  File "/usr/lib/python3.10/threading.py", line 1093, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread
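The last frame is a check in Python's own `threading.Thread.join()`: a thread is not allowed to join itself. A minimal sketch, independent of kafka-python (`SelfJoiningThread` is a hypothetical name), that reproduces the same error:

```python
import threading

class SelfJoiningThread(threading.Thread):
    """Minimal demo (hypothetical class): a thread that calls join() on itself."""

    def __init__(self):
        super().__init__()
        self.error = None

    def run(self):
        try:
            self.join(0.1)  # joining the current thread is rejected by threading
        except RuntimeError as exc:
            self.error = exc

worker = SelfJoiningThread()
worker.start()
worker.join()  # joining from the main thread is fine
print(worker.error)  # cannot join current thread
```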

It looks like someone else also ran into this problem: #663 (comment)

To debug this error, I added some logging just before line 927 in base.py (inside HeartbeatThread.close()):

log.info(f'Current thread id: {threading.get_ident()}')
log.info(f'HeartbeatThread thread id: {self.ident}')

When the exception happens, the identifier of the current thread is the same as the identifier of the HeartbeatThread object. At other times, when the exception does not happen, the log prints two different identifiers.

So I think the error happens because, at some point, the garbage collector runs on the very thread managed by the HeartbeatThread object. When the garbage collector calls the __del__ method of ConsumerCoordinator, this eventually leads to self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) being called inside the close() method of HeartbeatThread on that same thread. The thread then tries to join itself, which raises the exception.
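A self-contained sketch of the suspected mechanism. It uses hypothetical stand-ins, `Owner` for ConsumerCoordinator and `Heartbeat` for HeartbeatThread, not kafka-python's real classes. It assumes the finalizer is triggered by the last strong reference being released on the worker thread; the cyclic garbage collector happening to run on that thread would have the same effect. `sys.unraisablehook` (Python 3.8+) captures the "Exception ignored in" report so the thread identity can be compared, similar to the logging above.

```python
import sys
import threading
import weakref

class Heartbeat(threading.Thread):
    """Hypothetical stand-in for HeartbeatThread (not kafka-python's code)."""

    def __init__(self, owner_proxy):
        super().__init__(daemon=True)
        self.owner = owner_proxy        # weak proxy: does not keep the owner alive
        self.handoff = None             # strong reference that run() will drop
        self.ready = threading.Event()

    def run(self):
        self.ready.wait()
        # Releasing the last strong reference here runs Owner.__del__ on THIS thread.
        self.handoff = None

    def close(self):
        self.join(0.1)  # raises RuntimeError when called from this very thread


class Owner:
    """Hypothetical stand-in for ConsumerCoordinator."""

    def __init__(self):
        self.thread = Heartbeat(weakref.proxy(self))
        self.thread.start()

    def __del__(self):
        self.thread.close()


# Exceptions raised in __del__ are reported as "Exception ignored in ..."
# through sys.unraisablehook; record them instead of printing.
errors = []
sys.unraisablehook = lambda u: errors.append(
    (u.exc_type, str(u.exc_value), threading.get_ident()))

owner = Owner()
hb = owner.thread
hb.handoff = owner   # the heartbeat thread now holds a strong reference
del owner            # the main thread drops its own reference
hb.ready.set()       # let run() drop the last reference
hb.join()
sys.unraisablehook = sys.__unraisablehook__

print(errors[0][1])              # cannot join current thread
print(errors[0][2] == hb.ident)  # True: __del__ ran on the heartbeat thread
```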

To avoid this error, maybe we should avoid calling self.join from inside the HeartbeatThread itself? We could do this by wrapping HeartbeatThread inside another class, or something similar.
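One possible direction, sketched under assumptions: rather than never calling join(), close() can check whether it is running on the heartbeat thread itself and skip the join only in that case. `GuardedHeartbeat` and `interval_s` are hypothetical names, and this is not kafka-python's actual HeartbeatThread. The same check would still be needed if the thread were wrapped in another class as suggested above, because the wrapper's close() could also end up running on the heartbeat thread.

```python
import threading

class GuardedHeartbeat(threading.Thread):
    """Hypothetical heartbeat thread whose close() is safe to call from any
    thread, including itself (not kafka-python's actual code)."""

    def __init__(self, interval_s=0.05):
        super().__init__(daemon=True)
        self.interval_s = interval_s
        self.closed = False
        self._wakeup = threading.Event()

    def run(self):
        while not self.closed:
            # A real implementation would send a heartbeat here.
            self._wakeup.wait(self.interval_s)

    def close(self):
        self.closed = True
        self._wakeup.set()  # wake run() so it notices `closed` promptly
        if threading.current_thread() is self:
            # Called on this thread (e.g. from a finalizer): joining would raise
            # RuntimeError; the run() loop sees `closed` and exits by itself.
            return
        self.join(self.interval_s)

hb = GuardedHeartbeat()
hb.start()
hb.close()  # from the main thread: signals the loop and waits briefly for it
hb.join()
print(hb.is_alive())  # False
```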

tatnguyennguyen changed the title from "Exception when ConsumerCoordinator object was deleted" to "An exception occurs when the ConsumerCoordinator object is being deleted" on Nov 8, 2023
@mohak1907

Any update on this?
I am facing a similar issue: when I try to close the consumer iterator by calling the close() method, it just gets stuck there and goes into an infinite loop.
