
KAFKA-7044: Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group #5627

Merged (6 commits) Sep 11, 2018

Conversation

apovzner (Contributor) commented Sep 9, 2018

kafka-consumer-groups --describe --group ... can result in NullPointerException for two reasons:

  1. Fetcher.fetchOffsetsByTimes() may return too early, without sending list offsets request for topic partitions that are not in cached metadata.
  2. ConsumerGroupCommand.getLogEndOffsets() and getLogStartOffsets() assumed that endOffsets()/beginningOffsets(), which eventually call Fetcher.fetchOffsetsByTimes(), would return a map containing every topic partition passed in, with no null values. Because of (1), null values were possible when some of the topic partitions were already in the metadata cache and others were not. However, even with (1) fixed, endOffsets()/beginningOffsets() may still return a map with some topic partitions missing when the list offsets request returns a non-retriable error. This happens in corner cases such as the broker's message format predating 0.10, or possibly with some other errors.
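Given (2), callers of endOffsets()/beginningOffsets() need to treat the returned map defensively. A minimal sketch of that caller-side pattern, with plain maps and String keys standing in for the consumer API and TopicPartition (all names here are hypothetical, not the actual KAFKA-7044 code):

```java
import java.util.*;

public class OffsetLookup {
    // Sketch only: models the defensive caller behavior described in (2).
    // An entry for a requested partition may be missing from the returned
    // map (or mapped to null), so each lookup is wrapped in an Optional
    // instead of being dereferenced directly.
    static Map<String, Optional<Long>> safeEndOffsets(Set<String> requested,
                                                      Map<String, Long> returned) {
        Map<String, Optional<Long>> result = new HashMap<>();
        for (String tp : requested) {
            result.put(tp, Optional.ofNullable(returned.get(tp)));
        }
        return result;
    }
}
```

Partitions whose offsets could not be fetched then surface as empty Optionals rather than NullPointerExceptions.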

Testing:
-- added unit test to verify fix in Fetcher.fetchOffsetsByTimes()
-- did some manual testing with kafka-consumer-groups --describe, reproducing the NPE. Was not able to reproduce any NPE cases with DescribeConsumerGroupTest.scala.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

hachikuji self-assigned this Sep 9, 2018

remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
remainingToSearch.keySet().removeAll(value.partitionsWithUnknownOffset);
if (value.partitionsToRetry.isEmpty() && remainingToSearch.isEmpty())


There seems to be some redundancy between partitionsToRetry and remainingToSearch. It might be nicer if we could get rid of remainingToSearch so that we only had to rely on ListOffsetResult to know whether we should retry. I think the only thing we need is to avoid losing partitions in the call to groupListOffsetRequests.

apovzner (Contributor Author) replied:

Ah yes, we could add the partitions we lose in groupListOffsetRequests to partitionsToRetry, since sendListOffsetsRequests is called only from that one method... Let me try that.

hachikuji left a comment:

Thanks for the updates. Left a couple more comments.

@@ -414,7 +415,9 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
if (value.partitionsToRetry.isEmpty())
return result;

remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
remainingToSearch = timestampsToSearch.entrySet().stream()


I think this can be replaced with remainingToSearch.keySet().retainAll(value.partitionsToRetry).
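The suggested retainAll keeps exactly the partitions that still need a retry. A minimal sketch of the semantics, with String keys standing in for TopicPartition (names hypothetical):

```java
import java.util.*;

public class RetryFilter {
    // Sketch of the suggested one-liner: drop every entry of the
    // timestamp map whose partition is not in partitionsToRetry,
    // mutating the map in place rather than rebuilding it from
    // timestampsToSearch with a stream.
    static void retainRetriable(Map<String, Long> remainingToSearch,
                                Set<String> partitionsToRetry) {
        remainingToSearch.keySet().retainAll(partitionsToRetry);
    }
}
```

Because the key set is a view of the map, retainAll on it removes the corresponding map entries directly.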

@@ -632,9 +635,11 @@ public void onFailure(RuntimeException e) {
final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>();
final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
final Set<TopicPartition> partitionsToRetry = new HashSet<>();
final Set<TopicPartition> partitionsRequireMetadataUpdate = new HashSet<>(timestampsToSearch.keySet());


Another idea might be to pass partitionsToRetry into groupListOffsetRequests.

} else if (client.isUnavailable(info.leader())) {
client.maybeThrowAuthFailure(info.leader());

// The connection has failed and we need to await the blackout period before we can
// try again. No need to request a metadata update since the disconnect will have
// done so already.
log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
        info.leader(), tp);
partitionsToRetry.add(tp);
apovzner (Contributor Author) replied:

I moved the logic for collecting the partitions that do not have an available leader into groupListOffsetRequests. I was also contemplating whether we should add partitions whose connection to the leader has failed to partitionsToRetry. It seems we should, because we may be able to reconnect before the list offsets request times out.

logEndOffsetResult._2 match {
case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None)
case LogOffsetResult.Ignore => null


As far as I can tell, this was the only use of LogOffsetResult.Ignore. Maybe we can get rid of it?

If we did that, then we could probably also get rid of LogOffsetResult and replace it with a simple Option[Long], but we can leave that for a separate PR.
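The simplification the reviewer sketches would look roughly like this, with Java's Optional standing in here for Scala's Option and all names hypothetical:

```java
import java.util.Optional;

public class LogEndOffsets {
    // Sketch of the suggested cleanup: once Ignore is unused, a plain
    // optional offset can replace the three-state LogOffsetResult; an
    // empty value covers the Unknown case.
    static String describe(String partition, Optional<Long> logEndOffset) {
        return logEndOffset
                .map(off -> partition + " end offset: " + off)
                .orElse(partition + " end offset: unknown");
    }
}
```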

hachikuji left a comment:

LGTM. We can do the cleanup of LogOffsetResult separately.

hachikuji merged commit e2ec2d7 into apache:trunk Sep 11, 2018
hachikuji pushed a commit that referenced this pull request Sep 11, 2018
…mer group (#5627)

A call to `kafka-consumer-groups --describe --group ...` can result in NullPointerException for two reasons:
1)  `Fetcher.fetchOffsetsByTimes()` may return too early, without sending list offsets request for topic partitions that are not in cached metadata.
2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets() which eventually call Fetcher.fetchOffsetsByTimes(), would return a map with all the topic partitions passed to endOffsets()/beginningOffsets() and that values are not null. Because of (1), null values were possible if some of the topic partitions were already known (in metadata cache) and some not (metadata cache did not have entries for some of the topic partitions). However, even with fixing (1), endOffsets()/beginningOffsets() may return a map with some topic partitions missing, when list offset request returns a non-retriable error. This happens in corner cases such as message format on broker is before 0.10, or maybe in cases of some other errors.

Testing:
-- added unit test to verify fix in Fetcher.fetchOffsetsByTimes()
-- did some manual testing with `kafka-consumer-groups --describe`, causing NPE. Was not able to reproduce any NPE cases with DescribeConsumerGroupTest.scala.

Reviewers: Jason Gustafson <jason@confluent.io>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…mer group (apache#5627)
