BrokerConnection: deadlock caused by unhandled socket recv exception, exists in latest master branch #1817

Closed
ventfang opened this issue May 25, 2019 · 5 comments

@ventfang

kafka-python: 1.4.6

Here is my code snippet:

while True:
    msgs = self.consumer.poll(timeout_ms)
    handle_message(msgs)

traceback:

 2019-05-25 19:51:09,205 kafka_clent.py[line:56] ERROR Traceback (most recent call last):
  File "kafka_clent.py", line 48, in __main
    self.__poll_once(handle_messages, timeout_ms)
  File "kafka_clent.py", line 67, in __poll_once
    msgs = self.consumer.poll(timeout_ms)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 617, in poll
    records = self._poll_once(remaining, max_records)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 637, in _poll_once
    self._coordinator.poll()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 263, in poll
    self.ensure_coordinator_ready()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/base.py", line 259, in ensure_coordinator_ready
    self._client.poll(future=future)
  File "./opt/venv/lib/python3.7/site-packages/kafka/client_async.py", line 593, in poll
    self._poll(timeout)
  File "./opt/venv/lib/python3.7/site-packages/kafka/client_async.py", line 649, in _poll
    self._pending_completion.extend(conn.recv())
  File "./opt/venv/lib/python3.7/site-packages/kafka/conn.py", line 905, in recv
    responses = self._recv()
  File "./opt/venv/lib/python3.7/site-packages/kafka/conn.py", line 942, in _recv
    data = self._sock.recv(self.config['sock_chunk_bytes'])
TimeoutError: [Errno 60] Operation timed out

^C2019-05-25 19:52:19,183 kafka_clent.py[line:51] INFO Traceback (most recent call last):
  File "kafka_clent.py", line 48, in __main
    self.__poll_once(handle_messages, timeout_ms)
  File "kafka_clent.py", line 67, in __poll_once
    msgs = self.consumer.poll(timeout_ms)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 617, in poll
    records = self._poll_once(remaining, max_records)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 637, in _poll_once
    self._coordinator.poll()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 263, in poll
    self.ensure_coordinator_ready()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/base.py", line 246, in ensure_coordinator_ready
    with self._client._lock, self._lock:
KeyboardInterrupt
@ventfang
Author

Here is a simple patch to avoid the deadlock, but using a lock guard instead of manual lock acquire/release would be better.

diff --git a/kafka/conn.py b/kafka/conn.py
index 044d2d5..f5feb99 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -967,6 +967,8 @@ class BrokerConnection(object):
                     break
                 self._lock.release()
                 raise
+            except Exception as e:
+                log.exception(f'{type(e)} {e} unhandled exception.')
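To illustrate the lock-guard point, here is a minimal sketch of the failure mode. It is not kafka-python's code: `FlakySocket`, `recv_manual`, `recv_guarded`, `lock_is_free`, and `run` are hypothetical names made up for this example. It assumes only what the traceback shows, namely that `recv()` raised a `TimeoutError`, which is neither a `ConnectionError` nor a `BlockingIOError` in Python 3 and so can slip past handlers written for those types.

```python
import threading


class FlakySocket:
    """Hypothetical stand-in for a socket whose recv() times out."""

    def recv(self, nbytes):
        raise TimeoutError(60, "Operation timed out")


def recv_manual(lock, sock):
    """Manual acquire/release: only the anticipated exceptions release the lock."""
    lock.acquire()
    try:
        data = sock.recv(4096)
    except ConnectionError:
        lock.release()
        return b""
    except BlockingIOError:
        lock.release()
        raise
    # A TimeoutError matches neither handler, so it propagates
    # past this point and the lock is never released.
    lock.release()
    return data


def recv_guarded(lock, sock):
    """Lock guard: the with-statement releases the lock on every exit path."""
    with lock:
        return sock.recv(4096)


def lock_is_free(lock):
    """Probe from a second thread whether the lock can be taken within 0.1 s."""
    result = []

    def probe():
        got = lock.acquire(timeout=0.1)
        if got:
            lock.release()
        result.append(got)

    t = threading.Thread(target=probe)
    t.start()
    t.join()
    return result[0]


def run(recv_fn):
    """Call recv_fn once, swallow the timeout, report whether the lock was released."""
    lock = threading.RLock()
    try:
        recv_fn(lock, FlakySocket())
    except TimeoutError:
        pass
    return lock_is_free(lock)


if __name__ == "__main__":
    print("manual acquire/release, lock free afterwards:", run(recv_manual))
    print("with-statement guard, lock free afterwards:", run(recv_guarded))
```

With the manual variant the lock stays held by the thread that hit the timeout, so any other thread that needs it waits forever. The guarded variant still raises the error to the caller but always releases the lock first.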

@dpkp dpkp self-assigned this May 29, 2019
@dpkp
Owner

dpkp commented May 29, 2019

Thanks for the bug report -- are you configuring TCP KeepAlive, perchance?

@ventfang
Author

> Thanks for the bug report -- are you configuring TCP KeepAlive, perchance?

No, I am using the default config.
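For readers unsure what the KeepAlive question refers to: kafka-python clients accept a `socket_options` list of `(level, optname, value)` tuples, and as far as I know the default only sets `TCP_NODELAY`. A non-default keepalive setup might look roughly like the sketch below. The names `keepalive_options` and `apply_options` are made up for illustration, and the options are applied to a plain socket here so the snippet runs without a broker.

```python
import socket

# Hypothetical non-default option list: TCP_NODELAY plus SO_KEEPALIVE.
keepalive_options = [
    (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]


def apply_options(sock, options):
    """Apply each (level, optname, value) tuple to the socket."""
    for level, optname, value in options:
        sock.setsockopt(level, optname, value)
    return sock


sock = apply_options(
    socket.socket(socket.AF_INET, socket.SOCK_STREAM), keepalive_options
)
# getsockopt returns a non-zero value when keepalive is enabled.
keepalive_enabled = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
sock.close()
```

The same list would be passed as `KafkaConsumer(..., socket_options=keepalive_options)`. With the default config, as reported here, none of this is in play.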

@jeffwidman
Collaborator

Looks like this is fixed by #1820

@litterzhang

It looks like I hit the same problem.
The consumer deadlocks after a "Connection timed out" error is raised: ensure_coordinator_ready blocks forever in the lock's __enter__.
38233 :: Reader-7 consumer kafka source error -> [Errno 110] Connection timed out

<built-in method __enter__ of _thread.RLock object at remote 0x7fcaca656480>
  File "/home/work/.jumbo/lib/python3.6/site-packages/kafka/coordinator/base.py", line 246, in ensure_coordinator_ready
    with self._client._lock, self._lock:
  File "/home/work/.jumbo/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 505, in commit_offsets_sync
    self.ensure_coordinator_ready()
  File "/home/work/.jumbo/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 521, in _maybe_auto_commit_offsets_sync
    self.commit_offsets_sync(self._subscription.all_consumed_offsets())
  (frame information optimized out)
  (frame information optimized out)
  File "/ssd1/zhang92/skyline-z/src/horizon/reader.py", line 82, in run
    if consumer: consumer.close()
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/popen_fork.py", line 80, in _launch
    code = process_obj._bootstrap()
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/popen_fork.py", line 26, in __init__
    self._launch(process_obj)
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/work/.jumbo/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "src/horizon/agent.py", line 42, in run
    for reader in readers: reader.start()
  (frame information optimized out)
