Skip to content

Commit

Permalink
feat: make sure all logs are closed when shutting down (#135)
Browse files Browse the repository at this point in the history
Signed-off-by: Curtis Wan <wcy9988@163.com>
  • Loading branch information
mooc9988 authored Sep 14, 2023
1 parent b69aac1 commit 16326d0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
16 changes: 6 additions & 10 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -573,17 +573,12 @@ class Partition(val topicPartition: TopicPartition,
* 1) This broker(current leader) has closed the partition and the related streams.
* 2) The next leader will then open the partition and the related streams.
*/
def close(): Unit = {
val closeStartTime = System.currentTimeMillis()
def close(): CompletableFuture[Void] = {
logManager.removeFromCurrentLogs(topicPartition)
ElasticLogManager.removeLog(topicPartition)
inWriteLock(leaderIsrUpdateLock) {
closed = true
}
log.foreach( unifiedLog => {
val future = unifiedLog.close()
CoreUtils.swallow(future.get(), this)
})
// need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
inWriteLock(leaderIsrUpdateLock) {
remoteReplicasMap.clear()
Expand All @@ -595,10 +590,11 @@ class Partition(val topicPartition: TopicPartition,
leaderEpochStartOffsetOpt = None
Partition.removeMetrics(topicPartition)
}
val closeTimeCost = System.currentTimeMillis() - closeStartTime
info(s"closed with time cost $closeTimeCost ms, trigger leader re-election")
// trigger leader re-election
alterIsrManager.tryElectLeader(topicPartition)
if (log.isDefined) {
log.get.close()
} else {
CompletableFuture.completedFuture(null)
}
}
// elastic stream inject end

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class BrokerServer(
// Note that logs are closed in logManager.shutdown().
// Make sure these thread pools are shutdown after the log manager's shutdown.
CoreUtils.swallow(replicaManager.shutdownAdditionalThreadPools(), this)
ElasticLogManager.shutdownNow()
CoreUtils.swallow(ElasticLogManager.shutdownNow(), this)
// elastic stream inject end

if (quotaManagers != null)
Expand Down
29 changes: 27 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{CompletableFuture, Executors, ThreadPoolExecutor, TimeUnit}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -225,6 +226,7 @@ class ReplicaManager(val config: KafkaConfig,
// This threadPool is used to handle partition open/close in case of throttling metadata replay.
val partitionOpenCloseExecutors = new ThreadPoolExecutor(4, 32, 30, TimeUnit.SECONDS, new util.concurrent.LinkedBlockingQueue[Runnable](32),
ThreadUtils.createThreadFactory("partition-open-close-executor-%d", true))
var partitionWaitingClosingFuture: CompletableFuture[Void] = CompletableFuture.completedFuture(null);

/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
Expand Down Expand Up @@ -466,6 +468,9 @@ class ReplicaManager(val config: KafkaConfig,
// Second remove deleted partitions from the partition map. Fetchers rely on the
// ReplicaManager to get Partition's information so they must be stopped first.
val partitionsToDelete = mutable.Set.empty[TopicPartition]
// Kafka on S3 inject start
val partitionClosingFutures = ArrayBuffer.empty[CompletableFuture[Void]]
// Kafka on S3 inject end
partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
if (shouldDelete) {
getPartition(topicPartition) match {
Expand All @@ -480,7 +485,13 @@ class ReplicaManager(val config: KafkaConfig,
// For elastic stream, partition leader alter is triggered by setting isr/replicas.
// When broker is not response for the partition, we need to close the partition
// instead of delete the partition.
hostedPartition.partition.close()
val closingFuture = hostedPartition.partition.close()
.thenRun(() => {
// trigger leader election for this partition
info(s"partition $topicPartition is closed, trigger leader election")
alterPartitionManager.tryElectLeader(topicPartition)
})
partitionClosingFutures.append(closingFuture)
} else {
hostedPartition.partition.delete()
}
Expand All @@ -496,6 +507,19 @@ class ReplicaManager(val config: KafkaConfig,
completeDelayedFetchOrProduceRequests(topicPartition)
}

// Kafka on S3 inject start
if (partitionClosingFutures.nonEmpty) {
partitionWaitingClosingFuture = partitionWaitingClosingFuture.thenRun(() => {
val closeStartTime = System.currentTimeMillis()
// Wait for all logs to be closed.
CoreUtils.swallow(CompletableFuture.allOf(partitionClosingFutures.toArray: _*).get(), this)
val closeTimeCost = System.currentTimeMillis() - closeStartTime
debug(s"closing ${partitionClosingFutures.size} partitions with time cost $closeTimeCost ms")
})
}
// Kafka on S3 inject end


// Third delete the logs and checkpoint.
val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
if (partitionsToDelete.nonEmpty) {
Expand Down Expand Up @@ -2396,6 +2420,7 @@ class ReplicaManager(val config: KafkaConfig,

def shutdownAdditionalThreadPools(): Unit = {
slowFetchExecutors.shutdownNow()
CoreUtils.swallow(partitionWaitingClosingFuture.get(), this)
partitionOpenCloseExecutors.shutdownNow()
}
}

0 comments on commit 16326d0

Please sign in to comment.