From 6005e9be1d6cfaeacd50fbed00929de35a843b45 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Fri, 16 Dec 2016 11:36:06 -0800 Subject: [PATCH 1/9] KAFKA-4485; Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader --- .../main/scala/kafka/cluster/Partition.scala | 25 ++++++++++----- .../main/scala/kafka/cluster/Replica.scala | 32 ++++++++++++++----- .../scala/kafka/server/ReplicaManager.scala | 27 +++++++++------- .../unit/kafka/server/ISRExpirationTest.scala | 11 ++++--- .../unit/kafka/server/SimpleFetchTest.scala | 2 +- 5 files changed, 63 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9eb92cd24395..1a88806099df 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -255,7 +255,8 @@ class Partition(val topic: String, replica.updateLogReadResult(logReadResult) // check if we need to expand ISR to include this replica // if it is not in the ISR yet - maybeExpandIsr(replicaId) + maybeExpandIsr(replicaId, logReadResult) + debug("Recorded replica %d log end offset (LEO) position %d for partition %s." .format(replicaId, @@ -273,20 +274,21 @@ class Partition(val topic: String, } /** - * Check and maybe expand the ISR of the partition. + * Check and maybe expand the ISR of the partition. A replica is in ISR of the partition if and only if + * replica's LEO >= HW and replica's lag <= replicaLagTimeMaxMs. * * This function can be triggered when a replica's LEO has incremented */ - def maybeExpandIsr(replicaId: Int) { + def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) { val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { // check if this replica needs to be added to the ISR leaderReplicaIfLocal() match { case Some(leaderReplica) => val replica = getReplica(replicaId).get - val leaderHW = leaderReplica.highWatermark if(!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && - replica.logEndOffset.offsetDiff(leaderHW) >= 0) { + logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw && + logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs) { val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), @@ -298,7 +300,8 @@ class Partition(val topic: String, // check if the HW of the partition can now be incremented // since the replica maybe now be in the ISR and its LEO has just incremented - maybeIncrementLeaderHW(leaderReplica) + // TODO: is this maybeIncrementLeaderHW() necessary? + maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) case None => false // nothing to do if no longer leader } @@ -362,12 +365,18 @@ class Partition(val topic: String, * 1. Partition ISR changed * 2. Any replica's LEO changed * + * We only increase HW if HW is smaller than LEO of all replicas whose lag <= replicaLagTimeMaxMs. + * This means that if a replica does not lag much behind leader and the replica's LEO is smaller than HW, HW will + * wait for this replica to catch up so that this replica can be added to ISR set. + * * Returns true if the HW was incremented, and false otherwise. 
* Note There is no need to acquire the leaderIsrUpdate lock here * since all callers of this private API acquire that lock */ - private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = { - val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) + private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = { + val allLogEndOffsets = assignedReplicas.filter(replica => { + curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica) + }).map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 40cf1814dfa0..e094fd5887a4 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -36,25 +36,41 @@ class Replica(val brokerId: Int, // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata + // The log end offset value at the time the leader receives last FetchRequest from this follower. + // This is used to determine the last catch-up time of the follower + @volatile var lastFetchLeaderLogEndOffset: Long = 0L + + // The time when the leader receives last FetchRequest from this follower + // This is used to determine the last catch-up time of the follower + @volatile var lastFetchTimeMs: Long = 0L + val topic = partition.topic val partitionId = partition.partitionId def isLocal: Boolean = log.isDefined + // lastCatchUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >= + // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition. private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds) def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get() + /* + * If the FetchRequest reads up to the log end offset of the leader when the current fetch request was received, + * set the lastCaughtUpTimeMsUnderlying to the time when the current fetch request was received. + * + * Else if the FetchRequest reads up to the log end offset of the the leader when the previous fetch request was received, + * set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received. + */ def updateLogReadResult(logReadResult : LogReadResult) { - logEndOffset = logReadResult.info.fetchOffsetMetadata + if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset) + lastCaughtUpTimeMsUnderlying.set(logReadResult.fetchTimeMs) + else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset) + lastCaughtUpTimeMsUnderlying.set(lastFetchTimeMs) - /* If the request read up to the log end offset snapshot when the read was initiated, - * set the lastCaughtUpTimeMsUnderlying to the current time. - * This means that the replica is fully caught up. 
- */ - if(logReadResult.isReadFromLogEnd) { - lastCaughtUpTimeMsUnderlying.set(time.milliseconds) - } + logEndOffset = logReadResult.info.fetchOffsetMetadata + lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset + lastFetchTimeMs = logReadResult.fetchTimeMs } private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9e4c14918964..ec997829a287 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -64,8 +64,9 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) */ case class LogReadResult(info: FetchDataInfo, hw: Long, + leaderLogEndOffset: Long, + fetchTimeMs: Long, readSize: Int, - isReadFromLogEnd : Boolean, error: Option[Throwable] = None) { def errorCode = error match { @@ -74,8 +75,8 @@ case class LogReadResult(info: FetchDataInfo, } override def toString = { - "Fetch Data: [%s], HW: [%d], readSize: [%d], isReadFromLogEnd: [%b], error: [%s]" - .format(info, hw, readSize, isReadFromLogEnd, error) + "Fetch Data: [%s], HW: [%d], leaderLogEndOffset: [%d], readSize: [%d], error: [%s]" + .format(info, hw, leaderLogEndOffset, readSize, error) } } @@ -85,8 +86,9 @@ object LogReadResult { val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, - -1, - false) + -1L, + -1L, + -1) } case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short], errorCode: Short) { @@ -217,7 +219,8 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread - scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS) + // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 20%) before it is removed from ISR + scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 5, unit = TimeUnit.MILLISECONDS) scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) } @@ -559,7 +562,9 @@ class ReplicaManager(val config: KafkaConfig, * where data gets appended to the log immediately after the replica has consumed from it * This can cause a replica to always be out of sync. 
*/ - val initialLogEndOffset = localReplica.logEndOffset + val initialLogEndOffset = localReplica.logEndOffset.messageOffset + val initialHighWatermark = localReplica.highWatermark.messageOffset + val fetchTimeMs = time.milliseconds val logReadInfo = localReplica.log match { case Some(log) => val adjustedFetchSize = math.min(partitionFetchSize, limitBytes) @@ -581,9 +586,7 @@ class ReplicaManager(val config: KafkaConfig, FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY) } - val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0 - - LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, partitionFetchSize, readToEndOfLog, None) + LogReadResult(logReadInfo, initialHighWatermark, initialLogEndOffset, fetchTimeMs, partitionFetchSize, None) } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it // is supposed to indicate un-expected failure of a broker in handling a fetch request @@ -592,13 +595,13 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: OffsetOutOfRangeException) => LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, - partitionFetchSize, false, Some(e)) + -1L, -1L, partitionFetchSize, Some(e)) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() error(s"Error processing fetch operation on partition $tp, offset $offset", e) LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, - partitionFetchSize, false, Some(e)) + -1L, -1L, partitionFetchSize, Some(e)) } } diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index aad37d1d7299..f04f6cd341f3 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -77,8 +77,9 @@ class IsrExpirationTest { (partition0.assignedReplicas() - leaderReplica).foreach( r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, - -1, - true))) + -1L, + -1L, + -1))) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -126,7 +127,7 @@ class IsrExpirationTest { // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1, false)) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. 
We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -136,7 +137,7 @@ class IsrExpirationTest { time.sleep(75) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), -1L, -1, false))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -148,7 +149,7 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. The replicas should be back in ISR (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1, true))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index ff7265783417..df46ca41332b 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -111,7 +111,7 @@ class SimpleFetchTest { // create the follower replica with defined log end offset val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) - followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), -1L, -1, true)) + followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), -1L, -1L, -1L, -1)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) From 5ca45f3bec18fd6cac552dca0659d7e063adc61b Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Sun, 18 Dec 2016 01:30:47 -0800 Subject: [PATCH 2/9] Initialize lastCatchUpTimeMs to 0L. Reset it to 0L when broker receives LeaderAndIsrRequest. --- .../apache/kafka/clients/CommonClientConfigs.java | 3 ++- core/src/main/scala/kafka/cluster/Partition.scala | 8 +++----- core/src/main/scala/kafka/cluster/Replica.scala | 12 +++++++++++- core/src/main/scala/kafka/server/KafkaConfig.scala | 3 ++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 3327815e5e8b..49396c58b3bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -77,7 +77,8 @@ public class CommonClientConfigs { public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + "for the response of a request. If the response is not received before the timeout " + "elapses the client will resend the request if necessary or fail the request if " - + "retries are exhausted."; + + "retries are exhausted. 
request.timeout.ms should be larger than replica.lag.time.max.ms " + + "to reduce message duplication caused by unnecessary producer retry."; private static List nonTestingSecurityProtocolNames() { List names = new ArrayList<>(); diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1a88806099df..e07f1d131c4a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -186,8 +186,8 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.controllerEpoch - // add replicas that are new - allReplicas.foreach(replica => getOrCreateReplica(replica)) + // add replicas that are new and reset lastCatchUpTime of all replicas to exclude offline replicas from ISR + allReplicas.foreach(replica => getOrCreateReplica(replica).resetLastCatchUpTime()) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica) @@ -257,7 +257,6 @@ class Partition(val topic: String, // if it is not in the ISR yet maybeExpandIsr(replicaId, logReadResult) - debug("Recorded replica %d log end offset (LEO) position %d for partition %s." .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, @@ -299,8 +298,7 @@ class Partition(val topic: String, } // check if the HW of the partition can now be incremented - // since the replica maybe now be in the ISR and its LEO has just incremented - // TODO: is this maybeIncrementLeaderHW() necessary? + // since the replica may already be in the ISR and its LEO has just incremented maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) case None => false // nothing to do if no longer leader diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index e094fd5887a4..f8236c0dce92 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -51,7 +51,7 @@ class Replica(val brokerId: Int, // lastCatchUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >= // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition. - private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds) + private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(0L) def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get() @@ -61,6 +61,10 @@ class Replica(val brokerId: Int, * * Else if the FetchRequest reads up to the log end offset of the the leader when the previous fetch request was received, * set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received. + * + * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO + * by at most replicaLagTimeMaxMs. This semantics allows a follower to be added to the ISR even if offset of its fetch request is + * always smaller than leader's LEO, which can happen if there are constant small produce requests at high frequency. 
*/ def updateLogReadResult(logReadResult : LogReadResult) { if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset) @@ -73,6 +77,12 @@ class Replica(val brokerId: Int, lastFetchTimeMs = logReadResult.fetchTimeMs } + def resetLastCatchUpTime() { + lastFetchLeaderLogEndOffset = 0L + lastFetchTimeMs = 0L + lastCaughtUpTimeMsUnderlying.set(0L) + } + private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { if (isLocal) { throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId)) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 40a5e1099d1b..c471264e76c2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -476,7 +476,8 @@ object KafkaConfig { val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + - " the leader will remove the follower from isr" + " the leader will remove the follower from isr. replica.lag.time.max.ms should be smaller than request.timeout.ms" + + " to reduce message duplication caused by unnecessary producer retry." val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, " + From e2da89c427c0eda213cc93104d8029035411a896 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Sun, 18 Dec 2016 01:51:19 -0800 Subject: [PATCH 3/9] Address comment --- core/src/main/scala/kafka/cluster/Partition.scala | 6 ++++-- core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e07f1d131c4a..22e557b316c4 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -286,8 +286,10 @@ class Partition(val topic: String, val replica = getReplica(replicaId).get if(!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && - logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw && - logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs) { + // This approximates the requirement logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs. 
+ // We don't directly specify the above requirement in order to make maybeExpandIsr() consistent with ReplicaFetcherThread.shouldFollowerThrottle() + // A offset replica whose lag > ReplicaFetcherThread may still exceed hw because maybeShrinkIsr() is called periodically + logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw) { val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ec997829a287..6c2b814425dd 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -219,8 +219,8 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread - // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 20%) before it is removed from ISR - scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 5, unit = TimeUnit.MILLISECONDS) + // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 50%) before it is removed from ISR + scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS) scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) } From f3b80adf4e595b249f4a29450935519862c25765 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Sun, 18 Dec 2016 13:52:07 -0800 Subject: [PATCH 4/9] Fix integration tests --- .../unit/kafka/server/ISRExpirationTest.scala | 18 +++++++++--------- .../unit/kafka/server/SimpleFetchTest.scala | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index f04f6cd341f3..3aded576474e 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -74,12 +74,8 @@ class IsrExpirationTest { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to the Leader logEndOffset (15) - (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - -1L, - -1L, - -1L, - -1))) + for(replica <- partition0.assignedReplicas() - leaderReplica) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), 15L, 15L, time.milliseconds, -1)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -127,7 +123,7 @@ class IsrExpirationTest { // Make the remote replica not read to the end of log. 
It should be not be out of sync for at least 100 ms for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1)) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 15L, time.milliseconds, -1)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -137,7 +133,7 @@ class IsrExpirationTest { time.sleep(75) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), 11L, 15L, time.milliseconds, -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -149,7 +145,7 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. The replicas should be back in ISR (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1L, -1L, -1))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), 15L, 15L, time.milliseconds, -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -166,6 +162,10 @@ class IsrExpirationTest { allReplicas.foreach(r => partition.addReplicaIfNotExists(r)) // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet + // make the remote replica read to the end of log + for(replica <- partition.assignedReplicas() - leaderReplica) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 10L, time.milliseconds, -1)) + // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) partition diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index df46ca41332b..5a476cb32028 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -111,7 +111,7 @@ class SimpleFetchTest { // create the follower replica with defined log end offset val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) - followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), -1L, -1L, -1L, -1)) + followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), leo.messageOffset, leo.messageOffset, time.milliseconds, -1)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) From 31ac89319f67a1187145e86f9c249007dee95115 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Mon, 19 Dec 2016 18:20:28 -0800 Subject: [PATCH 5/9] Address comment --- 
.../kafka/clients/CommonClientConfigs.java | 3 +- .../clients/consumer/ConsumerConfig.java | 4 ++- .../main/scala/kafka/cluster/Partition.scala | 31 ++++++++++------- .../main/scala/kafka/cluster/Replica.scala | 6 ++-- .../main/scala/kafka/server/KafkaConfig.scala | 3 +- .../unit/kafka/server/ISRExpirationTest.scala | 33 +++++++++++++++---- .../unit/kafka/server/SimpleFetchTest.scala | 6 +++- 7 files changed, 59 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 49396c58b3bf..3327815e5e8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -77,8 +77,7 @@ public class CommonClientConfigs { public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + "for the response of a request. If the response is not received before the timeout " + "elapses the client will resend the request if necessary or fail the request if " - + "retries are exhausted. request.timeout.ms should be larger than replica.lag.time.max.ms " - + "to reduce message duplication caused by unnecessary producer retry."; + + "retries are exhausted."; private static List nonTestingSecurityProtocolNames() { List names = new ArrayList<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 13cc4c4adf8c..a8e3a78b7df4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,9 @@ public class ConsumerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC + + " request.timeout.ms should be larger than replica.lag.time.max.ms " + + "to reduce message duplication caused by unnecessary producer retry."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 22e557b316c4..cb7119ba2612 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -186,8 +186,12 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.controllerEpoch - // add replicas that are new and reset lastCatchUpTime of all replicas to exclude offline replicas from ISR - allReplicas.foreach(replica => getOrCreateReplica(replica).resetLastCatchUpTime()) + // add replicas that are new and reset lastCatchUpTime of non-isr replicas + allReplicas.foreach(replica => { + val r = getOrCreateReplica(replica) + if (!partitionStateInfo.isr.contains(replica)) + r.resetLastCatchUpTime() + }) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica) @@ -273,8 +277,12 @@ class Partition(val topic: String, } /** - * Check and maybe expand the ISR of the partition. A replica is in ISR of the partition if and only if - * replica's LEO >= HW and replica's lag <= replicaLagTimeMaxMs. + * Check and maybe expand the ISR of the partition. + * A replica will be added to ISR if its LEO >= current hw of the partition. + * + * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs, + * even if its log end offset is >= HW. However, to be consistent with how the follower determines + * whether a replica is in-sync, we only check HW. * * This function can be triggered when a replica's LEO has incremented */ @@ -284,12 +292,10 @@ class Partition(val topic: String, leaderReplicaIfLocal() match { case Some(leaderReplica) => val replica = getReplica(replicaId).get + val leaderHW = leaderReplica.highWatermark if(!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && - // This approximates the requirement logReadResult.fetchTimeMs - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs. - // We don't directly specify the above requirement in order to make maybeExpandIsr() consistent with ReplicaFetcherThread.shouldFollowerThrottle() - // A offset replica whose lag > ReplicaFetcherThread may still exceed hw because maybeShrinkIsr() is called periodically - logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.hw) { + replica.logEndOffset.offsetDiff(leaderHW) >= 0) { val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), @@ -365,9 +371,12 @@ class Partition(val topic: String, * 1. Partition ISR changed * 2. Any replica's LEO changed * - * We only increase HW if HW is smaller than LEO of all replicas whose lag <= replicaLagTimeMaxMs. - * This means that if a replica does not lag much behind leader and the replica's LEO is smaller than HW, HW will - * wait for this replica to catch up so that this replica can be added to ISR set. + * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up. + * This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this + * replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the + * leader replica and a follower tries to catch up. 
If we don't wait for the follower when advancing the HW, the + * follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore + * will never be added to ISR. * * Returns true if the HW was incremented, and false otherwise. * Note There is no need to acquire the leaderIsrUpdate lock here diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index f8236c0dce92..640c232b0442 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -38,11 +38,11 @@ class Replica(val brokerId: Int, // The log end offset value at the time the leader receives last FetchRequest from this follower. // This is used to determine the last catch-up time of the follower - @volatile var lastFetchLeaderLogEndOffset: Long = 0L + @volatile private var lastFetchLeaderLogEndOffset: Long = 0L // The time when the leader receives last FetchRequest from this follower // This is used to determine the last catch-up time of the follower - @volatile var lastFetchTimeMs: Long = 0L + @volatile private var lastFetchTimeMs: Long = 0L val topic = partition.topic val partitionId = partition.partitionId @@ -59,7 +59,7 @@ class Replica(val brokerId: Int, * If the FetchRequest reads up to the log end offset of the leader when the current fetch request was received, * set the lastCaughtUpTimeMsUnderlying to the time when the current fetch request was received. * - * Else if the FetchRequest reads up to the log end offset of the the leader when the previous fetch request was received, + * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received, * set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received. * * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c471264e76c2..ff615c99aff2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -476,8 +476,7 @@ object KafkaConfig { val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + - " the leader will remove the follower from isr. replica.lag.time.max.ms should be smaller than request.timeout.ms" + - " to reduce message duplication caused by unnecessary producer retry." + " the leader will remove the follower from isr." val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. 
This is not an absolute maximum, " + diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 3aded576474e..e4dc05aa1d6d 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -75,7 +75,11 @@ class IsrExpirationTest { // let the follower catch up to the Leader logEndOffset (15) for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), 15L, 15L, time.milliseconds, -1)) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), + hw = 15L, + leaderLogEndOffset = 15L, + fetchTimeMs =time.milliseconds, + readSize = -1)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -123,7 +127,11 @@ class IsrExpirationTest { // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 15L, time.milliseconds, -1)) + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), + hw = 10L, + leaderLogEndOffset = 15L, + fetchTimeMs =time.milliseconds, + readSize = -1)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -133,7 +141,11 @@ class IsrExpirationTest { time.sleep(75) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), 11L, 15L, time.milliseconds, -1))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), + hw = 11L, + leaderLogEndOffset = 15L, + fetchTimeMs =time.milliseconds, + readSize = -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -145,7 +157,11 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. 
The replicas should be back in ISR (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), 15L, 15L, time.milliseconds, -1))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), + hw = 15L, + leaderLogEndOffset = 15L, + fetchTimeMs =time.milliseconds, + readSize = -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -162,10 +178,13 @@ class IsrExpirationTest { allReplicas.foreach(r => partition.addReplicaIfNotExists(r)) // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet - // make the remote replica read to the end of log + // set lastCatchUpTime to current time for(replica <- partition.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), 10L, 10L, time.milliseconds, -1)) - + replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), + hw = 0L, + leaderLogEndOffset = 0L, + fetchTimeMs =time.milliseconds, + readSize = -1)) // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) partition diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 5a476cb32028..96a3a41d4cb3 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -111,7 +111,11 @@ class SimpleFetchTest { // create the follower replica with defined log end offset val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) - followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), leo.messageOffset, leo.messageOffset, time.milliseconds, -1)) + followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), + hw = leo.messageOffset, + leaderLogEndOffset = leo.messageOffset, + fetchTimeMs = time.milliseconds, + readSize = -1)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) From fe73d278ff704e086aabaca18e1dd13f887ed626 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 20 Dec 2016 10:49:12 -0800 Subject: [PATCH 6/9] Update comment and variable names --- .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 4 +--- .../org/apache/kafka/clients/producer/ProducerConfig.java | 4 +++- core/src/main/scala/kafka/cluster/Partition.scala | 4 ++-- core/src/main/scala/kafka/cluster/Replica.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaManager.scala | 7 ++++++- .../test/scala/unit/kafka/server/ISRExpirationTest.scala | 2 +- 6 files changed, 15 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index a8e3a78b7df4..13cc4c4adf8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,9 +198,7 @@ public class ConsumerConfig extends AbstractConfig { /** 
request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC - + " request.timeout.ms should be larger than replica.lag.time.max.ms " - + "to reduce message duplication caused by unnecessary producer retry."; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4b0e999e018c..37aa030aedc4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -210,7 +210,9 @@ public class ProducerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC + + " request.timeout.ms should be larger than replica.lag.time.max.ms " + + "to reduce message duplication caused by unnecessary producer retry."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index cb7119ba2612..f3be597e47bb 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -186,11 +186,11 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.controllerEpoch - // add replicas that are new and reset lastCatchUpTime of non-isr replicas + // add replicas that are new and reset lastCaughtUpTime of non-isr replicas allReplicas.foreach(replica => { val r = getOrCreateReplica(replica) if (!partitionStateInfo.isr.contains(replica)) - r.resetLastCatchUpTime() + r.resetLastCaughtUpTime() }) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 640c232b0442..433f134ddb35 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -49,7 +49,7 @@ class Replica(val brokerId: Int, def isLocal: Boolean = log.isDefined - // lastCatchUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >= + // lastCaughtUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >= // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition. 
private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(0L) @@ -77,7 +77,7 @@ class Replica(val brokerId: Int, lastFetchTimeMs = logReadResult.fetchTimeMs } - def resetLastCatchUpTime() { + def resetLastCaughtUpTime() { lastFetchLeaderLogEndOffset = 0L lastFetchTimeMs = 0L lastCaughtUpTimeMsUnderlying.set(0L) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6c2b814425dd..358ecbb10635 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -586,7 +586,12 @@ class ReplicaManager(val config: KafkaConfig, FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY) } - LogReadResult(logReadInfo, initialHighWatermark, initialLogEndOffset, fetchTimeMs, partitionFetchSize, None) + LogReadResult(info = logReadInfo, + hw = initialHighWatermark, + leaderLogEndOffset = initialLogEndOffset, + fetchTimeMs = fetchTimeMs, + readSize = partitionFetchSize, + error = None) } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it // is supposed to indicate un-expected failure of a broker in handling a fetch request diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index e4dc05aa1d6d..411370adff03 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -178,7 +178,7 @@ class IsrExpirationTest { allReplicas.foreach(r => partition.addReplicaIfNotExists(r)) // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet - // set lastCatchUpTime to current time + // set lastCaughtUpTime to current time for(replica <- partition.assignedReplicas() - leaderReplica) replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), hw = 0L, From 6025f9e75989be4f96c25b77374687c33260a452 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 20 Dec 2016 15:30:34 -0800 Subject: [PATCH 7/9] Address comment --- .../clients/producer/ProducerConfig.java | 4 +- .../main/scala/kafka/cluster/Partition.scala | 15 +++--- .../main/scala/kafka/cluster/Replica.scala | 8 +-- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 27 ++++++---- .../unit/kafka/server/ISRExpirationTest.scala | 50 +++++++++---------- .../unit/kafka/server/SimpleFetchTest.scala | 10 ++-- 7 files changed, 63 insertions(+), 53 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 37aa030aedc4..a1ca82c22064 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -211,8 +211,8 @@ public class ProducerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC - + " request.timeout.ms should be larger than replica.lag.time.max.ms " - + "to reduce message duplication caused by unnecessary producer retry."; + + " request.timeout.ms should be larger than value of broker side config 
replica.lag.time.max.ms" + + " to reduce message duplication caused by unnecessary producer retry."; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f3be597e47bb..8ce17889cd1d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -186,12 +186,8 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.controllerEpoch - // add replicas that are new and reset lastCaughtUpTime of non-isr replicas - allReplicas.foreach(replica => { - val r = getOrCreateReplica(replica) - if (!partitionStateInfo.isr.contains(replica)) - r.resetLastCaughtUpTime() - }) + // add replicas that are new + allReplicas.foreach(replica => getOrCreateReplica(replica)) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica) @@ -206,6 +202,13 @@ class Partition(val topic: String, true } val leaderReplica = getReplica().get + val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset + val curTimeMs = time.milliseconds + // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. + (assignedReplicas() - leaderReplica).foreach(replica => { + val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L + replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs) + }) // we may need to increment high watermark since ISR could be down to 1 if (isNewLeader) { // construct the high watermark metadata for the new leader replica diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 433f134ddb35..2d43b646b512 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -77,10 +77,10 @@ class Replica(val brokerId: Int, lastFetchTimeMs = logReadResult.fetchTimeMs } - def resetLastCaughtUpTime() { - lastFetchLeaderLogEndOffset = 0L - lastFetchTimeMs = 0L - lastCaughtUpTimeMsUnderlying.set(0L) + def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long) { + lastFetchLeaderLogEndOffset = curLeaderLogEndOffset + lastFetchTimeMs = curTimeMs + lastCaughtUpTimeMsUnderlying.set(lastCaughtUpTimeMs) } private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ff615c99aff2..40a5e1099d1b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -476,7 +476,7 @@ object KafkaConfig { val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + - " the leader will remove the follower from isr." 
+ " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, " + diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 358ecbb10635..212bc06dd6af 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -83,12 +83,11 @@ case class LogReadResult(info: FetchDataInfo, case class FetchPartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, records: Records) object LogReadResult { - val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, - MemoryRecords.EMPTY), - -1L, - -1L, - -1L, - -1) + val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), + hw = -1L, + leaderLogEndOffset = -1L, + fetchTimeMs = -1L, + readSize = -1) } case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short], errorCode: Short) { @@ -599,14 +598,22 @@ class ReplicaManager(val config: KafkaConfig, _: NotLeaderForPartitionException | _: ReplicaNotAvailableException | _: OffsetOutOfRangeException) => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, - -1L, -1L, partitionFetchSize, Some(e)) + LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), + hw = -1L, + leaderLogEndOffset = -1L, + fetchTimeMs = -1L, + readSize = partitionFetchSize, + error = Some(e)) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() error(s"Error processing fetch operation on partition $tp, offset $offset", e) - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L, - -1L, -1L, partitionFetchSize, Some(e)) + LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), + hw = -1L, + leaderLogEndOffset = -1L, + fetchTimeMs = -1L, + readSize = partitionFetchSize, + error = Some(e)) } } diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 411370adff03..6156650b534e 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -75,11 +75,11 @@ class IsrExpirationTest { // let the follower catch up to the Leader logEndOffset (15) for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - hw = 15L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1)) + replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), + hw = 15L, + leaderLogEndOffset = 15L, + fetchTimeMs =time.milliseconds, + readSize = -1)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], 
partition0OSR.map(_.brokerId)) @@ -127,11 +127,11 @@ class IsrExpirationTest { // Make the remote replica not read to the end of the log. It should not be out of sync for at least 100 ms for(replica <- partition0.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), - hw = 10L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1)) + replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), + hw = 10L, + leaderLogEndOffset = 15L, + fetchTimeMs = time.milliseconds, + readSize = -1)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -141,11 +141,11 @@ class IsrExpirationTest { time.sleep(75) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), - hw = 11L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1))) + r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), + hw = 11L, + leaderLogEndOffset = 15L, + fetchTimeMs = time.milliseconds, + readSize = -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -157,11 +157,11 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. The replicas should be back in ISR (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - hw = 15L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1))) + r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), + hw = 15L, + leaderLogEndOffset = 15L, + fetchTimeMs = time.milliseconds, + readSize = -1))) partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -180,11 +180,11 @@ class IsrExpirationTest { partition.inSyncReplicas = allReplicas.toSet // set lastCaughtUpTime to current time for(replica <- partition.assignedReplicas() - leaderReplica) - replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), - hw = 0L, - leaderLogEndOffset = 0L, - fetchTimeMs =time.milliseconds, - readSize = -1)) + replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), + hw = 0L, + leaderLogEndOffset = 0L, + fetchTimeMs = time.milliseconds, + readSize = -1)) // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) partition diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 96a3a41d4cb3..9fda38d51bf6 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -111,11 +111,11 @@ class SimpleFetchTest { // create the follower replica with defined log end offset
val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) - followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), - hw = leo.messageOffset, - leaderLogEndOffset = leo.messageOffset, - fetchTimeMs = time.milliseconds, - readSize = -1)) + followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY), + hw = leo.messageOffset, + leaderLogEndOffset = leo.messageOffset, + fetchTimeMs = time.milliseconds, + readSize = -1)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) From 08d6a3ae8cc9b975e49b749f66db9b9d86df3009 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 20 Dec 2016 16:14:58 -0800 Subject: [PATCH 8/9] Update code style --- core/src/main/scala/kafka/cluster/Partition.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 8ce17889cd1d..1f739a91b2b7 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -205,10 +205,10 @@ class Partition(val topic: String, val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. - (assignedReplicas() - leaderReplica).foreach(replica => { + (assignedReplicas() - leaderReplica).foreach { replica => val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs) - }) + } // we may need to increment high watermark since ISR could be down to 1 if (isNewLeader) { // construct the high watermark metadata for the new leader replica @@ -386,9 +386,9 @@ class Partition(val topic: String, * since all callers of this private API acquire that lock */ private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = { - val allLogEndOffsets = assignedReplicas.filter(replica => { + val allLogEndOffsets = assignedReplicas.filter { replica => curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica) - }).map(_.logEndOffset) + }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { From 0198fb4c66ce7555566e3c3ee593b2314e37b677 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 20 Dec 2016 18:51:18 -0800 Subject: [PATCH 9/9] Update comment --- .../org/apache/kafka/clients/producer/ProducerConfig.java | 2 +- core/src/main/scala/kafka/cluster/Replica.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index a1ca82c22064..39446f592a68 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -211,7 +211,7 @@ public class ProducerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String
REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC - + " request.timeout.ms should be larger than value of broker side config replica.lag.time.max.ms" + + " request.timeout.ms should be larger than replica.lag.time.max.ms, a broker-side configuration," + " to reduce message duplication caused by unnecessary producer retry."; /** interceptor.classes */ diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 2d43b646b512..4d9081532e1b 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -37,11 +37,11 @@ class Replica(val brokerId: Int, @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata // The log end offset value at the time the leader receives last FetchRequest from this follower. - // This is used to determine the last catch-up time of the follower + // This is used to determine the lastCaughtUpTimeMs of the follower @volatile private var lastFetchLeaderLogEndOffset: Long = 0L // The time when the leader receives last FetchRequest from this follower - // This is used to determine the last catch-up time of the follower + // This is used to determine the lastCaughtUpTimeMs of the follower @volatile private var lastFetchTimeMs: Long = 0L val topic = partition.topic
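
For readers following the lastCaughtUpTimeMs bookkeeping across these patches, below is a minimal, self-contained Scala sketch of the rule the series introduces: a follower is considered caught up as of a fetch if that fetch read up to the leader's LEO sampled for the fetch, or as of the previous fetch if it read up to the LEO sampled when the previous fetch arrived, and it stays in sync while that caught-up time is within replica.lag.time.max.ms. This is illustration only, not part of any patch; the names ReplicaLagSketch, FollowerState, onFetch and isInSync are hypothetical and merely mirror the logic in Replica.updateLogReadResult and the ISR checks in Partition.

// Illustrative sketch only (not part of the patch series); hypothetical names.
object ReplicaLagSketch {
  // Per-follower state the leader keeps, mirroring the volatile fields added to Replica.
  final case class FollowerState(lastCaughtUpTimeMs: Long,
                                 lastFetchTimeMs: Long,
                                 lastFetchLeaderLogEndOffset: Long)

  // Update rule modeled on updateLogReadResult:
  //  - fetch reads up to the leader LEO sampled for this fetch  -> caught up at fetchTimeMs
  //  - fetch reads up to the LEO sampled at the previous fetch  -> caught up at the previous fetch time
  //  - otherwise lastCaughtUpTimeMs is unchanged
  def onFetch(s: FollowerState, fetchOffset: Long, leaderLogEndOffset: Long, fetchTimeMs: Long): FollowerState = {
    val caughtUpTimeMs =
      if (fetchOffset >= leaderLogEndOffset) fetchTimeMs
      else if (fetchOffset >= s.lastFetchLeaderLogEndOffset) s.lastFetchTimeMs
      else s.lastCaughtUpTimeMs
    FollowerState(caughtUpTimeMs, fetchTimeMs, leaderLogEndOffset)
  }

  // ISR-style check: in sync iff the follower's lag is within replica.lag.time.max.ms.
  def isInSync(s: FollowerState, nowMs: Long, replicaLagTimeMaxMs: Long): Boolean =
    nowMs - s.lastCaughtUpTimeMs <= replicaLagTimeMaxMs

  def main(args: Array[String]): Unit = {
    val lagMax = 10000L
    var s = FollowerState(lastCaughtUpTimeMs = 0L, lastFetchTimeMs = 0L, lastFetchLeaderLogEndOffset = 0L)
    // Fetch at t=1000 that reads up to the leader LEO (15): caught up as of t=1000.
    s = onFetch(s, fetchOffset = 15L, leaderLogEndOffset = 15L, fetchTimeMs = 1000L)
    println(isInSync(s, nowMs = 5000L, replicaLagTimeMaxMs = lagMax))   // true
    // Fetch at t=20000 that is still behind a moving LEO: last caught up remains t=1000.
    s = onFetch(s, fetchOffset = 20L, leaderLogEndOffset = 40L, fetchTimeMs = 20000L)
    println(isInSync(s, nowMs = 20000L, replicaLagTimeMaxMs = lagMax))  // false
  }
}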