KAFKA-9750; Fix race condition with log dir reassign completion (#8412)
There is a race on receiving a LeaderAndIsr request for a replica with an active log dir reassignment. If the reassignment completes just before the LeaderAndIsr handler updates epoch information, it can lead to an illegal state error since no future log dir exists. This patch fixes the problem by ensuring that the future log dir exists when the fetcher is started. Removal cannot happen concurrently because it requires access to the same partition state lock.
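
To make the fix easier to follow, here is a minimal, self-contained sketch of the idea (plain Scala; TopicPartition, OffsetAndEpoch, futureLogs, and fetchStates below are hypothetical stand-ins, not Kafka's classes, and this is not the patch itself, which follows in the diff). Completion of a reassignment and the fetcher's addPartitions take the same lock, and addPartitions only keeps partitions whose future log still exists, so a reassignment that finishes first simply leaves nothing for the fetcher to pick up.

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Sketch only: the types and the registry below are simplified stand-ins.
object FutureLogDirRaceSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetAndEpoch(offset: Long, leaderEpoch: Int)

  private val partitionMapLock = new ReentrantLock()
  private val futureLogs = mutable.Set.empty[TopicPartition]                  // partitions with an active future log dir
  private val fetchStates = mutable.Map.empty[TopicPartition, OffsetAndEpoch] // partitions the fetcher is moving

  // Reassignment completion: removes the future log and any fetch state, under the lock.
  def completeReassignment(tp: TopicPartition): Unit = {
    partitionMapLock.lock()
    try {
      futureLogs -= tp
      fetchStates -= tp
    } finally partitionMapLock.unlock()
  }

  // Fetcher start: only partitions that still have a future log are added; the returned
  // set tells the caller which partitions were actually picked up.
  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    partitionMapLock.lockInterruptibly()
    try {
      val stillMoving = initialFetchStates.filter { case (tp, _) => futureLogs.contains(tp) }
      fetchStates ++= stillMoving
      stillMoving.keySet
    } finally partitionMapLock.unlock()
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("topic", 0)
    futureLogs += tp
    completeReassignment(tp)                                    // the reassignment wins the race
    val added = addPartitions(Map(tp -> OffsetAndEpoch(0L, 5)))
    println(s"added = $added")                                  // prints: added = Set()
  }
}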

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>

Co-authored-by: Chia-Ping Tsai <chia7712@gmail.com>
hachikuji and chia7712 committed Apr 3, 2020
1 parent 9a154c6 commit 62dcfa1
Showing 8 changed files with 311 additions and 42 deletions.
14 changes: 9 additions & 5 deletions core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -120,7 +120,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
         BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
       }
 
-      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId): AbstractFetcherThread = {
+      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
+                                   brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
        fetcherThread.start()
@@ -144,14 +145,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
           tp -> OffsetAndEpoch(brokerAndInitOffset.initOffset, brokerAndInitOffset.currentLeaderEpoch)
         }
 
-        fetcherThread.addPartitions(initialOffsetAndEpochs)
-        info(s"Added fetcher to broker ${brokerAndFetcherId.broker} for partitions $initialOffsetAndEpochs")
-
-        failedPartitions.removeAll(partitionAndOffsets.keySet)
+        addPartitionsToFetcherThread(fetcherThread, initialOffsetAndEpochs)
       }
     }
   }
 
+  protected def addPartitionsToFetcherThread(fetcherThread: T,
+                                             initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
+    fetcherThread.addPartitions(initialOffsetAndEpochs)
+    info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
+  }
+
   def removeFetcherForPartitions(partitions: Set[TopicPartition]): Unit = {
     lock synchronized {
       for (fetcher <- fetcherThreadMap.values)
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -62,7 +62,7 @@ abstract class AbstractFetcherThread(name: String,
   type EpochData = OffsetsForLeaderEpochRequest.PartitionData
 
   private val partitionStates = new PartitionStates[PartitionFetchState]
-  private val partitionMapLock = new ReentrantLock
+  protected val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
 
   private val metricId = ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
@@ -199,7 +199,7 @@ abstract class AbstractFetcherThread(name: String,
    * - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
    *   leader epoch. This is the offset the follower should truncate to ensure
    *   accurate log replication.
-   * - Finally truncate the logs for partitions in the truncating phase and mark them
+   * - Finally truncate the logs for partitions in the truncating phase and mark the
    *   truncation complete. Do this within a lock to ensure no leadership changes can
    *   occur during truncation.
    */
@@ -419,10 +419,11 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Unit = {
+  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
     partitionMapLock.lockInterruptibly()
     try {
+      failedPartitions.removeAll(initialFetchStates.keySet)
+
       initialFetchStates.foreach { case (tp, initialFetchState) =>
         // We can skip the truncation step iff the leader epoch matches the existing epoch
         val currentState = partitionStates.stateValue(tp)
@@ -437,6 +438,7 @@ abstract class AbstractFetcherThread(name: String,
       }
 
       partitionMapCond.signalAll()
+      initialFetchStates.keySet
     } finally partitionMapLock.unlock()
   }
 
16 changes: 16 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.TopicPartition
 
 class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
                                  replicaManager: ReplicaManager,
@@ -34,6 +35,21 @@ class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
       quotaManager, brokerTopicStats)
   }
 
+  override protected def addPartitionsToFetcherThread(fetcherThread: ReplicaAlterLogDirsThread,
+                                                      initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
+    val addedPartitions = fetcherThread.addPartitions(initialOffsetAndEpochs)
+    val (addedInitialOffsets, notAddedInitialOffsets) = initialOffsetAndEpochs.partition { case (tp, _) =>
+      addedPartitions.contains(tp)
+    }
+
+    if (addedInitialOffsets.nonEmpty)
+      info(s"Added log dir fetcher for partitions with initial offsets $addedInitialOffsets")
+
+    if (notAddedInitialOffsets.nonEmpty)
+      info(s"Failed to add log dir fetch for partitions ${notAddedInitialOffsets.keySet} " +
+        s"since the log dir reassignment has already completed")
+  }
+
   def shutdown(): Unit = {
     info("shutting down")
     closeAllFetchers()
22 changes: 20 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -91,7 +91,7 @@ class ReplicaAlterLogDirsThread(name: String,
       Request.FutureLocalReplicaId,
       request.minBytes,
       request.maxBytes,
-      request.version <= 2,
+      false,
       request.fetchData.asScala.toSeq,
       UnboundedQuota,
       processResponseCallback,
@@ -116,7 +116,11 @@ class ReplicaAlterLogDirsThread(name: String,
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureLog.logEndOffset))
 
-    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+    val logAppendInfo = if (records.sizeInBytes() > 0)
+      partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+    else
+      None
+
     futureLog.updateHighWatermark(partitionData.highWatermark)
     futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset)
 
@@ -127,6 +131,20 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }
 
+  override def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      // It is possible that the log dir fetcher completed just before this call, so we
+      // filter only the partitions which still have a future log dir.
+      val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
+        replicaMgr.futureLogExists(tp)
+      }
+      super.addPartitions(filteredFetchStates)
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
     val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
     partition.localLogOrException.logStartOffset
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -477,6 +477,10 @@ class ReplicaManager(val config: KafkaConfig,
     getPartitionOrException(topicPartition, expectLeader = false).futureLocalLogOrException
   }
 
+  def futureLogExists(topicPartition: TopicPartition): Boolean = {
+    getPartitionOrException(topicPartition, expectLeader = false).futureLog.isDefined
+  }
+
   def localLog(topicPartition: TopicPartition): Option[Log] = {
     nonOfflinePartition(topicPartition).flatMap(_.log)
   }
@@ -58,6 +58,7 @@ class AbstractFetcherManagerTest {
 
     EasyMock.expect(fetcher.start())
     EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
+      .andReturn(Set(tp))
     EasyMock.expect(fetcher.fetchState(tp))
       .andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating)))
     EasyMock.expect(fetcher.removePartitions(Set(tp)))
@@ -116,6 +117,7 @@ class AbstractFetcherManagerTest {
 
     EasyMock.expect(fetcher.start())
     EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
+      .andReturn(Set(tp))
     EasyMock.expect(fetcher.isThreadFailed).andReturn(true)
     EasyMock.replay(fetcher)
 
(Diffs for the remaining changed files are not shown.)
