Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14095: Improve handling of sync offset failures in MirrorMaker #12432

Merged
merged 2 commits into from Aug 1, 2022

Conversation

mimaison
Copy link
Member

We should not treat UNKNOWN_MEMBER_ID and FENCED_INSTANCE_ID as unexpected errors in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

We should not treat UNKNOWN_MEMBER_ID and FENCED_INSTANCE_ID as unexpected errors in the Admin client. In MirrorMaker, check the result of committing offsets and log an useful error message in case that failed with UNKNOWN_MEMBER_ID.
@C0urante C0urante self-requested a review July 28, 2022 02:23
Copy link
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Mickael. I agree that the unknown member ID should not be treated as an unexpected error in the admin client, and the new MM2 warning message is a clear improvement. I have a few minor questions, but overall this looks fine 👍

@@ -179,6 +179,9 @@ private void handleError(
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
// Member level errors.
case UNKNOWN_MEMBER_ID:
case FENCED_INSTANCE_ID:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would we encounter this error? My understanding is that it'll only be returned when the broker receives a request with a group instance ID defined, and IIUC it's not possible to define one with the admin API we expose right now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, currently this error code is not expected as the admin client does not set the group instance ID. I've removed it.

log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
}
}
});
log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this log message since it's not guaranteed that the sync will have completed by this point, or that it will even complete successfully at all?

Or alternatively, could we keep the message as-is, but move it into an else block in the callback we pass to whenComplete?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved it in an else block above and tweaked the message slightly.

AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
result.all().whenComplete((v, throwable) -> {
if (throwable != null) {
if (throwable.getCause() instanceof UnknownMemberIdException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How stable is the API for the exceptions we'll see here? Can we be reasonably certain that the exception we want to examine will always be wrapped?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outer exception depends how the future is completed but I think the actual error should always be wrapped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a closer look at the KafkaFuture Javadocs.

The docs for KafkaFuture::whenComplete state (emphasis mine):

Returns a new KafkaFuture with the same result or exception as this future, that executes the given action when this future completes. When this future is done, the given action is invoked with the result (or null if none) and the exception (or null if none) of this future as arguments."

Given this, we know that whenComplete receives the same exception that would be thrown by, e.g., KafkaFuture::get, if the action failed.

The method signatures for KafkaFuture declare that they throw ExecutionException if the action fails, which always wraps the cause of failure.

So, I'm convinced that we can expect this wrapping 👍

Copy link
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Mickael, LGTM. Left one nit; feel free to address or not at your discretion.

AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
result.all().whenComplete((v, throwable) -> {
if (throwable != null) {
if (throwable.getCause() instanceof UnknownMemberIdException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a closer look at the KafkaFuture Javadocs.

The docs for KafkaFuture::whenComplete state (emphasis mine):

Returns a new KafkaFuture with the same result or exception as this future, that executes the given action when this future completes. When this future is done, the given action is invoked with the result (or null if none) and the exception (or null if none) of this future as arguments."

Given this, we know that whenComplete receives the same exception that would be thrown by, e.g., KafkaFuture::get, if the action failed.

The method signatures for KafkaFuture declare that they throw ExecutionException if the action fails, which always wraps the cause of failure.

So, I'm convinced that we can expect this wrapping 👍

targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
consumerGroupId, offsetToSync.size());
AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: would it also be useful to have a log line here indicating that we're attempting to sync offsets?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scheduler prints an info line each time it runs this:

INFO [MirrorCheckpointConnector|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)

So I'll leave it like this for now.

@mimaison
Copy link
Member Author

mimaison commented Aug 1, 2022

Thanks for the review @C0urante. Test failures are not related, merging to trunk.

@mimaison mimaison merged commit 1cc1e77 into apache:trunk Aug 1, 2022
@mimaison mimaison deleted the KAFKA-14095 branch August 1, 2022 10:59
ijuma added a commit to confluentinc/kafka that referenced this pull request Aug 5, 2022
…(5 August 2022)

Version related conflicts:
* Jenkinsfile
* gradle.properties
* streams/quickstart/java/pom.xml
* streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
* streams/quickstart/pom.xml
* tests/kafkatest/__init__.py
* tests/kafkatest/version.py

* commit 'add7cd85baa61cd0e1430': (66 commits)
KAFKA-14136 Generate ConfigRecord for brokers even if the value is
unchanged (apache#12483)
  HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (apache#12476)
  MINOR: Remove ARM/PowerPC builds from Jenkinsfile (apache#12380)
  KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (apache#12455)
  KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (apache#12468)
KAFKA-14129: KRaft must check manual assignments for createTopics are
contiguous (apache#12467)
KAFKA-13546: Do not fail connector validation if default topic
creation group is explicitly specified (apache#11615)
KAFKA-14122: Fix flaky test
DynamicBrokerReconfigurationTest#testKeyStoreAlter (apache#12452)
  MINOR; Use right enum value for broker registration change (apache#12236)
  MINOR; Synchronize access to snapshots' TreeMap (apache#12464)
  MINOR; Bump trunk to 3.4.0-SNAPSHOT (apache#12463)
  MINOR: Stop logging 404s at ERROR level in Connect
KAFKA-14095: Improve handling of sync offset failures in MirrorMaker
(apache#12432)
  Minor: enable index for emit final sliding window (apache#12461)
  MINOR: convert some more junit tests to support KRaft (apache#12456)
  KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run (apache#12441)
  MINOR: Remove code of removed metric (apache#12453)
MINOR: Update comment on verifyTaskGenerationAndOwnership method in
DistributedHerder
KAFKA-14012: Add warning to closeQuietly documentation about method
references of null objects (apache#12321)
  MINOR: Fix static mock usage in ThreadMetricsTest (apache#12454)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants