Question: Consumer group behavior related to session.timeout.ms and offset store API #1817

Closed
5 of 7 tasks

agis opened this issue May 23, 2018 · 8 comments

@agis

agis commented May 23, 2018

Description

This is more of a question than an actual issue. I'm using the confluent-kafka-go v0.11.4 driver but I believe this is more relevant to librdkafka itself.

I'm trying to understand what happens when processing a message takes longer than session.timeout.ms, in conjunction with the offset store API.

I'm testing the following scenario:

  • a topic with 4 partitions
  • 4 consumers spawned in the same consumer group, all executing the following loop (a runnable Go sketch follows the pseudocode):
loop {
  msg = poll()
  puts "received message! sleeping for 10secs..."
  sleep 10 # this is 4 more seconds than session.timeout.ms
  commit_offset(msg)
}
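
For reference, a runnable Go sketch of that loop against confluent-kafka-go (broker, group, and topic names are taken from the config and logs below; CommitMessage stands in for whichever commit/store call the real code uses):

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // One of the four consumers; config values mirror the checklist below.
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":        "kafka.vm",
        "group.id":                 "foobar",
        "session.timeout.ms":       6000,
        "enable.auto.commit":       true,
        "auto.commit.interval.ms":  5000,
        "enable.auto.offset.store": false,
    })
    if err != nil {
        panic(err)
    }
    defer c.Close()

    if err := c.SubscribeTopics([]string{"test-agis2"}, nil); err != nil {
        panic(err)
    }

    for {
        ev := c.Poll(1000)
        msg, ok := ev.(*kafka.Message)
        if !ok {
            continue // poll timeout or a non-message event
        }
        fmt.Println("received message! sleeping for 10secs...")
        time.Sleep(10 * time.Second) // 4s longer than session.timeout.ms
        if _, err := c.CommitMessage(msg); err != nil {
            fmt.Println("commit failed:", err)
        }
    }
}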

I spawn the consumers, they each get assigned a partition, as I can observe from kafka-consumer-groups --describe, and then I produce one message.

One of the consumers receives the message normally. I try to produce further messages and they are indeed all received by the respective consumers. kafka-consumer-groups --describe shows each consumer at the latest offset (so lag=0). So everything seems fine, but is it?

My question is: shouldn't this scenario cause the same message to be re-processed again and again in an endless loop, since it takes more than session.timeout.ms to process the message AND commit the offset?

I'd expect the consumer to be kicked out of the group, since it takes more than session.timeout.ms between calls to Poll() or commit_offset(), and so the same partition/message should be given to one of the other consumers. In other words, I'd expect the consumer group to spin endlessly.

Probably I'm missing something here and my understanding is just wrong.

librdkafka logs

With debug=cgrp:

%7|1527070167.898|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070167.898|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070167.898|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070167.909|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070168.898|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070168.898|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070168.899|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070168.914|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070169.898|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070169.898|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070169.899|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070169.914|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070170.902|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070170.902|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070170.902|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070170.916|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070171.904|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070171.904|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070171.904|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070171.920|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070172.890|OFFSET|agis.example.com/g-4#consumer-1| [thrd:main]: Topic test-agis2 [3]: stored offset 2, committed offset 2
%7|1527070172.890|OFFSET|agis.example.com/g-3#consumer-4| [thrd:main]: Topic test-agis2 [2]: stored offset 2, committed offset 2
%7|1527070172.890|OFFSET|agis.example.com/g-4#consumer-1| [thrd:main]: Topic test-agis2 [3]: setting offset INVALID for commit
%7|1527070172.890|OFFSET|agis.example.com/g-3#consumer-4| [thrd:main]: Topic test-agis2 [2]: setting offset INVALID for commit
%7|1527070172.890|COMMIT|agis.example.com/g-4#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1527070172.890|COMMIT|agis.example.com/g-3#consumer-4| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1527070172.890|COMMIT|agis.example.com/g-4#consumer-1| [thrd:main]: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1527070172.890|OFFSET|agis.example.com/g-1#consumer-3| [thrd:main]: Topic test-agis2 [0]: stored offset 2, committed offset 2
%7|1527070172.890|COMMIT|agis.example.com/g-3#consumer-4| [thrd:main]: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1527070172.890|OFFSET|agis.example.com/g-2#consumer-2| [thrd:main]: Topic test-agis2 [1]: stored offset 3, committed offset 3
%7|1527070172.890|UNASSIGN|agis.example.com/g-4#consumer-1| [thrd:main]: Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1527070172.890|OFFSET|agis.example.com/g-1#consumer-3| [thrd:main]: Topic test-agis2 [0]: setting offset INVALID for commit
%7|1527070172.890|UNASSIGN|agis.example.com/g-3#consumer-4| [thrd:main]: Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1527070172.890|OFFSET|agis.example.com/g-2#consumer-2| [thrd:main]: Topic test-agis2 [1]: setting offset INVALID for commit
%7|1527070172.890|COMMIT|agis.example.com/g-1#consumer-3| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1527070172.890|COMMIT|agis.example.com/g-2#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1527070172.890|COMMIT|agis.example.com/g-1#consumer-3| [thrd:main]: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1527070172.890|COMMIT|agis.example.com/g-2#consumer-2| [thrd:main]: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1527070172.890|UNASSIGN|agis.example.com/g-1#consumer-3| [thrd:main]: Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1527070172.891|UNASSIGN|agis.example.com/g-2#consumer-2| [thrd:main]: Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1527070172.906|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070172.906|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070172.906|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070172.925|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070173.910|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070173.910|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070173.910|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070173.930|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070174.912|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070174.912|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070174.912|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070174.930|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070175.916|HEARTBEAT|agis.example.com/g-2#consumer-2| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070175.916|HEARTBEAT|agis.example.com/g-4#consumer-1| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070175.916|HEARTBEAT|agis.example.com/g-3#consumer-4| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153
%7|1527070175.931|HEARTBEAT|agis.example.com/g-1#consumer-3| [thrd:main]: kafka-b.example.com:9092/2: Heartbeat for group "foobar" generation id 153

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 0.11.4
  • Apache Kafka version: 0.11.0.2
  • librdkafka client configuration:
{
   "api.version.request": true,
   "bootstrap.servers": "kafka.vm",
   "session.timeout.ms": 6000,
   "log.connection.close": false,
   "debug": "cgrp",
   "go.events.channel.enable": false,
   "go.application.rebalance.enable": false,
   "enable.auto.commit": true,
   "auto.commit.interval.ms": 5000,
   "enable.auto.offset.store": false,
   "auto.offset.reset": "latest",
   "enable.partition.eof": false
}
  • Operating system: macOS 10.13.3 (High Sierra)
  • Provide logs (with debug=cgrp as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

To stay in the group, a member must send Heartbeat requests more often than session.timeout.ms (typically every session.timeout.ms/3).
Initially the Java consumer was single threaded and the Heartbeat was triggered by the application calling poll(); this caused issues with exactly the scenario you are describing, where message processing takes a long time.
So KIP-62 introduced a background thread to send the heartbeats regardless of whether the application was calling poll().
But this still has the problem that the group may rebalance for other reasons (another member falling out or joining), and by the time the message processing is done and the application attempts to commit, it may no longer be assigned the partition it is trying to commit to.

This is also fixed in KIP-62 by separating the maximum processing time (the added max.poll.interval.ms) from the group liveness threshold (session.timeout.ms):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

Now back to librdkafka:
librdkafka has always done heartbeats in a background thread, so the need to call poll() was never as strict as with the Java client. But since max.poll.interval.ms is not yet implemented in librdkafka, the maximum processing time is still bounded by session.timeout.ms.
The issue is tracked here:
#1039 (more votes, higher prio .. ;) )

So in your case you will need to increase session.timeout.ms to cover your processing time, at the expense of longer dead-member detection and rebalance times.
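
A hedged sketch of that change in confluent-kafka-go terms (30000 is an illustrative value, not a recommendation from this thread):

// session.timeout.ms must exceed the worst-case per-message processing
// time (10s in the test above) until max.poll.interval.ms lands (#1039).
c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":  "kafka.vm",
    "group.id":           "foobar",
    "session.timeout.ms": 30000, // was 6000
})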

@edenhill
Contributor

Duplicate of #1039 (but discussion may continue here)

@agis
Author

agis commented May 23, 2018

Thanks for explaining this. That confirms what I thought was meant to happen, but to my surprise, it doesn't.

So my question could be rephrased to the following:

Since all my consumers process a message in >> session.timeout.ms, how come they don't spin? How come they don't end up in an endless loop of all the consumers receiving the same message all the time?

@edenhill
Contributor

That will only happen if the group rebalances and partitions are reassigned to other group members while you are processing a message, and it should typically affect only the single message being processed.
When the consumer finishes processing the message and calls poll()/consume() again, it will get the rebalance callback rather than a new message.
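
For illustration, this is what that looks like from the poll loop, assuming "go.application.rebalance.enable": true so rebalance events are delivered to the application (the config in this issue has it set to false, in which case librdkafka handles Assign/Unassign internally):

switch e := c.Poll(1000).(type) {
case *kafka.Message:
    process(e) // hypothetical handler; the 10s sleep in the test above
case kafka.AssignedPartitions:
    c.Assign(e.Partitions)
case kafka.RevokedPartitions:
    // What the slow consumer sees on its next Poll() after the group
    // rebalanced, instead of the same message again.
    c.Unassign()
case kafka.Error:
    fmt.Println("error:", e)
}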

@agis
Author

agis commented May 23, 2018

I think I get it now. So in the following scenario:

  • consumer#1 receives message A and sleeps for a long time
  • a rebalance happens
  • consumer#1 is kicked out of the group since it is still sleeping, so consumer#2 is assigned the partition
  • consumer#2 also gets message A, sleeps for a long time but finally processes the message
  • consumer#1 in the meantime gets the rebalance event and joins the group again

Is that right?

@edenhill
Contributor

Yes, and consumer#1 might be unable to commit the processed message because the partition is now assigned to consumer#2.
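
On consumer#1's side the only symptom would be an error from the commit call, roughly like this (a sketch; the exact error code depends on the broker and client version):

if _, err := c.CommitMessage(msg); err != nil {
    // The commit is rejected because this member no longer owns the
    // partition; the new owner may reprocess msg.
    fmt.Println("commit failed, message may be reprocessed:", err)
}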

@agis
Author

agis commented May 23, 2018

Thanks a lot! It all makes sense now. Closing this.

@agis agis closed this as completed May 23, 2018
@edenhill
Contributor

edenhill commented May 23, 2018

A state diagram fwiw:
[attached image: consumer group state diagram]
