Skip to content

Consumer threads can deadlock in connection errors #1461

@magaman384

Description

@magaman384

After running for some time, my Kafka consumer got stuck in a deadlock. I found the following messages in the logs.

kafka/conn.py <BrokerConnection node_id=1 host=... port=9092>: Closing connection. CorrelationIdError: Correlation IDs do not match: sent 873966, recv 873967
kafka/client_async.py Node 1 connection failed -- refreshing metadata
kafka/coordinator/base.py Error sending OffsetCommitRequest_v2 to node 1 [CorrelationIdError: Correlation IDs do not match: sent 873966, recv 873967]

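I connected to the stuck consumer with gdb; here are the stack traces from the two threads waiting for locks. The first thread (the one calling commit) holds the KafkaClient lock and is waiting for the coordinator lock, while the second (the heartbeat thread) holds the coordinator lock and is waiting for the KafkaClient lock.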

#1  0x000000000056aae4 in PyThread_acquire_lock (lock=0x43466c0, waitflag=1) at ../Python/thread_pthread.h:324
324	../Python/thread_pthread.h: No such file or directory.
(gdb) pydown
Undefined command: "pydown".  Try "help".
(gdb) py-bt
#4 Frame 0x441f800, for file /usr/lib/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=139894790764288, _RLock__block=<thread.lock at remote 0x7f3bce3c8d70>, _RLock__count=1) at remote 0x7f3bc8bd3a10>, blocking=1, me=139895040968512)
    rc = self.__block.acquire(blocking)
#8 Frame 0x4d0d880, for file /usr/lib/python2.7/threading.py, line 285, in __enter__ (self=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139894790764288, _RLock__block=<thread.lock at remote 0x7f3bce3c8d70>, _RLock__count=1) at remote 0x7f3bc8bd3a10>, acquire=<instancemethod at remote 0x7f3bcbc32f00>, _is_owned=<instancemethod at remote 0x7f3bcbc32690>, _release_save=<instancemethod at remote 0x7f3bcbc32500>, release=<instancemethod at remote 0x7f3bcbc32460>, _acquire_restore=<instancemethod at remote 0x7f3bcbc329b0>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x7f3bc8bd39d0>)
    return self.__lock.__enter__()
#15 Frame 0x433ad30, for file .../kafka/coordinator/base.py, line 692, in coordinator_dead (self=<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x7f3bc8c06210>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=1, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139894790764288, _RLock__block=<thread.lock at remote 0x7f3bce3c8d70>, _RLock__count=1) at remote 0x7f3bc8bd3a10>, acquire=<instancemethod at remote 0x7f3bcbc32f00>, _is_owned=<instancemethod at remote 0x7f3bcbc32690>, _release_save=<instancemethod at remote 0x7f3bcbc32500>, release=<instancemethod at remote 0x7f3bcbc32460>, _acquire_restore=<instancemethod at remote 0x7f3bcbc329b0>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x7f3bc8bd39d0>, _generation=<Generation(member_id=u'...', protocol=u'range', ...(truncated)
    with self._lock:
#19 Frame 0x43719c0, for file .../kafka/coordinator/base.py, line 477, in _failed_request (self=<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x7f3bc8c06210>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=1, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139894790764288, _RLock__block=<thread.lock at remote 0x7f3bce3c8d70>, _RLock__count=1) at remote 0x7f3bc8bd3a10>, acquire=<instancemethod at remote 0x7f3bcbc32f00>, _is_owned=<instancemethod at remote 0x7f3bcbc32690>, _release_save=<instancemethod at remote 0x7f3bcbc32500>, release=<instancemethod at remote 0x7f3bcbc32460>, _acquire_restore=<instancemethod at remote 0x7f3bcbc329b0>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x7f3bc8bd39d0>, _generation=<Generation(member_id=u'...', protocol=u'range', g...(truncated)
    self.coordinator_dead(error)
#29 Frame 0x7f3ba4010d20, for file .../kafka/future.py, line 79, in _call_backs (self=<Future(exception=<CorrelationIdError at remote 0x7f3bc8c8cfa0>, is_done=True, _errbacks=[<functools.partial at remote 0x7f3baf06c050>], value=None, _callbacks=[<functools.partial at remote 0x7f3baeefffc8>]) at remote 0x7f3bcbcde890>, back_type='errback', backs=[...], value=<...>, f=<functools.partial at remote 0x7f3baf06c050>)
    f(value)
#33 Frame 0x4b30310, for file .../kafka/future.py, line 45, in failure (self=<Future(exception=<CorrelationIdError at remote 0x7f3bc8c8cfa0>, is_done=True, _errbacks=[<functools.partial at remote 0x7f3baf06c050>], value=None, _callbacks=[<functools.partial at remote 0x7f3baeefffc8>]) at remote 0x7f3bcbcde890>, e=<...>)
    self._call_backs('errback', self._errbacks, self.exception)
#37 Frame 0x7f3bc8ccfc20, for file .../kafka/conn.py, line 664, in close (self=<BrokerConnection(_gai=[], _init_afi=0, afi=2, state='<disconnected>', _sensors=<BrokerConnectionMetrics(metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': '...'}, time_window_ms=30000) at remote 0x7f3bc8cc1c10>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x2692ce0>, _samples=[<Sample(event_count=2569, last_window_ms=<float at remote 0x7f3ba400cb68>, value=<float at remote 0x431e998>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3bc8cea190>, <Sample(event_count=1432, last_window_ms=<float at remote 0x7f3ba40117c8>, value=<float at remote 0x4f70dd8>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3baee8c710>], _current=1) at remote 0x7f3bc8cd2950>, _unit=3) at remote 0x7f3bc8cd2990>, _metric_...(truncated)
    future.failure(error)
#41 Frame 0x51ab770, for file .../kafka/conn.py, line 790, in _recv (self=<BrokerConnection(_gai=[], _init_afi=0, afi=2, state='<disconnected>', _sensors=<BrokerConnectionMetrics(metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': '...'}, time_window_ms=30000) at remote 0x7f3bc8cc1c10>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x2692ce0>, _samples=[<Sample(event_count=2569, last_window_ms=<float at remote 0x7f3ba400cb68>, value=<float at remote 0x431e998>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3bc8cea190>, <Sample(event_count=1432, last_window_ms=<float at remote 0x7f3ba40117c8>, value=<float at remote 0x4f70dd8>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3baee8c710>], _current=1) at remote 0x7f3bc8cd2950>, _unit=3) at remote 0x7f3bc8cd2990>, _metric_name=...(truncated)
    self.close(e)
#45 Frame 0x7f3bc400b950, for file .../kafka/conn.py, line 731, in recv (self=<BrokerConnection(_gai=[], _init_afi=0, afi=2, state='<disconnected>', _sensors=<BrokerConnectionMetrics(metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': '...'}, time_window_ms=30000) at remote 0x7f3bc8cc1c10>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x2692ce0>, _samples=[<Sample(event_count=2569, last_window_ms=<float at remote 0x7f3ba400cb68>, value=<float at remote 0x431e998>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3bc8cea190>, <Sample(event_count=1432, last_window_ms=<float at remote 0x7f3ba40117c8>, value=<float at remote 0x4f70dd8>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3baee8c710>], _current=1) at remote 0x7f3bc8cd2950>, _unit=3) at remote 0x7f3bc8cd2990>, _metric_n...(truncated)
    responses = self._recv()
#49 Frame 0x7f3ba40069d0, for file .../kafka/client_async.py, line 626, in _poll (self=<KafkaClient(_conns=<Dict at remote 0x7f3bcbb7fab8>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139895040968512, _RLock__block=<thread.lock at remote 0x7f3bcbbb5030>, _RLock__count=1) at remote 0x7f3bc8cc1ed0>, _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x42316e0>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 2], [...], 5], [...], None], _OrderedDict__map={2: [...], 5: [...]}) at remote 0x7f3bcb939ab8>, next_idle_close_check_time=<float at remote 0x7f3ba400cc58>) at remote 0x7f3bc8cc1f10>, _closed=False, _pending_completion=<collections.deque at remote 0x7f3bcbbdfec0>, _wake_r=<_socket.socket at remote 0x7f3bc8caf930>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sa...(truncated)
    self._pending_completion.extend(conn.recv())
#53 Frame 0x43773a0, for file .../kafka/client_async.py, line 573, in poll (self=<KafkaClient(_conns=<Dict at remote 0x7f3bcbb7fab8>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139895040968512, _RLock__block=<thread.lock at remote 0x7f3bcbbb5030>, _RLock__count=1) at remote 0x7f3bc8cc1ed0>, _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x42316e0>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 2], [...], 5], [...], None], _OrderedDict__map={2: [...], 5: [...]}) at remote 0x7f3bcb939ab8>, next_idle_close_check_time=<float at remote 0x7f3ba400cc58>) at remote 0x7f3bc8cc1f10>, _closed=False, _pending_completion=<collections.deque at remote 0x7f3bcbbdfec0>, _wake_r=<_socket.socket at remote 0x7f3bc8caf930>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _samples=...(truncated)
    self._poll(timeout)
#57 Frame 0x7f3bc4006300, for file .../kafka/coordinator/consumer.py, line 502, in commit_offsets_sync (self=<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x7f3bc8c06210>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=1, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139894790764288, _RLock__block=<thread.lock at remote 0x7f3bce3c8d70>, _RLock__count=1) at remote 0x7f3bc8bd3a10>, acquire=<instancemethod at remote 0x7f3bcbc32f00>, _is_owned=<instancemethod at remote 0x7f3bcbc32690>, _release_save=<instancemethod at remote 0x7f3bcbc32500>, release=<instancemethod at remote 0x7f3bcbc32460>, _acquire_restore=<instancemethod at remote 0x7f3bcbc329b0>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x7f3bc8bd39d0>, _generation=<Generation(member_id=u'...', protoco...(truncated)
    self._client.poll(future=future)
#61 Frame 0x7f3ba40061f0, for file .../kafka/consumer/group.py, line 508, in commit (self=<KafkaConsumer(_metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': '...'}, time_window_ms=30000) at remote 0x7f3bc8cc1c10>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x2692ce0>, _samples=[<Sample(event_count=2569, last_window_ms=<float at remote 0x7f3ba400cb68>, value=<float at remote 0x431e998>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3bc8cea190>, <Sample(event_count=1432, last_window_ms=<float at remote 0x7f3ba40117c8>, value=<float at remote 0x4f70dd8>, initial_value=<float at remote 0x2692ce0>) at remote 0x7f3baee8c710>], _current=1) at remote 0x7f3bc8cd2950>, _unit=3) at remote 0x7f3bc8cd2990>, _metric_name=<MetricName(_group='consumer-metrics', _description='The average number o...(truncated)
    self._coordinator.commit_offsets_sync(offsets)
...
#4 Frame 0x4d0d5d0, for file /usr/lib/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=139895040968512, _RLock__block=<thread.lock at remote 0x7f3bcbbb5030>, _RLock__count=1) at remote 0x7f3bc8cc1ed0>, blocking=1, me=139894790764288)
    rc = self.__block.acquire(blocking)
#11 Frame 0x7f3bc400a370, for file .../kafka/client_async.py, line 552, in poll (self=<KafkaClient(_conns=<Dict at remote 0x7f3bcbb7fab8>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=139895040968512, _RLock__block=<thread.lock at remote 0x7f3bcbbb5030>, _RLock__count=1) at remote 0x7f3bc8cc1ed0>, _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x42316e0>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 2], [...], 5], [...], None], _OrderedDict__map={2: [...], 5: [...]}) at remote 0x7f3bcb939ab8>, next_idle_close_check_time=<float at remote 0x7f3ba400cc58>) at remote 0x7f3bc8cc1f10>, _closed=False, _pending_completion=<collections.deque at remote 0x7f3bcbbdfec0>, _wake_r=<_socket.socket at remote 0x7f3bc8caf930>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sam...(truncated)
    with self._lock:
#15 Frame 0x7f3bc8bee238, for file .../kafka/coordinator/base.py, line 941, in _run_once (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x7f3bc8cadec0>, _Thread__ident=139894790764288, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bcbbb5070>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bcbbb5070>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bcbbb5070>) at remote 0x7f3bc8be8bd0>, _Thread__name='...-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bce3f44b0>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bce3f44b0>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bce3...(truncated)
    self.coordinator._client.poll(timeout_ms=0)
#19 Frame 0x7f3bc8be3988, for file .../kafka/coordinator/base.py, line 910, in run (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x7f3bc8cadec0>, _Thread__ident=139894790764288, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bcbbb5070>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bcbbb5070>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bcbbb5070>) at remote 0x7f3bc8be8bd0>, _Thread__name='...-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bce3f44b0>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bce3f44b0>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bce3f44b0>...(truncated)
    self._run_once()
#23 Frame 0x7f3bc4000910, for file /usr/lib/python2.7/threading.py, line 810, in __bootstrap_inner (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x7f3bc8cadec0>, _Thread__ident=139894790764288, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bcbbb5070>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bcbbb5070>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bcbbb5070>) at remote 0x7f3bc8be8bd0>, _Thread__name='...-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x7f3bce3f44b0>, acquire=<built-in method acquire of thread.lock object at remote 0x7f3bce3f44b0>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x7f3bce3f44b0>) at remote 0x7f3bc8be8850>) at remote ...(truncated)
    self.run()

Metadata

Metadata

Assignees

Labels

No labels
No labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions