diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3825672230..636cafad19 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,9 +17,7 @@ package kafka.server -import com.automq.stream.api.exceptions.FastReadFailFastException import com.automq.stream.s3.metrics.TimerUtil -import com.automq.stream.utils.FutureUtil import com.yammer.metrics.core.Histogram import kafka.admin.AdminUtils import kafka.api.ElectLeadersRequestOps @@ -28,7 +26,7 @@ import kafka.controller.ReplicaAssignment import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.AppendOrigin -import kafka.log.streamaspect.{ElasticLogManager, ReadHint} +import kafka.log.streamaspect.ElasticLogManager import kafka.message.ZStdCompressionCodec import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil} import kafka.network.RequestChannel @@ -127,8 +125,6 @@ class KafkaApis(val requestChannel: RequestChannel, val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config) val configManager = new ConfigAdminManager(brokerId, config, configRepository) - val fastFetchExecutor = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true)) - val slowFetchExecutor = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)) val asyncHandleExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-async-handle-executor-%d", true)) def close(): Unit = { @@ -1081,59 +1077,12 @@ class KafkaApis(val requestChannel: RequestChannel, clientMetadata = clientMetadata ) - // AutoMQ for Kafka inject start - def doFetchingRecords(): Unit = { - // call the replica manager to fetch messages from the local replica - replicaManager.fetchMessages( - params = params, - fetchInfos = interesting, - quota = replicationQuota(fetchRequest), - responseCallback = processResponseCallback, - ) - } - - def handleError(e: Throwable): Unit = { - error(s"Unexpected error handling request ${request.requestDesc(true)} " + - s"with context ${request.context}", e) - requestHelper.handleError(request, e) - } - - if (ElasticLogManager.enabled()) { - // The fetching is done is a separate thread pool to avoid blocking io thread. - fastFetchExecutor.submit(new Runnable { - override def run(): Unit = { - try { - ReadHint.markReadAll() - ReadHint.markFastRead(); - doFetchingRecords() - ReadHint.clear() - } catch { - case e: Throwable => - val ex = FutureUtil.cause(e) - val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException] - if (fastReadFailFast) { - slowFetchExecutor.submit(new Runnable { - override def run(): Unit = { - try { - ReadHint.markReadAll() - doFetchingRecords() - } catch { - case slowEx: Throwable => - handleError(slowEx) - } - } - }) - } else { - handleError(e) - } - } - } - }) - } else { - doFetchingRecords() - } - - // AutoMQ for Kafka inject end + replicaManager.fetchMessages( + params = params, + fetchInfos = interesting, + quota = replicationQuota(fetchRequest), + responseCallback = processResponseCallback, + ) } } diff --git a/core/src/main/scala/kafka/server/MemoryLimiter.java b/core/src/main/scala/kafka/server/MemoryLimiter.java new file mode 100644 index 0000000000..41ab954683 --- /dev/null +++ b/core/src/main/scala/kafka/server/MemoryLimiter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import java.util.concurrent.Semaphore; + +public class MemoryLimiter { + private final int maxPermits; + private final Semaphore permits; + + public MemoryLimiter(int size) { + maxPermits = size; + permits = new Semaphore(size); + } + + /** + * Acquire permits, if not enough, block until enough. + * Note: the acquire is fair, the acquired will be permitted in the acquire order. + */ + public synchronized Handler acquire(int permit) throws InterruptedException { + if (permit > maxPermits) { + permit = maxPermits; + } + boolean acquireRst = permits.tryAcquire(permit); + if (!acquireRst) { + permits.acquire(permit); + } + return new Handler(permit); + } + + public class Handler implements AutoCloseable { + private final int permit; + + public Handler(int permit) { + this.permit = permit; + } + + @Override + public void close() { + permits.release(permit); + } + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a89f9c923c..32935085d1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -24,7 +24,7 @@ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ -import kafka.log.streamaspect.ElasticLogManager +import kafka.log.streamaspect.{ElasticLogManager, ReadHint} import kafka.metrics.KafkaMetricsGroup import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers @@ -247,6 +247,11 @@ class ReplicaManager(val config: KafkaConfig, private var logDirFailureHandler: LogDirFailureHandler = _ + val fastFetchExecutor = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true)) + val slowFetchExecutor = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)) + val fastFetchLimiter = new MemoryLimiter(100 * 1024 * 1024) + val slowFetchLimiter = new MemoryLimiter(100 * 1024 * 1024) + private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) { override def doWork(): Unit = { val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir() @@ -1065,6 +1070,83 @@ class ReplicaManager(val config: KafkaConfig, quota: ReplicaQuota, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit ): Unit = { + + def handleError(e: Throwable): Unit = { + error(s"Unexpected error handling request ${params} ${fetchInfos} ", e) + // convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback + val fetchPartitionData = fetchInfos.map { case (tp, _) => + tp -> FetchPartitionData( + error = Errors.forException(e), + highWatermark = -1L, + lastStableOffset = None, + logStartOffset = -1L, + abortedTransactions = None, + preferredReadReplica = None, + records = MemoryRecords.EMPTY, + isReassignmentFetch = false, + divergingEpoch = None) + } + responseCallback(fetchPartitionData) + } + + // sum the sizes of topics to fetch from fetchInfos + var bytesNeed = fetchInfos.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes } + bytesNeed = math.min(bytesNeed, params.maxBytes) + + // The fetching is done is a separate thread pool to avoid blocking io thread. + fastFetchExecutor.submit(new Runnable { + override def run(): Unit = { + val fastFetchLimiterHandler = fastFetchLimiter.acquire(bytesNeed) + try { + ReadHint.markReadAll() + ReadHint.markFastRead() + fetchMessages0(params, fetchInfos, quota, response => { + try { + responseCallback.apply(response) + } finally { + fastFetchLimiterHandler.close() + } + }) + ReadHint.clear() + } catch { + case e: Throwable => + fastFetchLimiterHandler.close() + val ex = FutureUtil.cause(e) + val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException] + if (fastReadFailFast) { + slowFetchExecutor.submit(new Runnable { + override def run(): Unit = { + val slowFetchLimiterHandler = slowFetchLimiter.acquire(bytesNeed) + try { + ReadHint.markReadAll() + fetchMessages0(params, fetchInfos, quota, response => { + try { + responseCallback.apply(response) + } finally { + slowFetchLimiterHandler.close() + } + }) + } catch { + case slowEx: Throwable => + slowFetchLimiterHandler.close() + handleError(slowEx) + } + } + }) + } else { + error(s"Unexpected error handling request ${params} ${fetchInfos} ", e) + } + } + } + }) + } + + private def fetchMessages0( + params: FetchParams, + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + quota: ReplicaQuota, + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit + ): Unit = { // check if this fetch request can be satisfied right away // AutoMQ for Kafka inject start val logReadResults = readFromLocalLogV2(params, fetchInfos, quota, readFromPurgatory = false)