Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16194: Do not return records from poll if group metadata unknown #15369

Merged
merged 9 commits into from
Feb 21, 2024

Conversation

cadonna
Copy link
Contributor

@cadonna cadonna commented Feb 14, 2024

Due to the asynchronous nature of the async consumer, it might happen that on the application thread the group metadata is not known after the first poll returns records. If the offsets of those records are then send to a transaction with

txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);

and then the transaction is committed, the group coordinator will raise an error saying that the member is not known since the member in groupMetadata is still -1 before the metadata is updated.

This commit avoids this error by not returning any records from poll() until the group metadata is updated, i.e., the member ID and the generation ID (a.k.a. member epoch) are known. This check is only done if group management is used.

Additionally, this commit resets the group metadata when the consumer unsubscribes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@cadonna cadonna marked this pull request as ready for review February 19, 2024 09:46
@cadonna cadonna changed the title WIP KAFKA-16194: Do not return records from poll if group metadata unknown Feb 19, 2024
@cadonna cadonna requested a review from dajac February 19, 2024 10:12
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I left some comments.
(I did not check the tests in detail, because I'm not sure about the current main implementation)

@@ -98,6 +99,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used

@@ -727,6 +732,17 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
}

private boolean isGenerationKnown() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this one-liner

return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is correct, because I can use user assigned partitions but still use a group ID to manage the offsets broker-side, if I remember correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, I can do something like

if (subscriptions.hasAutoAssignedPartitions()) {
    return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent(); 
}
return true;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. But then I'm a bit confused by the method name, because it just returns true when there is no generation...

@@ -1467,6 +1483,7 @@ public void unsubscribe() {
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
}
groupMetadata = initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like an independent fix? Maybe mention in the PR description

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if (!fetch.isEmpty()) {
if (fetch.records().isEmpty()) {
log.trace("Returning empty records from `poll()` "
if (isGenerationKnown()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this check is false... where do we block? It seems like we busy loop with 100% CPU, but maybe I'm wrong

Copy link
Contributor Author

@cadonna cadonna Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The busy loop should not be too bad since the group metadata update event is added to the queue when also the new assignment is set in the membership manager.

Moreover, this is kind of a temporary solution until https://issues.apache.org/jira/browse/KAFKA-16285 is done.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, you could improve the name of isGenerationKnown, but LGTM.

@cadonna cadonna merged commit 02ebfc6 into apache:trunk Feb 21, 2024
1 check failed
@jolshan
Copy link
Contributor

jolshan commented Feb 22, 2024

Hmm after this was merged, I'm seeing failed builds on every master build.

[2024-02-22T21:19:13.914Z] Execution failed for task ':streams:test'.
[2024-02-22T21:19:13.914Z] > The following test methods could not be retried, which is unexpected. Please file a bug report at https://github.com/gradle/test-retry-gradle-plugin/issues
[2024-02-22T21:19:13.914Z]      org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStoreTest#shouldLogAndMeasureExpiredRecords[org.apache.kafka.streams.state.internals.WindowKeySchema@71154f21]
[2024-02-22T21:19:13.914Z]      org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStoreTest#shouldLogAndMeasureExpiredRecords[org.apache.kafka.streams.state.internals.SessionKeySchema@5b69d40d]
[2024-02-22T21:19:13.914Z]      org.apache.kafka.streams.state.internals.RocksDBTimestampedSegmentedBytesStoreTest#shouldLogAndMeasureExpiredRecords[org.apache.kafka.streams.state.internals.WindowKeySchema@41c07648]
[2024-02-22T21:19:13.914Z]      org.apache.kafka.streams.state.internals.RocksDBTimestampedSegmentedBytesStoreTest#shouldLogAndMeasureExpiredRecords[org.apache.kafka.streams.state.internals.SessionKeySchema@52d10fb8]

I see this build was fine so not sure if there was some other issue or if the issue is external.

@jolshan
Copy link
Contributor

jolshan commented Feb 22, 2024

Looks like it was actually this one: #15396. I will follow up there.

clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
apache#15369)

Due to the asynchronous nature of the async consumer, it might happen that on the application thread the group metadata is not known after the first poll returns records. If the offsets of those records are then send to a transaction with

txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);

and then the transaction is committed, the group coordinator will raise an error saying that the member is not known since the member in groupMetadata is still -1 before the metadata is updated.

This commit avoids this error by not returning any records from poll() until the group metadata is updated, i.e., the member ID and the generation ID (a.k.a. member epoch) are known. This check is only done if group management is used.

Additionally, this commit resets the group metadata when the consumer unsubscribes.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants