Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class DisklessFetchOffsetRouter(
* @param classicLogStartOffsetProvider returns the local UnifiedLog's `logStartOffset` for the
* given partition, used to decide whether the EARLIEST
* timestamp can be served by the classic path.
* @param hasCompleteClassicPrefix returns whether the local UnifiedLog's high watermark has
* reached the classic-to-diskless switch offset.
* @param classicFetchOffset runs the standard Kafka classic-path lookup for the
* given `(topicPartition, partition, allowFromFollower)`.
*/
Expand All @@ -90,20 +92,26 @@ class DisklessFetchOffsetRouter(
replicaId: Int,
version: Short,
classicLogStartOffsetProvider: TopicPartition => Option[Long],
hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean,
classicFetchOffset: (TopicPartition, ListOffsetsPartition, Boolean) => ListOffsetsPartitionStatus
): ListOffsetsPartitionStatus = {
val classicToDisklessStartOffset = inklessMetadataView.getClassicToDisklessStartOffset(topicPartition)
val switchPending = classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING
val isSwitchedWithClassicAccess = (classicToDisklessStartOffset > 0 && disklessManagedReplicasEnabled)
val hasCommittedSwitchOffset = classicToDisklessStartOffset > 0
val isSwitchedWithClassicAccess = hasCommittedSwitchOffset && disklessManagedReplicasEnabled
val isConsolidatingPartition = disklessConsolidationEnabled && inklessMetadataView.isConsolidatingDisklessTopic(topicPartition.topic)

// Switched partitions seal their classic local log: once classicToDisklessStartOffset is
// committed the LEO can no longer grow and every ISR replica has the same data on disk.
// Any replica can therefore safely answer ListOffsets from its local log, so we let
// followers serve the classic-side query as well.
// Since consolidating partitions contain only data that has been stored in the diskless
// coordinator and its offsets won't change, we can allow follower requests.
val allowFromFollower = isSwitchedWithClassicAccess || isConsolidatingPartition
// committed the LEO can no longer grow. Any replica whose local HW has reached the seal
// can therefore safely answer ListOffsets from its local log, so we let those followers
// serve the classic-side query as well.
// Consolidating partitions can also allow follower requests, except when they are switched
// and this broker has not caught up to the sealed classic prefix.
val switchedAllowsFollower =
isSwitchedWithClassicAccess && hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset)
val consolidatingAllowsFollower =
isConsolidatingPartition && (!hasCommittedSwitchOffset || hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset))
val allowFromFollower = switchedAllowsFollower || consolidatingAllowsFollower
val isFollowerRequest = replicaId >= 0

def classicLookup(): ListOffsetsPartitionStatus = classicFetchOffset(topicPartition, partition, allowFromFollower)
Expand Down
24 changes: 20 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,8 @@ class ReplicaManager(val config: KafkaConfig,
classicFetchOffset(tp, p, replicaId, isolationLevel, version,
correlationId, clientId, buildErrorResponse, allowFromFollower = allowFromFollower)
val classicLogStart: TopicPartition => Option[Long] = tp => logManager.getLog(tp).map(_.logStartOffset)
val hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean =
(tp, classicToDisklessStartOffset) => logManager.getLog(tp).exists(_.highWatermark >= classicToDisklessStartOffset)

topics.foreach { topic =>
topic.partitions.asScala.foreach { partition =>
Expand All @@ -1726,7 +1728,7 @@ class ReplicaManager(val config: KafkaConfig,
} else if (maybeFetchOffsetJob.exists(_.mustHandle(topic.name))) {
statusByPartition += topicPartition ->
disklessFetchOffsetRouter.route(maybeFetchOffsetJob.get, () => inklessFetchOffsetHandler.get.createJob(),
topicPartition, partition, replicaId, version, classicLogStart, classicFetch)
topicPartition, partition, replicaId, version, classicLogStart, hasCompleteClassicPrefix, classicFetch)
} else {
statusByPartition += topicPartition -> classicFetch(topicPartition, partition, false)
}
Expand Down Expand Up @@ -2801,7 +2803,11 @@ class ReplicaManager(val config: KafkaConfig,
lazy val inklessFetchOffsetHandlerJob: Option[FetchOffsetHandler.Job] = inklessFetchOffsetHandler.map(_.createJob())
var disklessOffsetForLeaderEpochRequested = false

def localOffsetForLeaderEpoch(topicPartition: TopicPartition, offsetForLeaderPartition: OffsetForLeaderPartition): EpochEndOffset = {
def localOffsetForLeaderEpoch(
topicPartition: TopicPartition,
offsetForLeaderPartition: OffsetForLeaderPartition,
fetchOnlyFromLeader: Boolean = true
): EpochEndOffset = {
getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
val currentLeaderEpochOpt =
Expand All @@ -2813,7 +2819,7 @@ class ReplicaManager(val config: KafkaConfig,
partition.lastOffsetForLeaderEpoch(
currentLeaderEpochOpt,
offsetForLeaderPartition.leaderEpoch,
fetchOnlyFromLeader = true)
fetchOnlyFromLeader = fetchOnlyFromLeader)

case HostedPartition.Offline(_) =>
new EpochEndOffset()
Expand Down Expand Up @@ -2891,7 +2897,17 @@ class ReplicaManager(val config: KafkaConfig,
disklessOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition)

case classicToDisklessStartOffset if classicToDisklessStartOffset >= 0L =>
val localResult = localOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition)
// The classic prefix is sealed at the switch offset, so any replica with the
// complete local classic log can answer epoch lookups for that prefix.
val hasCompleteLocalClassicPrefix = getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
partition.log.exists(_.highWatermark >= classicToDisklessStartOffset)
case _ => false
}
val localResult = localOffsetForLeaderEpoch(
topicPartition,
offsetForLeaderPartition,
fetchOnlyFromLeader = !hasCompleteLocalClassicPrefix)
val localError = Errors.forCode(localResult.errorCode)
if (localError != Errors.NONE) {
() => localResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class DisklessFetchOffsetRouterTest {
replicaId: Int = consumerReplicaId,
version: Short = 7,
classicLogStartOffset: Option[Long] = None,
hasCompleteClassicPrefix: Boolean = true,
classicResult: ListOffsetsPartitionStatus = defaultClassicResult,
newJob: () => FetchOffsetHandler.Job = () =>
throw new AssertionError("newJob() should not be called by this routing path")): ListOffsetsPartitionStatus = {
Expand All @@ -105,6 +106,7 @@ class DisklessFetchOffsetRouterTest {
replicaId = replicaId,
version = version,
classicLogStartOffsetProvider = _ => classicLogStartOffset,
hasCompleteClassicPrefix = (_, _) => hasCompleteClassicPrefix,
classicFetchOffset = (tpArg, partition, allow) => {
classicCalls += ((tpArg, partition, allow))
classicResult
Expand Down Expand Up @@ -179,6 +181,21 @@ class DisklessFetchOffsetRouterTest {
verify(job, never()).add(any(), any())
}

@Test
def routesToClassicWithoutFollowerAccessWhenSwitchedFollowerClassicPrefixIsIncomplete(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)

val status = route(
newRouter(),
timestamp = ListOffsetsRequest.LATEST_TIMESTAMP,
replicaId = followerReplicaId,
hasCompleteClassicPrefix = false)

assertSame(defaultClassicResult, status)
assertClassicCalledWith(allowFromFollower = false)
verify(job, never()).add(any(), any())
}

// ---------------------------------------------------------------------------
// Case 2: hybrid routing by timestamp.
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -480,6 +497,23 @@ class DisklessFetchOffsetRouterTest {
assertNotSame(noMatch, status)
}

@Test
def hybridConsolidatingWithCommittedBoundaryRequiresCompleteClassicPrefixForFollowerAccess(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)
when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true)

val status = route(
newRouter(disklessConsolidationEnabled = true),
timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP,
classicLogStartOffset = Some(0L),
hasCompleteClassicPrefix = false
)

assertSame(defaultClassicResult, status)
assertClassicCalledWith(allowFromFollower = false)
verify(job, never()).add(any(), any())
}

@Test
def hybridConsolidatingEarliestReturnsClassicWhenClassicHitsEvenIfLogPastBoundary(): Unit = {
when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)
Expand Down
111 changes: 111 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8284,6 +8284,117 @@ class ReplicaManagerTest {
}
}

@Test
def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryUsesFollowerLocalLog(): Unit = {
val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job])
when(jobMock.mustHandle(any())).thenReturn(true)
doNothing().when(jobMock).start()

val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = {
case (handlerMock, _) =>
when(handlerMock.createJob()).thenReturn(jobMock)
}
val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit)

val replicaManager = try {
createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true)
} finally {
fetchOffsetHandlerCtor.close()
}
try {
val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 101L)
val remoteLeaderId = replicaManager.config.brokerId + 1
partition.makeFollower(
partitionRegistration(
remoteLeaderId,
leaderEpoch = 1,
isr = Array(remoteLeaderId, replicaManager.config.brokerId),
partitionEpoch = 1,
replicas = Array(replicaManager.config.brokerId, remoteLeaderId)),
isNew = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
Some(disklessTopicPartition.topicId()))
assertFalse(partition.isLeader)
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
.thenReturn(101L)

val requestedEpochInfo = Seq(
new OffsetForLeaderTopic()
.setTopic(disklessTopicPartition.topic())
.setPartitions(util.List.of(
new OffsetForLeaderPartition()
.setPartition(disklessTopicPartition.partition())
.setLeaderEpoch(0)
))
)

val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo)

val partitionResult = result.head.partitions().get(0)
assertEquals(Errors.NONE.code, partitionResult.errorCode())
assertEquals(101L, partitionResult.endOffset())
verify(jobMock, never()).add(any(), any())
verify(jobMock, never()).start()
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

@Test
def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryRejectsLaggingFollowerLocalLog(): Unit = {
val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job])
when(jobMock.mustHandle(any())).thenReturn(true)
doNothing().when(jobMock).start()

val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = {
case (handlerMock, _) =>
when(handlerMock.createJob()).thenReturn(jobMock)
}
val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit)

val replicaManager = try {
createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true)
} finally {
fetchOffsetHandlerCtor.close()
}
try {
val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 50L)
val remoteLeaderId = replicaManager.config.brokerId + 1
partition.makeFollower(
partitionRegistration(
remoteLeaderId,
leaderEpoch = 1,
isr = Array(remoteLeaderId),
partitionEpoch = 1,
replicas = Array(replicaManager.config.brokerId, remoteLeaderId)),
isNew = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
Some(disklessTopicPartition.topicId()))
assertFalse(partition.isLeader)
when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
.thenReturn(101L)

val requestedEpochInfo = Seq(
new OffsetForLeaderTopic()
.setTopic(disklessTopicPartition.topic())
.setPartitions(util.List.of(
new OffsetForLeaderPartition()
.setPartition(disklessTopicPartition.partition())
.setLeaderEpoch(0)
))
)

val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo)

val partitionResult = result.head.partitions().get(0)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionResult.errorCode())
verify(jobMock, never()).add(any(), any())
verify(jobMock, never()).start()
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

@Test
def testLastOffsetForLeaderEpochHybridFallsBackToDisklessWhenClassicLogCannotAnswer(): Unit = {
val disklessResult = new OffsetResultHolder.FileRecordsOrError(
Expand Down
Loading