KAFKA-7152: Avoid moving a replica out of isr if its LEO equals leader's LEO #5412

Status: Closed · wants to merge 1 commit
core/src/main/scala/kafka/cluster/Partition.scala (6 changes: 4 additions & 2 deletions)

@@ -553,7 +553,8 @@ class Partition(val topic: String,
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that will be handled here -
+     * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
+     * otherwise there are two cases that will be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *                     the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
@@ -565,7 +566,8 @@ class Partition(val topic: String,
      **/
     val candidateReplicas = inSyncReplicas - leaderReplica
 
-    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+    val laggingReplicas = candidateReplicas.filter(r =>
+      r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
       debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
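For context, the sketch below isolates the predicate this change introduces. It is a minimal, self-contained illustration, not Kafka code: ReplicaState, outOfSyncReplicas, and nowMs are simplified stand-ins for kafka.cluster.Replica, Partition.getOutOfSyncReplicas, and Time.milliseconds, modeling only the fields the new condition needs.

// Minimal sketch, not Kafka code: ReplicaState stands in for kafka.cluster.Replica and
// nowMs stands in for time.milliseconds; only the fields the predicate needs are modeled.
object IsrCheckSketch {

  final case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Mirrors the updated filter: a follower is lagging only if its LEO differs from the
  // leader's AND it has not been fully caught up for more than maxLagMs.
  def outOfSyncReplicas(leader: ReplicaState, followers: Set[ReplicaState],
                        maxLagMs: Long, nowMs: Long): Set[ReplicaState] =
    followers.filter { r =>
      r.logEndOffset != leader.logEndOffset && (nowMs - r.lastCaughtUpTimeMs) > maxLagMs
    }

  def main(args: Array[String]): Unit = {
    val now = 10000L
    val leader   = ReplicaState(brokerId = 0, logEndOffset = 20L, lastCaughtUpTimeMs = now)
    // Matches the leader's LEO but has not fetched for 150 ms: kept in the ISR.
    val caughtUp = ReplicaState(brokerId = 1, logEndOffset = 20L, lastCaughtUpTimeMs = now - 150)
    // Behind the leader and not caught up for 150 ms (> maxLagMs = 100): out of sync.
    val lagging  = ReplicaState(brokerId = 2, logEndOffset = 18L, lastCaughtUpTimeMs = now - 150)

    val osr = outOfSyncReplicas(leader, Set(caughtUp, lagging), maxLagMs = 100L, nowMs = now)
    println(osr.map(_.brokerId)) // prints Set(2)
  }
}

Under this condition a follower whose LEO already equals the leader's stays in the ISR even when its lastCaughtUpTimeMs is older than replica.lag.time.max.ms, which is exactly the scenario the new testIsrExpirationForCaughtUpFollowers case below exercises.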
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (62 changes: 49 additions & 13 deletions)

@@ -40,6 +40,7 @@ class IsrExpirationTest {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
+  val leaderLogEndOffset = 20
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -81,12 +82,12 @@
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
-    // let the follower catch up to the Leader logEndOffset (15)
+    // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-        highWatermark = 15L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset - 1,
         leaderLogStartOffset = 0L,
-        leaderLogEndOffset = 15L,
+        leaderLogEndOffset = leaderLogEndOffset,
         followerLogStartOffset = 0L,
         fetchTimeMs = time.milliseconds,
         readSize = -1,
Expand Down Expand Up @@ -138,10 +139,10 @@ class IsrExpirationTest {

// Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
for (replica <- partition0.assignedReplicas - leaderReplica)
replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
highWatermark = 10L,
replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 2), MemoryRecords.EMPTY),
highWatermark = leaderLogEndOffset - 2,
leaderLogStartOffset = 0L,
leaderLogEndOffset = 15L,
leaderLogEndOffset = leaderLogEndOffset,
followerLogStartOffset = 0L,
fetchTimeMs = time.milliseconds,
readSize = -1,
@@ … @@
     time.sleep(75)
 
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-        highWatermark = 11L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset - 1,
         leaderLogStartOffset = 0L,
-        leaderLogEndOffset = 15L,
+        leaderLogEndOffset = leaderLogEndOffset,
         followerLogStartOffset = 0L,
         fetchTimeMs = time.milliseconds,
         readSize = -1,
@@ … @@
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-        highWatermark = 15L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset,
         leaderLogStartOffset = 0L,
-        leaderLogEndOffset = 15L,
+        leaderLogEndOffset = leaderLogEndOffset,
         followerLogStartOffset = 0L,
         fetchTimeMs = time.milliseconds,
         readSize = -1,
@@ … @@
     EasyMock.verify(log)
   }
 
+  /*
+   * Test the case where a follower has already caught up to the same log end offset as the leader.
+   * Such a follower should not be considered out-of-sync.
+   */
+  @Test
+  def testIsrExpirationForCaughtUpFollowers() {
+    val log = logMock
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // let the follower catch up to the Leader logEndOffset
+    for (replica <- partition0.assignedReplicas - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset,
+        leaderLogStartOffset = 0L,
+        leaderLogEndOffset = leaderLogEndOffset,
+        followerLogStartOffset = 0L,
+        fetchTimeMs = time.milliseconds,
+        readSize = -1,
+        lastStableOffset = None))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    // let some time pass
+    time.sleep(150)
+
+    // even though the follower hasn't fetched any data for more than replicaLagTimeMaxMs, it has already caught up, so it is not out-of-sync
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
@@ -222,6 +257,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.replay(log)
     log
   }