Skip to content

Commit

Permalink
KAFKA-15080; Fetcher's lag never set when partition is idle (#13843)
Browse files Browse the repository at this point in the history
The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because `validBytes > 0` is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled.

Reviewers: Divij Vaidya <diviv@amazon.com>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac committed Jun 13, 2023
1 parent 303b457 commit dfe050c
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 7 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Expand Up @@ -363,13 +363,13 @@ abstract class AbstractFetcherThread(name: String,
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag

// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch.asScala)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
fetcherStats.byteRate.mark(validBytes)
if (validBytes > 0) fetcherStats.byteRate.mark(validBytes)
}
}
}
Expand Down
105 changes: 100 additions & 5 deletions core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
Expand Up @@ -30,13 +30,13 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats, SimpleRecord}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -47,8 +47,7 @@ import org.mockito.Mockito.{mock, never, times, verify, when}

import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Optional}

import java.util.{Collections, Optional, OptionalInt}
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -742,6 +741,102 @@ class ReplicaFetcherThreadTest {
verify(log, times(0)).maybeUpdateHighWatermark(anyLong())
}

@Test
def testLagIsUpdatedWhenNoRecords(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
val logManager: LogManager = mock(classOf[LogManager])
val log: UnifiedLog = mock(classOf[UnifiedLog])
val partition: Partition = mock(classOf[Partition])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])

when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(0))
when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0)))
when(log.logEndOffset).thenReturn(0)
when(log.maybeUpdateHighWatermark(0)).thenReturn(None)

when(replicaManager.metadataCache).thenReturn(metadataCache)
when(replicaManager.logManager).thenReturn(logManager)
when(replicaManager.localLogOrException(t1p0)).thenReturn(log)
when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))

when(partition.localLogOrException).thenReturn(log)
when(partition.appendRecordsToFollowerOrFutureReplica(any(), any())).thenReturn(Some(new LogAppendInfo(
Optional.empty[LogOffsetMetadata],
0,
OptionalInt.empty,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordConversionStats.EMPTY,
CompressionType.NONE,
CompressionType.NONE,
-1,
0, // No records.
false,
-1L
)))

val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")

val mockNetwork = new MockBlockingSender(
Collections.emptyMap(),
brokerEndPoint,
new SystemTime()
)

val leader = new RemoteLeaderEndPoint(
logContext.logPrefix,
mockNetwork,
new FetchSessionHandler(logContext, brokerEndPoint.id),
config,
replicaManager,
quota,
() => config.interBrokerProtocolVersion,
() => 1
)

val thread = new ReplicaFetcherThread(
"fetcher-thread",
leader,
config,
failedPartitions,
replicaManager,
quota,
logContext.logPrefix,
() => config.interBrokerProtocolVersion
)

thread.addPartitions(Map(
t1p0 -> initialFetchState(Some(topicId1), 0))
)

// Lag is initialized to None when the partition fetch
// state is created.
assertEquals(None, thread.fetchState(t1p0).flatMap(_.lag))

// Prepare the fetch response data.
mockNetwork.setFetchPartitionDataForNextResponse(Map(
t1p0 -> new FetchResponseData.PartitionData()
.setPartitionIndex(t1p0.partition)
.setLastStableOffset(0)
.setLogStartOffset(0)
.setHighWatermark(0)
.setRecords(MemoryRecords.EMPTY) // No records.
))
mockNetwork.setIdsForNextResponse(topicIds)

// Sends the fetch request and processes the response.
thread.doWork()
assertEquals(1, mockNetwork.fetchCount)

// Lag is set to Some(0).
assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag))
}

@Test
def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {

Expand Down

0 comments on commit dfe050c

Please sign in to comment.