
KAFKA-16452: Don't throw OOORE when converting the offset to metadata #15825

Open · wants to merge 5 commits into trunk

Conversation

@kamalcph (Collaborator) commented Apr 28, 2024

Test Plan

Steps to reproduce the issue:

  1. Create a topic with 1 partition and a replication factor of 2
  2. Send some messages to the topic, wait for the segments to roll over, and upload them to remote storage.
  3. Ensure that the log-start-offset < local-log-start-offset
  4. Stop all the nodes that host a replica of the partition.
  5. Delete the replication-offset-checkpoint, recovery-point-offset-checkpoint, and kafka_cleanshutdown files on one node that is about to be started first.
  6. Start the remaining nodes
  7. The high-watermark will be set to the log-start-offset; the leader will then throw OFFSET_OUT_OF_RANGE errors for follower fetch requests and cannot accept produce requests.

Commands:

 sh kafka-topics.sh --create --topic tieredTopic --partitions 1 --replication-factor 2 --bootstrap-server localhost:9092 \
    --config remote.storage.enable=true --config local.retention.ms=60000 --config retention.ms=7200000 \
    --config segment.bytes=104857600 --config file.delete.delay.ms=1000

sh kafka-producer-perf-test.sh --topic tieredTopic --num-records 120000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@junrao (Contributor) left a comment

@kamalcph : Thanks for the PR. Left a few comments.

@kamalcph marked this pull request as ready for review May 7, 2024 09:32
@kamalcph (Collaborator, Author) commented May 7, 2024

@junrao @satishd @chia7712 @showuon

Updated the test plan in the summary. Verified that the patch fixes the issue by running both the trunk and the patched builds. With the fix, the high-watermark value gets updated to the valid offset. Please take a look when you get a chance.

@showuon (Contributor) left a comment

Thanks for the PR. One question: when we temporarily set the high-watermark to LogOffsetMetadata(0) for the leader, we're waiting for the high-watermark to get updated after followers fetch from the leader, right?

@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
Contributor:

For case 1, it looks like we never throw OffsetOutOfRangeException now, do we?

@kamalcph (Collaborator, Author):

checkLogStartOffset will throw OffsetOutOfRangeException if the offset is less than the logStartOffset.
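
For context, a minimal sketch of the guard under discussion (not the exact UnifiedLog code; the message text is illustrative):

import org.apache.kafka.common.errors.OffsetOutOfRangeException

// Offsets below the log-start-offset are rejected up front.
def checkLogStartOffset(offset: Long, logStartOffset: Long): Unit = {
  if (offset < logStartOffset)
    throw new OffsetOutOfRangeException(
      s"Received request for offset $offset, but the log start offset is $logStartOffset.")
}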

@kamalcph (Collaborator, Author) commented May 8, 2024

Should we avoid throwing the error and return message-only metadata when the offset is less than the log-start-offset?

While updating via UnifiedLog#updateLogStartOffset, the HWM also gets updated, and it doesn't hit fetchHighWatermarkMetadata (or) convertToOffsetMetadataOrThrow, so the call will succeed even when current-log-start-offset > old-HWM.
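
A minimal sketch of that interplay, with simplified stand-ins for the UnifiedLog fields (not the actual method):

// When the log-start-offset advances past the old HWM, the HWM is pulled up
// directly from the new start offset; no offset-to-metadata lookup happens,
// so convertToOffsetMetadataOrThrow is never reached on this path.
def updatedHighWatermark(newLogStartOffset: Long, oldHighWatermark: Long): Long =
  math.max(oldHighWatermark, newLogStartOffset)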

Contributor:

Ah, OK. Thanks.

Contributor:

While updating via UnifiedLog#updateLogStartOffset, the HWM also gets updated, and it doesn't hit fetchHighWatermarkMetadata (or) convertToOffsetMetadataOrThrow, so the call will succeed even when current-log-start-offset > old-HWM.

For UnifiedLog#updateLogStartOffset, I think it's safe that we don't throw an exception when current-log-start-offset > old-HWM, because it is called either during initialization or from maybeIncrementLogStartOffset. In the latter, we've already checked that newLogStartOffset > logStartOffset.

Contributor:

Good point. Since we change the logic such that it's ok not to have the metadata for an offset, we could just skip the call to checkLogStartOffset.

@kamalcph (Collaborator, Author):

Removed the checkLogStartOffset from the convertToOffsetMetadataOrThrow method.
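
A hedged sketch of the method after this change (not the verbatim patch; localLog is the enclosing UnifiedLog's member):

private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
  // The checkLogStartOffset(offset) guard was removed here; the lookup now
  // simply delegates to the local log, which resolves the offset against
  // its segments.
  localLog.convertToOffsetMetadataOrThrow(offset)
}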

@@ -92,7 +92,10 @@ class DelayedFetch(
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case F.
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
Contributor:

I can understand the endOffset.messageOffsetOnly() case since the leader's high watermark is still not updated. But when will fetchOffset be messageOffsetOnly?

@kamalcph (Collaborator, Author):

fetchOffset can be message-only metadata when there is a diverging epoch. If there is a diverging epoch in the LogReadResults, then the request won't enter DelayedFetch. We can remove the check if it is not required.

Contributor:

fetchOffset typically shouldn't be message only. But it doesn't hurt to have the check.
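
For readers following the thread, a self-contained sketch of what messageOffsetOnly() conveys; the field names mirror LogOffsetMetadata, while the -1 sentinels are an assumption of this sketch:

// Metadata is "message-offset-only" when it records just a message offset,
// with no resolved segment base offset or byte position inside the segment.
final case class OffsetMetadataSketch(messageOffset: Long,
                                      segmentBaseOffset: Long = -1L,
                                      relativePositionInSegment: Int = -1) {
  def messageOffsetOnly: Boolean =
    segmentBaseOffset == -1L && relativePositionInSegment == -1
}

// e.g. a leader HWM initialized to LogOffsetMetadata(0) before any lookup:
val hwm = OffsetMetadataSketch(0L) // hwm.messageOffsetOnly == true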

@kamalcph (Collaborator, Author) commented May 8, 2024

Thanks for the PR. One question: when we temporarily set the high-watermark to LogOffsetMetadata(0) for the leader, we're waiting for the high-watermark to get updated after followers fetch from the leader, right?

Yes, the call to maybeIncrementLeaderHW will succeed when the node becomes the leader for the partition. Note that if the current node is the only alive replica, then the high-watermark gets updated to the leader's log-end-offset. The behavior is the same for both normal and remote-storage-enabled topics.
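
A simplified sketch of that high-watermark advance (a stand-in for Partition#maybeIncrementLeaderHW, which additionally weighs ISR membership and replica fetch states):

// The candidate HWM is the minimum log-end-offset across the eligible
// replicas, and the HWM only ever moves forward. With a single alive
// replica, the minimum is the leader's own log-end-offset.
// Assumes at least one eligible replica.
def maybeIncrementLeaderHw(currentHw: Long, eligibleReplicaLeos: Seq[Long]): Long =
  math.max(currentHw, eligibleReplicaLeos.min)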

@kamalcph (Collaborator, Author) commented May 9, 2024

Test failures are unrelated.

@kamalcph added the tiered-storage label (Pull requests associated with KIP-405 (Tiered Storage)) May 9, 2024
@junrao (Contributor) left a comment

@kamalcph : Thanks for the updated PR. Left a few more comments.

core/src/main/scala/kafka/log/UnifiedLog.scala (outdated; thread resolved)
core/src/main/scala/kafka/server/DelayedFetch.scala (outdated; thread resolved)
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
*/
private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
Contributor:

LocalLog.read() also calls convertToOffsetMetadataOrThrow.

      else if (startOffset > maxOffsetMetadata.messageOffset)
        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)

It seems this could lead to infinite recursion. To avoid that, we could change the above code to avoid calling convertToOffsetMetadataOrThrow and instead pass a message-only LogOffsetMetadata to emptyFetchDataInfo.

@kamalcph (Collaborator, Author):

This is a potential loop (not sure when it would be triggered); updated the logic to return the message-only metadata.

@kamalcph (Collaborator, Author) commented May 13, 2024

To solve the infinite loop without returning message-only LogOffsetMetadata when startOffset is beyond maxOffsetMetadata: can we retain the same behavior while avoiding the loop? Something like below:

def read(startOffset: Long,
           maxLength: Int,
           minOneMessage: Boolean,
           maxOffsetMetadata: LogOffsetMetadata,
           includeAbortedTxns: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
        s"total length ${segments.sizeInBytes} bytes")

      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      val segmentOpt = segments.floorSegment(startOffset)

      // return error on attempt to read beyond the log end offset
      if (startOffset > endOffset || !segmentOpt.isPresent)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments upto $endOffset.")

      if (startOffset == maxOffsetMetadata.messageOffset) {
        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
       // Updated code to avoid the loop:
        val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, startOffset, maxLength = 1, minOneMessage = false, nextOffsetMetadata, includeAbortedTxns = false)
        emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, includeAbortedTxns)
      } else {
        readFetchDataInfo(segmentOpt.get, startOffset, maxLength, minOneMessage, maxOffsetMetadata, includeAbortedTxns)
      }
    }
  }

  private def readFetchDataInfo(segment: LogSegment,
                                startOffset: Long,
                                maxLength: Int,
                                minOneMessage: Boolean,
                                maxOffsetMetadata: LogOffsetMetadata,
                                includeAbortedTxns: Boolean): FetchDataInfo = {
    // Do the read on the segment with a base offset less than the target offset
    // but if that segment doesn't contain any messages with an offset greater than that
    // continue to read from successive segments until we get some messages or we reach the end of the log
    var fetchDataInfo: FetchDataInfo = null
    var segmentOpt: Optional[LogSegment] = Optional.of(segment)
    while (fetchDataInfo == null && segmentOpt.isPresent) {
      val segment = segmentOpt.get
      val baseOffset = segment.baseOffset

      val maxPosition =
        // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
        if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
        else segment.size

      fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
      if (fetchDataInfo != null) {
        if (includeAbortedTxns)
          fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
      } else segmentOpt = segments.higherSegment(baseOffset)
    }

    if (fetchDataInfo != null) fetchDataInfo
    else {
      // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
      // this can happen when all messages with offset larger than start offsets have been deleted.
      // In this case, we will return the empty set with log end offset metadata
      new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

@kamalcph (Collaborator, Author):
While going through the usages, it looks to me that the LogOffsetMetadata conversion that happens in KafkaMetadataLog is not correct. Could someone please double-check?

org.apache.kafka.storage.internals.log.LogOffsetMetadata -> org.apache.kafka.raft.LogOffsetMetadata

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala#L226

Question: why do we make org.apache.kafka.raft.LogOffsetMetadata#segmentPosition empty when hwm.messageOffsetOnly is false?

@jsancio (Member) commented May 10, 2024

@kamalcph, looks like a bug to me. The predicate should be if (!hwm.messageOffsetOnly) or the if/else blocks should be swapped. I suspect that we haven't noticed this bug in the KRaft implementation (KafkaRaftClient) because kraft never looks at the segment and byte position for the HWM.

If you are going to fix this code, do you mind adding a test for this case? Since KafkaMetadataLog calls UnifiedLog.fetchOffsetSnapshot, hwm.messageOffsetOnly should always be false.
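
A hedged sketch of the corrected conversion described above; the SegmentPosition helper and the exact constructor shapes are assumptions of this sketch, not verified against trunk:

import java.util.Optional
import kafka.raft.SegmentPosition // assumed helper implementing OffsetMetadata
import org.apache.kafka.raft.{LogOffsetMetadata => RaftOffsetMetadata, OffsetMetadata}
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata => StorageOffsetMetadata}

// Attach the segment/byte position only when the storage-layer metadata is
// NOT message-offset-only; the predicate in the original code was inverted.
def toRaftMetadata(hwm: StorageOffsetMetadata): RaftOffsetMetadata = {
  val position: Optional[OffsetMetadata] =
    if (!hwm.messageOffsetOnly())
      Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
    else
      Optional.empty()
  new RaftOffsetMetadata(hwm.messageOffset, position)
}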

@junrao (Contributor) left a comment

@kamalcph : Thanks for the updated PR. Left a few more comments.

core/src/main/scala/kafka/log/UnifiedLog.scala (outdated; thread resolved)
else {
} else if (startOffset > maxOffsetMetadata.messageOffset) {
// Instead of converting the `startOffset` to metadata, returning message-only metadata to avoid potential loop
emptyFetchDataInfo(new LogOffsetMetadata(startOffset), includeAbortedTxns)
Contributor:

We have the following code below.

            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
            else segment.size

We need to be more careful there given maxOffsetMetadata may not have a corresponding relativePositionInSegment.

If maxOffsetMetadata.segmentBaseOffset is -1, we can return empty since maxOffsetMetadata.offset is not on local log segments.

If maxOffsetMetadata.segmentBaseOffset equals segment.baseOffset, we can use maxOffsetMetadata.relativePositionInSegment.

If maxOffsetMetadata.segmentBaseOffset is larger than segment.baseOffset, we can use segment.size.

If maxOffsetMetadata.segmentBaseOffset is smaller than segment.baseOffset, we return empty.
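
Those four cases, as a self-contained sketch; the -1 sentinel for message-only metadata is an assumption, and None stands for "return empty fetch data":

// Pick the byte position up to which a segment read may proceed, given the
// max-offset metadata; None means the offset is not readable from this segment.
def maxPositionFor(maxSegmentBaseOffset: Long,
                   maxRelativePosition: Int,
                   segmentBaseOffset: Long,
                   segmentSize: Int): Option[Int] =
  if (maxSegmentBaseOffset == -1L) None // message-only: not on local segments
  else if (maxSegmentBaseOffset == segmentBaseOffset) Some(maxRelativePosition)
  else if (maxSegmentBaseOffset > segmentBaseOffset) Some(segmentSize)
  else None // the max offset precedes this segment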

@kamalcph (Collaborator, Author) commented May 13, 2024

Agree, this patch is getting tricky. We want to validate all the scenarios; in particular, when there is no data to read from the server, the fetch-request rate from the clients should stay almost the same.

To avoid/reduce the cases, can we always resolve the maxOffsetMetadata to complete metadata?
#15825 (comment)

Contributor:

Hmm, I thought the design of this PR is to allow maxOffsetMetadata to be message-only in some of the rare cases, right?


val expected = minBytes == 1
assertEquals(expected, delayedFetch.tryComplete())
assertEquals(expected, delayedFetch.isCompleted)
Contributor:

This exposes an issue in DelayedFetch. If the HWM is less than the fetchOffset, we haven't gained any bytes, so we shouldn't complete the DelayedFetch immediately.

@kamalcph (Collaborator, Author) commented May 13, 2024

In the test, the LogOffsetSnapshot contains message-only offsets for logEndOffset, highWatermark, and lastStableOffset in DelayedFetchTest.java#207, so the test passed with the newly added condition.

In a real scenario, we expect the LogOffsetSnapshot to contain the complete metadata for all the offsets.

Contributor:

Yes, I understand that the test passes as it is. I am just saying that the logic in DelayedFetch is not consistent.

If the offset metadata is available, we accumulate bytes only if fetchOffset.messageOffset < endOffset.messageOffset. To be consistent, we need to apply the same test when the offset metadata is not available.
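
A sketch of that consistent condition, with a simplified stand-in type (the real DelayedFetch also compares segments and byte positions when full metadata is present):

final case class OffsetSnapshot(messageOffset: Long, messageOffsetOnly: Boolean)

// Bytes should accumulate only when the fetch offset is strictly behind the
// end offset; when metadata is missing, fall back to this plain offset
// comparison rather than completing the fetch immediately.
def mayAccumulateBytes(fetchOffset: OffsetSnapshot, endOffset: OffsetSnapshot): Boolean =
  fetchOffset.messageOffset < endOffset.messageOffset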

void testMessageOffsetOnly() {
LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L);
LogOffsetMetadata metadata2 = new LogOffsetMetadata(1L, 0L, 1);
assertFalse(UNKNOWN_OFFSET_METADATA.messageOffsetOnly());
Contributor:

It's more natural for UNKNOWN_OFFSET_METADATA to be message offset only.

@kamalcph (Collaborator, Author):

We can take this one up separately in a follow-up PR.

assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
assertEmptyFetch(log, log.highWatermark, FetchIsolation.HIGH_WATERMARK)

(log.highWatermark + 1 to log.logEndOffset).foreach { offset =>
Contributor:

Why are we changing to start from log.highWatermark + 1 instead of log.highWatermark?

@kamalcph (Collaborator, Author) commented May 13, 2024

Since log.highWatermark contains the full metadata, I divided the check into two cases when reading at or beyond the high-watermark (see the sketch after this list):

  1. When the fetchOffset equals the high-watermark, we return empty records but with complete offset metadata.
  2. When the fetchOffset is beyond the high-watermark/max-offset-allowed-to-read, we return empty records with message-only metadata.

We can reconsider whether to return message-only offset metadata or complete offset metadata when the fetch-offset is beyond the max-offset-allowed-to-read. (comment)
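
A compact sketch of those two cases (stand-in types, not the test code):

sealed trait EmptyReadMetadata
final case class CompleteMetadata(offset: Long, segmentBaseOffset: Long, position: Int) extends EmptyReadMetadata
final case class MessageOnlyMetadata(offset: Long) extends EmptyReadMetadata

// Reading exactly at the HWM returns its complete metadata; reading beyond
// the max offset allowed to read returns message-only metadata.
def emptyReadMetadata(fetchOffset: Long, hwm: CompleteMetadata): EmptyReadMetadata =
  if (fetchOffset == hwm.offset) hwm
  else MessageOnlyMetadata(fetchOffset)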

@junrao (Contributor) left a comment

@kamalcph : Thanks for the updated PR. Added a couple of more comments.

Labels: tiered-storage (Pull requests associated with KIP-405 (Tiered Storage))