Skip to content

Commit

Permalink
Addressed the review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalcph committed May 7, 2024
1 parent 8d34faa commit 7048828
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 23 deletions.
13 changes: 8 additions & 5 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1424,14 +1424,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
* 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException
* 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata
* 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
*/
private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
checkLogStartOffset(offset)
if (remoteLogEnabled() && offset < localLogStartOffset()) {
new LogOffsetMetadata(offset, LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET)
} else {
try {
localLog.convertToOffsetMetadataOrThrow(offset)
} catch {
case _: OffsetOutOfRangeException =>
new LogOffsetMetadata(offset)
}
}

Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ class DelayedFetch(
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case F.
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
// This case is to handle the stale high-watermark on the leader until it gets updated with the correct value
accumulatedSize += 1
} else if (endOffset.onOlderSegment(fetchOffset)) {
// Case F, this can happen when the new fetch operation is on a truncated leader
debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
return forceComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, when}

Expand All @@ -46,7 +48,7 @@ class DelayedFetchTest {

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
Expand Down Expand Up @@ -92,7 +94,7 @@ class DelayedFetchTest {

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
Expand All @@ -116,6 +118,9 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

val fetchResult = fetchResultOpt.get
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.error)
}

@Test
Expand Down Expand Up @@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

val fetchResult = fetchResultOpt.get
assertEquals(Errors.NONE, fetchResult.error)
}

@ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}")
@ValueSource(ints = Array(1, 2))
def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val fetchOffset = 450L
val logStartOffset = 5L
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes)

var fetchResultOpt: Option[FetchPartitionData] = None
def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
fetchResultOpt = Some(responses.head._2)
}

val delayedFetch = new DelayedFetch(
params = fetchParams,
fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
)

val partition: Partition = mock(classOf[Partition])
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
// high-watermark is lesser than the log-start-offset
val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
when(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
.thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)

val expected = minBytes == 1
assertEquals(expected, delayedFetch.tryComplete())
assertEquals(expected, delayedFetch.isCompleted)
assertEquals(expected, fetchResultOpt.isDefined)

if (fetchResultOpt.isDefined) {
val fetchResult = fetchResultOpt.get
assertEquals(Errors.NONE, fetchResult.error)
}
}

private def buildFollowerFetchParams(
replicaId: Int,
maxWaitMs: Int
maxWaitMs: Int,
minBytes: Int = 1,
): FetchParams = {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
1,
maxWaitMs,
1,
minBytes,
maxBytes,
FetchIsolation.LOG_END,
Optional.empty()
Expand Down
43 changes: 43 additions & 0 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4208,6 +4208,49 @@ class UnifiedLogTest {
assertEquals(31, log.localLogStartOffset())
}

@Test
def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)

var offset = 0L
for(_ <- 0 until 50) {
val records = TestUtils.singletonRecords("test".getBytes())
val info = log.appendAsLeader(records, leaderEpoch = 0)
offset = info.lastOffset
if (offset != 0 && offset % 10 == 0)
log.roll()
}
assertEquals(5, log.logSegments.size)
log.updateHighWatermark(log.logEndOffset)
// simulate calls to upload 3 segments to remote storage
log.updateHighestOffsetInRemoteStorage(30)

log.deleteOldSegments()
assertEquals(2, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(31, log.localLogStartOffset())

log.updateLogStartOffsetFromRemoteTier(15)
assertEquals(15, log.logStartOffset)

// case-1: offset is higher than the local-log-start-offset.
// log-start-offset < local-log-start-offset < offset-to-be-converted < log-end-offset
val offsetMetadata = log.convertToOffsetMetadataOrThrow(35)
assertEquals(new LogOffsetMetadata(35, 31, 288), offsetMetadata)
// case-2: offset is lesser than the local-log-start-offset
// log-start-offset < offset-to-be-converted < local-log-start-offset < log-end-offset
val offsetMetadata1 = log.convertToOffsetMetadataOrThrow(29)
assertEquals(new LogOffsetMetadata(29, -1L, -1), offsetMetadata1)
// case-3: offset is higher than the log-end-offset
// log-start-offset < local-log-start-offset < log-end-offset < offset-to-be-converted
val offsetMetadata2 = log.convertToOffsetMetadataOrThrow(log.logEndOffset + 1)
assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1), offsetMetadata2)
// case-4: offset is lesser than the log-start-offset
// offset-to-be-converted < log-start-offset < local-log-start-offset < log-end-offset
assertThrows(classOf[OffsetOutOfRangeException], () => log.convertToOffsetMetadataOrThrow(14))
}

private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public final class LogOffsetMetadata {

//TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L;

public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);

Expand All @@ -42,11 +41,6 @@ public LogOffsetMetadata(long messageOffset) {
this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
}

public LogOffsetMetadata(long messageOffset,
long segmentBaseOffset) {
this(messageOffset, segmentBaseOffset, UNKNOWN_FILE_POSITION);
}

public LogOffsetMetadata(long messageOffset,
long segmentBaseOffset,
int relativePositionInSegment) {
Expand All @@ -57,10 +51,8 @@ public LogOffsetMetadata(long messageOffset,

// check if this offset is already on an older segment compared with the given offset
public boolean onOlderSegment(LogOffsetMetadata that) {
if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
return false;
if (messageOffsetOnly())
throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
return false;

return this.segmentBaseOffset < that.segmentBaseOffset;
}
Expand All @@ -73,8 +65,6 @@ private boolean onSameSegment(LogOffsetMetadata that) {
// compute the number of bytes between this offset to the given offset
// if they are on the same segment and this offset precedes the given offset
public int positionDiff(LogOffsetMetadata that) {
if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
return 1;
if (messageOffsetOnly())
throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
if (!onSameSegment(that))
Expand All @@ -85,8 +75,7 @@ public int positionDiff(LogOffsetMetadata that) {

// decide if the offset metadata only contains message offset info
public boolean messageOffsetOnly() {
return (segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET || segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
&& relativePositionInSegment == UNKNOWN_FILE_POSITION;
return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;

import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class LogOffsetMetadataTest {

@Test
void testOnOlderSegment() {
LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L, 0L, 1);
LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 4L, 2);
LogOffsetMetadata messageOnlyMetadata = new LogOffsetMetadata(1L);
assertFalse(UNKNOWN_OFFSET_METADATA.onOlderSegment(UNKNOWN_OFFSET_METADATA));
assertFalse(metadata1.onOlderSegment(messageOnlyMetadata));
assertFalse(messageOnlyMetadata.onOlderSegment(metadata1));
assertFalse(metadata1.onOlderSegment(metadata1));
assertFalse(metadata2.onOlderSegment(metadata1));
assertTrue(metadata1.onOlderSegment(metadata2));
}

@Test
void testPositionDiff() {
LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L);
LogOffsetMetadata metadata2 = new LogOffsetMetadata(5L, 0L, 5);
KafkaException exception = assertThrows(KafkaException.class, () -> metadata1.positionDiff(metadata2));
assertTrue(exception.getMessage().endsWith("since it only has message offset info"));

exception = assertThrows(KafkaException.class, () -> metadata2.positionDiff(metadata1));
assertTrue(exception.getMessage().endsWith("since they are not on the same segment"));

LogOffsetMetadata metadata3 = new LogOffsetMetadata(15L, 10L, 5);
exception = assertThrows(KafkaException.class, () -> metadata3.positionDiff(metadata2));
assertTrue(exception.getMessage().endsWith("since they are not on the same segment"));

LogOffsetMetadata metadata4 = new LogOffsetMetadata(40L, 10L, 100);
assertEquals(95, metadata4.positionDiff(metadata3));
}

@Test
void testMessageOffsetOnly() {
LogOffsetMetadata metadata1 = new LogOffsetMetadata(1L);
LogOffsetMetadata metadata2 = new LogOffsetMetadata(1L, 0L, 1);
assertFalse(UNKNOWN_OFFSET_METADATA.messageOffsetOnly());
assertFalse(metadata2.messageOffsetOnly());
assertTrue(metadata1.messageOffsetOnly());
}
}

0 comments on commit 7048828

Please sign in to comment.