
Consumer overruns its session timeout, drops out of the group and never rejoins--stuck in zombie state. #1435

Closed
andrewkowalik opened this issue Mar 10, 2018 · 11 comments

Comments

@andrewkowalik
Contributor

andrewkowalik commented Mar 10, 2018

We just upgraded to 1.4.1 and I am noticing new behavior. It looks like a consumer is dropping out of the group and not automatically rejoining.

In our logs the only similarity I have noticed is the following logs. Could be unrelated.

INFO:kafka.coordinator:Leaving consumer group

After that message the only other logs I see are idle connections getting closed.

INFO:kafka.client:Closing idle connection 14, last active 540000 ms ago

The service we run via supervisor just sits there and the consumers are no longer part of the group.

I am still digging through the behavior of kafka-python to understand why we are dropping out of the consumer group. I am going to poke around on the assumption that the worker is leaving the group and not rejoining. Will update when I see something.

This may be related to #1418 but I am not sure based on the user's input.

@dpkp
Owner

dpkp commented Mar 10, 2018

Can you enable debug logs and see if anything sticks out?

@andrewkowalik
Contributor Author

Working on getting that turned on fully now. At the same time trying to replicate in a development environment.

@andrewkowalik
Contributor Author

@dpkp Looking into whether I have something misconfigured here. Looking at the kafka-python source now, and it appears that I am reaching self.coordinator.heartbeat.poll_timeout_expired and never rejoining. I don't quite know what to make of this behavior. I feel that perhaps an exception should be raised here, or there should be some attempt to automatically rejoin the group. (A rough paraphrase of that check is sketched after the log below.)

I don't have full depth on this check yet though so I am just speculating.

DEBUG:kafka.coordinator:Heartbeat: worker.w.temp_actions_test_worker[7] kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8
DEBUG:kafka.protocol.parser:Sending request HeartbeatRequest_v0(group='worker.w.temp_actions_test_worker', generation_id=7, member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host>> Request 1369: HeartbeatRequest_v0(group='worker.w.temp_actions_test_worker', generation_id=7, member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.protocol.parser:Received correlation id: 1369
DEBUG:kafka.protocol.parser:Processing response HeartbeatResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host>> Response 1369 (25.6671905518 ms): HeartbeatResponse_v0(error_code=0)
DEBUG:kafka.coordinator:Received successful heartbeat response for group worker.w.temp_actions_test_worker
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Heartbeat: worker.w.temp_actions_test_worker[7] kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8
DEBUG:kafka.protocol.parser:Sending request HeartbeatRequest_v0(group='worker.w.temp_actions_test_worker', generation_id=7, member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host> port=9092> Request 1370: HeartbeatRequest_v0(group='worker.w.temp_actions_test_worker', generation_id=7, member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.protocol.parser:Received correlation id: 1370
DEBUG:kafka.protocol.parser:Processing response HeartbeatResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host> port=9092> Response 1370 (1.5561580658 ms): HeartbeatResponse_v0(error_code=0)
DEBUG:kafka.coordinator:Received successful heartbeat response for group worker.w.temp_actions_test_worker
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Not ready to heartbeat, waiting
DEBUG:kafka.coordinator:Heartbeat poll expired, leaving group
INFO:kafka.coordinator:Leaving consumer group (worker.w.temp_actions_test_worker).
DEBUG:kafka.protocol.parser:Sending request LeaveGroupRequest_v0(group='worker.w.temp_actions_test_worker', member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host> port=9092> Request 1371: LeaveGroupRequest_v0(group='worker.w.temp_actions_test_worker', member_id=u'kafka-python-1.4.1-eb9309fd-53d3-4f50-9f2b-5490e6e039d8')
DEBUG:kafka.protocol.parser:Received correlation id: 1371
DEBUG:kafka.protocol.parser:Processing response LeaveGroupResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=7 host=<host> port=9092> Response 1371 (2.7539730072 ms): LeaveGroupResponse_v0(error_code=0)
DEBUG:kafka.coordinator:LeaveGroup request for group worker.w.temp_actions_test_worker returned successfully
DEBUG:kafka.coordinator:Group state is not stable, disabling heartbeats
DEBUG:kafka.coordinator:Heartbeat disabled. Waiting
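
As mentioned above, here is a rough paraphrase of the check I believe we are hitting. This is a sketch of the logic as I read it, not verbatim kafka-python source:

    import time

    # Paraphrase (a sketch, not the actual source): the coordinator records
    # the time of the last user poll, and once more than max_poll_interval_ms
    # passes without a poll it treats the member as a zombie and proactively
    # leaves the group -- which matches the LeaveGroupRequest in the log above.
    class Heartbeat(object):
        def __init__(self, max_poll_interval_ms=300000):
            self.max_poll_interval_ms = max_poll_interval_ms
            self.last_poll = time.time()

        def poll(self):
            # called whenever the application polls for records
            self.last_poll = time.time()

        def poll_timeout_expired(self):
            # True once the application has gone too long without polling
            return (time.time() - self.last_poll) * 1000 > self.max_poll_interval_ms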

@andrewkowalik
Contributor Author

^ Forgot to mention that the remaining logs are just some metadata requests and then idle connection cleanup.

@andrewkowalik
Contributor Author

Trying to separate our own implementation details from kafka-python...

We built a wrapper around kafka-python to standardize how developers interact with the library. Historically we ran consumers by calling message = next(self._kafka_client) inside a while loop until a time or message-count limit was hit. _kafka_client is an instance of kafka-python's KafkaConsumer. We commit manually after processing messages. We have been running like this for 1-2 years. In rough pseudocode (the names below are illustrative placeholders, not our actual code):
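
    import time
    from kafka import KafkaConsumer  # kafka-python

    # Sketch of the wrapper's consume loop described above; the topic name,
    # handle(), MESSAGE_LIMIT and TIME_LIMIT_S are illustrative placeholders.
    consumer = KafkaConsumer(
        'some-topic',
        group_id='worker.w.temp_actions_test_worker',
        enable_auto_commit=False,  # we commit manually after processing
    )

    def handle(message):
        pass  # placeholder for our processing logic

    MESSAGE_LIMIT = 500
    TIME_LIMIT_S = 60
    deadline = time.time() + TIME_LIMIT_S
    processed = 0

    while processed < MESSAGE_LIMIT and time.time() < deadline:
        message = next(consumer)  # blocking iterator over the KafkaConsumer
        handle(message)
        consumer.commit()
        processed += 1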

I am going to try to sanitize the logs preceding the snippet I provided. So far I am seeing proper fetching and committing of offsets until the very end, where I see no new work but no reason why yet.

@dpkp
Owner

dpkp commented Mar 22, 2018

This log, DEBUG:kafka.coordinator:Heartbeat poll expired, leaving group, means that you are not processing messages fast enough to keep up with your configured max_poll_interval_ms. What is that configured to, and is it possible that your consumer application might not be able to keep up?

It is also possible that there is a bug in the kafka-python implementation that is triggering a heartbeat poll expiration too early. But I haven't noticed anything yet myself.
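
For reference, a minimal sketch of where that setting lives (the topic and group names are placeholders; 300000 ms is the library default):

    from kafka import KafkaConsumer

    # max_poll_interval_ms bounds the time allowed between calls into poll()
    # (or iterator advances); overrunning it makes the coordinator leave the
    # group, which is exactly the log line quoted above.
    consumer = KafkaConsumer(
        'some-topic',                 # placeholder
        group_id='some-group',        # placeholder
        max_poll_interval_ms=300000,  # default; raise it if one batch can legitimately take longer
    )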

@andrewkowalik
Contributor Author

I set up a test outside of our production use over the past week and was not able to replicate the problem. I am going to try to get proper logging running in our production environment, and then I might at least be able to narrow this down to either an issue in our wrapper or something happening in kafka-python.

@dpkp Even if we are not keeping up with max_poll_interval_ms, should the consumer not try to restart itself?
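
In the meantime, one mitigation I am considering is chunking long-running work and polling between chunks so the poll timer keeps getting reset. This is a sketch only, unverified, assuming kafka-python's public assignment/pause/resume/poll APIs; steps() is a placeholder:

    def process_with_keepalive(consumer, message, steps):
        """Process one message in chunks, calling poll() between chunks so the
        coordinator's max_poll_interval_ms timer is reset (sketch, unverified)."""
        parts = consumer.assignment()
        consumer.pause(*parts)               # no new records are fetched while paused
        try:
            for step in steps(message):      # steps() yields units of work (placeholder)
                step()
                consumer.poll(timeout_ms=0)  # counts as a poll; resets the timer
        finally:
            consumer.resume(*parts)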

@jeffwidman jeffwidman changed the title Consumer drops out of the group and never rejoins Consumer overruns its session timeout, drops out of the group and never rejoins--stuck in zombie state. Dec 19, 2018
Repository owner deleted a comment from NoneGG Mar 5, 2019
Repository owner deleted a comment from NoneGG Mar 5, 2019
Repository owner deleted a comment from vkjv Mar 5, 2019
Repository owner deleted a comment from licy121 Mar 5, 2019
@jeffwidman
Collaborator

jeffwidman commented Mar 5, 2019

This ticket was getting difficult to follow with people adding comments about issues that appeared unrelated (including several that were clearly user error), so I deleted some of those comments.

Please file a new bug unless you are quite sure you are experiencing this particular bug.

Other known sources of stuckness in 1.4.4: #1691 and #1728.

As best I can tell, this particular bug is different than both of those.

@jeffwidman
Collaborator

@infecto I wonder if this is the same as #1728?

There are a bunch of changes merged to master over the last week. Can you try them and see if this is still an issue?

@meedeepak

Try setting legacy_iterator=True in KafkaConsumer, or use poll(), because somehow the next_v2 iterator does not seem to work.
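
For anyone trying the poll()-based loop instead of the iterator, a minimal sketch (the topic and group names are placeholders):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('some-topic', group_id='some-group')  # placeholders

    while True:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]} and drives
        # group membership (heartbeats/rejoin) as part of the call.
        records = consumer.poll(timeout_ms=1000)
        for tp, messages in records.items():
            for message in messages:
                print(tp.topic, tp.partition, message.offset)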

@bkmead99

Bump. Still seeing this 'zombie state' occurring. Nothing is logged in debug after it's encountered.
