Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 7 additions & 58 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package kafka.server

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.s3.metrics.TimerUtil
import com.automq.stream.utils.FutureUtil
import com.yammer.metrics.core.Histogram
import kafka.admin.AdminUtils
import kafka.api.ElectLeadersRequestOps
Expand All @@ -28,7 +26,7 @@ import kafka.controller.ReplicaAssignment
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
import kafka.log.streamaspect.{ElasticLogManager, ReadHint}
import kafka.log.streamaspect.ElasticLogManager
import kafka.message.ZStdCompressionCodec
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
import kafka.network.RequestChannel
Expand Down Expand Up @@ -127,8 +125,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config)
val configManager = new ConfigAdminManager(brokerId, config, configRepository)

val fastFetchExecutor = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true))
val slowFetchExecutor = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true))
val asyncHandleExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-async-handle-executor-%d", true))

def close(): Unit = {
Expand Down Expand Up @@ -1081,59 +1077,12 @@ class KafkaApis(val requestChannel: RequestChannel,
clientMetadata = clientMetadata
)

// AutoMQ for Kafka inject start
def doFetchingRecords(): Unit = {
// call the replica manager to fetch messages from the local replica
replicaManager.fetchMessages(
params = params,
fetchInfos = interesting,
quota = replicationQuota(fetchRequest),
responseCallback = processResponseCallback,
)
}

def handleError(e: Throwable): Unit = {
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", e)
requestHelper.handleError(request, e)
}

if (ElasticLogManager.enabled()) {
// The fetching is done is a separate thread pool to avoid blocking io thread.
fastFetchExecutor.submit(new Runnable {
override def run(): Unit = {
try {
ReadHint.markReadAll()
ReadHint.markFastRead();
doFetchingRecords()
ReadHint.clear()
} catch {
case e: Throwable =>
val ex = FutureUtil.cause(e)
val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException]
if (fastReadFailFast) {
slowFetchExecutor.submit(new Runnable {
override def run(): Unit = {
try {
ReadHint.markReadAll()
doFetchingRecords()
} catch {
case slowEx: Throwable =>
handleError(slowEx)
}
}
})
} else {
handleError(e)
}
}
}
})
} else {
doFetchingRecords()
}

// AutoMQ for Kafka inject end
replicaManager.fetchMessages(
params = params,
fetchInfos = interesting,
quota = replicationQuota(fetchRequest),
responseCallback = processResponseCallback,
)
}
}

Expand Down
58 changes: 58 additions & 0 deletions core/src/main/scala/kafka/server/MemoryLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server;

import java.util.concurrent.Semaphore;

public class MemoryLimiter {
private final int maxPermits;
private final Semaphore permits;

public MemoryLimiter(int size) {
maxPermits = size;
permits = new Semaphore(size);
}

/**
* Acquire permits, if not enough, block until enough.
* Note: the acquire is fair, the acquired will be permitted in the acquire order.
*/
public synchronized Handler acquire(int permit) throws InterruptedException {
if (permit > maxPermits) {
permit = maxPermits;
}
boolean acquireRst = permits.tryAcquire(permit);
if (!acquireRst) {
permits.acquire(permit);
}
return new Handler(permit);
}

public class Handler implements AutoCloseable {
private final int permit;

public Handler(int permit) {
this.permit = permit;
}

@Override
public void close() {
permits.release(permit);
}
}
}
84 changes: 83 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.common.RecordValidationException
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.log.streamaspect.ElasticLogManager
import kafka.log.streamaspect.{ElasticLogManager, ReadHint}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
Expand Down Expand Up @@ -247,6 +247,11 @@ class ReplicaManager(val config: KafkaConfig,

private var logDirFailureHandler: LogDirFailureHandler = _

val fastFetchExecutor = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true))
val slowFetchExecutor = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true))
val fastFetchLimiter = new MemoryLimiter(100 * 1024 * 1024)
val slowFetchLimiter = new MemoryLimiter(100 * 1024 * 1024)

private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork(): Unit = {
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
Expand Down Expand Up @@ -1065,6 +1070,83 @@ class ReplicaManager(val config: KafkaConfig,
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {

def handleError(e: Throwable): Unit = {
error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
// convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback
val fetchPartitionData = fetchInfos.map { case (tp, _) =>
tp -> FetchPartitionData(
error = Errors.forException(e),
highWatermark = -1L,
lastStableOffset = None,
logStartOffset = -1L,
abortedTransactions = None,
preferredReadReplica = None,
records = MemoryRecords.EMPTY,
isReassignmentFetch = false,
divergingEpoch = None)
}
responseCallback(fetchPartitionData)
}

// sum the sizes of topics to fetch from fetchInfos
var bytesNeed = fetchInfos.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes }
bytesNeed = math.min(bytesNeed, params.maxBytes)

// The fetching is done is a separate thread pool to avoid blocking io thread.
fastFetchExecutor.submit(new Runnable {
override def run(): Unit = {
val fastFetchLimiterHandler = fastFetchLimiter.acquire(bytesNeed)
try {
ReadHint.markReadAll()
ReadHint.markFastRead()
fetchMessages0(params, fetchInfos, quota, response => {
try {
responseCallback.apply(response)
} finally {
fastFetchLimiterHandler.close()
}
})
ReadHint.clear()
} catch {
case e: Throwable =>
fastFetchLimiterHandler.close()
val ex = FutureUtil.cause(e)
val fastReadFailFast = ex.isInstanceOf[FastReadFailFastException]
if (fastReadFailFast) {
slowFetchExecutor.submit(new Runnable {
override def run(): Unit = {
val slowFetchLimiterHandler = slowFetchLimiter.acquire(bytesNeed)
try {
ReadHint.markReadAll()
fetchMessages0(params, fetchInfos, quota, response => {
try {
responseCallback.apply(response)
} finally {
slowFetchLimiterHandler.close()
}
})
} catch {
case slowEx: Throwable =>
slowFetchLimiterHandler.close()
handleError(slowEx)
}
}
})
} else {
error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
}
}
}
})
}

private def fetchMessages0(
params: FetchParams,
fetchInfos: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
// check if this fetch request can be satisfied right away
// AutoMQ for Kafka inject start
val logReadResults = readFromLocalLogV2(params, fetchInfos, quota, readFromPurgatory = false)
Expand Down