KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… #15618

Closed · wants to merge 2 commits

Conversation

chia7712 (Contributor) opened this pull request:

  • add default implementation to Batch to return offset of max timestamp
  • copy ListOffsetsIntegrationTest from trunk branch

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

junrao (Contributor) left a comment:

@chia7712 : Thanks for the PR. Left a quick comment.


    -  Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
    -    latestTimestampAndOffset.offset,
    +  val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

junrao (Contributor) commented on the diff:

Hmm, iterating all batches can be expensive. We could use the offset index to find the batch containing the offset in maxTimestampAndOffsetSoFar.

chia7712 (Contributor, Author) replied:

@junrao thanks for the feedback. I used the max timestamp to find the "first" batch instead of using the offset index.

It seems to me that using the max timestamp is simpler, since the offset stored by maxTimestampAndOffsetSoFar could be either the last offset or the offset of the max timestamp. Hence, we would have to use the condition baseOffset <= offset <= lastOffset to find the batch.

I'm OK with using the offset if finding the first batch by max timestamp has any side effects.
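
For illustration, a minimal sketch of the timestamp-based lookup described above (assuming latestTimestampSegment is the segment with the largest max timestamp; this approximates the idea, not the actual patch):

    import scala.jdk.CollectionConverters._

    // Find the first batch whose max timestamp equals the segment-level max
    // timestamp, then scan its records for the offset carrying that timestamp.
    val maxTimestamp = latestTimestampSegment.maxTimestampSoFar
    val offsetOfMaxTimestamp = latestTimestampSegment.log.batches().asScala
      .find(_.maxTimestamp() == maxTimestamp)
      .flatMap(batch => batch.asScala.find(_.timestamp == maxTimestamp))
      .map(_.offset)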

junrao (Contributor) replied:

@chia7712: latestTimestampSegment.log.batches() scans the whole log segment and could introduce unnecessary extra I/O, so there could be a performance degradation because of that.

> Hence, we would have to use the condition baseOffset <= offset <= lastOffset to find the batch.

I am not sure I understand this. Looking up a batch with either baseOffset or lastOffset will locate the same batch using the offset index, right?

chia7712 (Contributor, Author) replied:

> latestTimestampSegment.log.batches() scans the whole log segment and could introduce unnecessary extra I/O, so there could be a performance degradation because of that.

batches() returns an iterable, and its implementation loads a batch only when we call next: https://github.com/apache/kafka/blob/3.6/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java#L63

Hence, the benefit of looking up the batch (finding the position and then passing it to batchesFrom) is that we can save some I/O by skipping earlier batches. Please correct me if I misunderstand anything.

> I am not sure I understand this. Looking up a batch with either baseOffset or lastOffset will locate the same batch using the offset index, right?

Is the implementation of the lookup like this?

        val position = latestTimestampSegment.offsetIndex.lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
        latestTimestampSegment.log.batchesFrom(position.position).asScala

junrao (Contributor) replied:

Yes. Suppose you have a 1GB segment and the maxTimestamp is in the last batch: latestTimestampSegment.log.batches() needs to read 1GB from disk. Using the offsetIndex, we only need to read the index plus roughly index.interval (defaults to 4KB) worth of bytes.

> Is the implementation of the lookup like this?

Yes, that's what I was thinking.
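
To make the cost argument concrete, a hedged sketch contrasting the two access paths (same assumed identifiers as the snippets above, not the actual patch):

    import scala.jdk.CollectionConverters._

    // Full scan: materializes every batch in the segment, so a 1GB segment
    // can mean ~1GB of reads in the worst case.
    val byFullScan = latestTimestampSegment.log.batches().asScala
      .maxBy(_.maxTimestamp())

    // Index-assisted scan: the offset index maps an offset to a byte position,
    // so reading starts near the target batch and earlier batches are skipped.
    val position = latestTimestampSegment.offsetIndex
      .lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
    val byIndexScan = latestTimestampSegment.log.batchesFrom(position.position).asScala
      .find(_.maxTimestamp() == latestTimestampSegment.maxTimestampSoFar)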

junrao (Contributor) commented on Mar 28, 2024:

@chia7712: Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk.

chia7712 (Contributor, Author) replied:

> Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly.

I am not sure I understand this. All we need for this solution (or workaround) is the max timestamp of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the offset of the max timestamp when handling ListOffsetsRequest.MAX_TIMESTAMP (a sketch of that flow follows below). Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR.

> So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk.

BTW, I'm OK with keeping the behavior in 3.6, as it is not a kind of regression.
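
A hedged end-to-end sketch of the flow described above (offsetForMaxTimestamp is a hypothetical helper; names approximate the 3.6 code, not the actual patch):

    import kafka.log.LogSegment
    import scala.jdk.CollectionConverters._

    // Resolve ListOffsetsRequest.MAX_TIMESTAMP: pick the segment carrying the
    // overall max timestamp, jump near the target batch via the offset index,
    // then find the record-level offset of the max timestamp.
    def offsetForMaxTimestamp(segments: Iterable[LogSegment]): Option[(Long, Long)] = {
      val segment = segments.maxBy(_.maxTimestampSoFar)
      val maxTimestamp = segment.maxTimestampSoFar
      val position = segment.offsetIndex.lookup(segment.offsetOfMaxTimestampSoFar)
      segment.log.batchesFrom(position.position).asScala
        .find(_.maxTimestamp() == maxTimestamp)
        .flatMap(batch => batch.asScala.find(_.timestamp == maxTimestamp))
        .map(record => (maxTimestamp, record.offset))
    }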

junrao (Contributor) commented on Mar 28, 2024:

> I am not sure I understand this. All we need for this solution (or workaround) is the max timestamp of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the offset of the max timestamp when handling ListOffsetsRequest.MAX_TIMESTAMP. Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR.

Yes, I agree that we can fix this issue completely. I am just saying that we only need to fix this in trunk, since the implementation was never correct in any previous branch and thus this is not a regression.

chia7712 (Contributor, Author) replied:

> I am just saying that we only need to fix this in trunk, since the implementation was never correct in any previous branch and thus this is not a regression.

Got it. I will open another PR for trunk.

@chia7712 chia7712 closed this Mar 28, 2024
@chia7712 chia7712 deleted the KAFKA-16310_36 branch April 26, 2024 22:39