feat: make sure all logs are closed when shutting down #135

Merged (1 commit) on Sep 14, 2023
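
Summary of the change: Partition.close() becomes asynchronous and returns a CompletableFuture[Void], ReplicaManager collects those futures for every partition it closes, and broker shutdown waits for all of them before ElasticLogManager is shut down, so no log is still being closed when the stream layer goes away. A minimal, self-contained sketch of that end-to-end flow, using purely illustrative names rather than the real classes:

import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for a partition whose close completes asynchronously.
final class DemoPartition(name: String) {
  def close(): CompletableFuture[Void] =
    CompletableFuture.runAsync(() => println(s"$name: log closed"))
}

object GracefulShutdownDemo {
  def main(args: Array[String]): Unit = {
    val closing = ArrayBuffer.empty[CompletableFuture[Void]]
    Seq("topic-0", "topic-1", "topic-2").foreach { tp =>
      closing += new DemoPartition(tp).close()
    }
    // Broker shutdown: wait for every partition's log to finish closing first.
    CompletableFuture.allOf(closing.toSeq: _*).join()
    println("all logs closed; now safe to shut down the elastic log manager")
  }
}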
core/src/main/scala/kafka/cluster/Partition.scala (16 changes: 6 additions & 10 deletions)
@@ -573,17 +573,12 @@ class Partition(val topicPartition: TopicPartition,
    * 1) This broker(current leader) has closed the partition and the related streams.
    * 2) The next leader will then open the partition and the related streams.
    */
-  def close(): Unit = {
-    val closeStartTime = System.currentTimeMillis()
+  def close(): CompletableFuture[Void] = {
     logManager.removeFromCurrentLogs(topicPartition)
     ElasticLogManager.removeLog(topicPartition)
     inWriteLock(leaderIsrUpdateLock) {
       closed = true
     }
-    log.foreach( unifiedLog => {
-      val future = unifiedLog.close()
-      CoreUtils.swallow(future.get(), this)
-    })
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
     inWriteLock(leaderIsrUpdateLock) {
       remoteReplicasMap.clear()
@@ -595,10 +590,11 @@ class Partition(val topicPartition: TopicPartition,
       leaderEpochStartOffsetOpt = None
       Partition.removeMetrics(topicPartition)
     }
-    val closeTimeCost = System.currentTimeMillis() - closeStartTime
-    info(s"closed with time cost $closeTimeCost ms, trigger leader re-election")
-    // trigger leader re-election
-    alterIsrManager.tryElectLeader(topicPartition)
+    if (log.isDefined) {
+      log.get.close()
+    } else {
+      CompletableFuture.completedFuture(null)
+    }
   }
   // elastic stream inject end

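The new contract of Partition.close() above: the caller gets a CompletableFuture[Void] that completes once the underlying log has finished closing, or an already-completed future when there is no log. A hedged, minimal sketch of that shape (UnderlyingLog and PartitionLike are stand-ins, not the real UnifiedLog or Partition):

import java.util.concurrent.CompletableFuture

// Stand-in for the real log type; only assumes an asynchronous close().
trait UnderlyingLog {
  def close(): CompletableFuture[Void]
}

// Mirrors the shape of the new Partition.close(): hand back the log's close
// future, or an already-completed future when there is no log to close.
class PartitionLike(log: Option[UnderlyingLog]) {
  def close(): CompletableFuture[Void] =
    if (log.isDefined) log.get.close()
    else CompletableFuture.completedFuture(null)
}

The leader re-election that the old close() triggered inline is now chained after this future completes in ReplicaManager (see the diff below).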
core/src/main/scala/kafka/server/BrokerServer.scala (2 changes: 1 addition & 1 deletion)
@@ -589,7 +589,7 @@ class BrokerServer(
       // Note that logs are closed in logManager.shutdown().
       // Make sure these thread pools are shutdown after the log manager's shutdown.
       CoreUtils.swallow(replicaManager.shutdownAdditionalThreadPools(), this)
-      ElasticLogManager.shutdownNow()
+      CoreUtils.swallow(ElasticLogManager.shutdownNow(), this)
       // elastic stream inject end

       if (quotaManagers != null)
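Wrapping ElasticLogManager.shutdownNow() in CoreUtils.swallow matches the surrounding shutdown code: a failure in one step is logged and swallowed so the remaining steps still run. A rough sketch of that pattern with a hypothetical local helper (not the actual CoreUtils.swallow signature):

object SwallowSketch {
  // Hypothetical helper in the spirit of CoreUtils.swallow: run a shutdown step,
  // log any failure, and keep going so later steps still execute.
  def swallow(step: => Unit, name: String): Unit =
    try step
    catch { case e: Throwable => println(s"ignoring failure in $name: ${e.getMessage}") }

  def main(args: Array[String]): Unit = {
    swallow(throw new RuntimeException("thread pools refused to stop"), "thread pools")
    swallow(println("elastic log manager stopped"), "elastic log manager")
    // Output shows the second step ran even though the first one threw.
  }
}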
core/src/main/scala/kafka/server/ReplicaManager.scala (29 changes: 27 additions & 2 deletions)
@@ -64,7 +64,8 @@ import java.util
 import java.util.Optional
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{CompletableFuture, Executors, ThreadPoolExecutor, TimeUnit}
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -225,6 +226,7 @@ class ReplicaManager(val config: KafkaConfig,
   // This threadPool is used to handle partition open/close in case of throttling metadata replay.
   val partitionOpenCloseExecutors = new ThreadPoolExecutor(4, 32, 30, TimeUnit.SECONDS, new util.concurrent.LinkedBlockingQueue[Runnable](32),
     ThreadUtils.createThreadFactory("partition-open-close-executor-%d", true))
+  var partitionWaitingClosingFuture: CompletableFuture[Void] = CompletableFuture.completedFuture(null);

   /* epoch of the controller that last changed the leader */
   @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -466,6 +468,9 @@ class ReplicaManager(val config: KafkaConfig,
     // Second remove deleted partitions from the partition map. Fetchers rely on the
     // ReplicaManager to get Partition's information so they must be stopped first.
     val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    // Kafka on S3 inject start
+    val partitionClosingFutures = ArrayBuffer.empty[CompletableFuture[Void]]
+    // Kafka on S3 inject end
     partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
       if (shouldDelete) {
         getPartition(topicPartition) match {
@@ -480,7 +485,13 @@ class ReplicaManager(val config: KafkaConfig,
             // For elastic stream, partition leader alter is triggered by setting isr/replicas.
             // When broker is not response for the partition, we need to close the partition
             // instead of delete the partition.
-            hostedPartition.partition.close()
+            val closingFuture = hostedPartition.partition.close()
+              .thenRun(() => {
+                // trigger leader election for this partition
+                info(s"partition $topicPartition is closed, trigger leader election")
+                alterPartitionManager.tryElectLeader(topicPartition)
+              })
+            partitionClosingFutures.append(closingFuture)
           } else {
             hostedPartition.partition.delete()
           }
@@ -496,6 +507,19 @@ class ReplicaManager(val config: KafkaConfig,
       completeDelayedFetchOrProduceRequests(topicPartition)
     }

+    // Kafka on S3 inject start
+    if (partitionClosingFutures.nonEmpty) {
+      partitionWaitingClosingFuture = partitionWaitingClosingFuture.thenRun(() => {
+        val closeStartTime = System.currentTimeMillis()
+        // Wait for all logs to be closed.
+        CoreUtils.swallow(CompletableFuture.allOf(partitionClosingFutures.toArray: _*).get(), this)
+        val closeTimeCost = System.currentTimeMillis() - closeStartTime
+        debug(s"closing ${partitionClosingFutures.size} partitions with time cost $closeTimeCost ms")
+      })
+    }
+    // Kafka on S3 inject end
+
+
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
     if (partitionsToDelete.nonEmpty) {
@@ -2396,6 +2420,7 @@ class ReplicaManager(val config: KafkaConfig,

   def shutdownAdditionalThreadPools(): Unit = {
     slowFetchExecutors.shutdownNow()
+    CoreUtils.swallow(partitionWaitingClosingFuture.get(), this)
     partitionOpenCloseExecutors.shutdownNow()
   }
 }
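
The ReplicaManager side combines three pieces: collect one close future per partition, chain a single allOf(...) wait onto partitionWaitingClosingFuture so successive stopPartitions calls queue up behind one another, and finally block on that chain in shutdownAdditionalThreadPools(). A compact sketch of that queueing pattern, with illustrative names in place of the real partitions:

import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ArrayBuffer

object ClosingChainSketch {
  // One chained future for all batches of closing partitions; starts completed.
  @volatile private var waitingClosingFuture: CompletableFuture[Void] =
    CompletableFuture.completedFuture(null)

  def stopBatch(batch: Seq[String]): Unit = {
    val closing = ArrayBuffer.empty[CompletableFuture[Void]]
    batch.foreach { tp =>
      closing += CompletableFuture.runAsync(() => println(s"$tp closed"))
    }
    if (closing.nonEmpty) {
      // Queue this batch's wait behind any earlier batches.
      waitingClosingFuture = waitingClosingFuture.thenRun(() => {
        CompletableFuture.allOf(closing.toSeq: _*).join()
        println(s"batch of ${closing.size} partitions closed")
      })
    }
  }

  def shutdown(): Unit = {
    // Block until every queued batch has finished closing.
    waitingClosingFuture.join()
    println("all batches closed; thread pools can now be stopped")
  }

  def main(args: Array[String]): Unit = {
    stopBatch(Seq("tp-0", "tp-1"))
    stopBatch(Seq("tp-2"))
    shutdown()
  }
}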