diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index de60abce91..f01d0733d7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1570,7 +1570,7 @@ object KafkaConfig {
       .define(S3WALWindowMaxProp, LONG, 536870912L, MEDIUM, S3WALWindowMaxDoc)
       .define(S3WALUploadThresholdProp, LONG, 104857600L, MEDIUM, S3WALUploadThresholdDoc)
       .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
-      .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
+      .define(S3ObjectBlockSizeProp, INT, 1048576, MEDIUM, S3ObjectBlockSizeDoc)
       .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
       .define(S3BlockCacheSizeProp, LONG, 104857600L, MEDIUM, S3BlockCacheSizeDoc)
       .define(S3StreamObjectCompactionIntervalMinutesProp, INT, 60, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32935085d1..69e52e29ff 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1071,12 +1071,11 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
 
-    def handleError(e: Throwable): Unit = {
-      error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
-      // convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback
+    def responseEmpty(e: Throwable): Unit = {
+      val error = if (e == null) { Errors.NONE } else { Errors.forException(e) }
       val fetchPartitionData = fetchInfos.map { case (tp, _) =>
         tp -> FetchPartitionData(
-          error = Errors.forException(e),
+          error = error,
           highWatermark = -1L,
           lastStableOffset = None,
           logStartOffset = -1L,
@@ -1089,6 +1088,30 @@ class ReplicaManager(val config: KafkaConfig,
       responseCallback(fetchPartitionData)
     }
 
+    def handleError(e: Throwable): Unit = {
+      error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
+      // convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback
+      responseEmpty(e)
+    }
+
+    val start = System.nanoTime()
+
+    def checkMaxWaitMs(): Boolean = {
+      if (params.maxWaitMs <= 0) {
+        // If the max wait time is 0, then no need to check quota or linger.
+ true + } else { + val waitedTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + if (waitedTimeMs < params.maxWaitMs) { + return true + } + warn(s"Returning emtpy fetch response for fetch request ${fetchInfos} since the " + + s"wait time ${waitedTimeMs} exceed ${params.maxWaitMs} ms.") + responseEmpty(new TimeoutException(s"wait time ${waitedTimeMs}ms exceed ${params.maxWaitMs}ms.")) + false + } + } + // sum the sizes of topics to fetch from fetchInfos var bytesNeed = fetchInfos.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes } bytesNeed = math.min(bytesNeed, params.maxBytes) @@ -1097,6 +1120,10 @@ class ReplicaManager(val config: KafkaConfig, fastFetchExecutor.submit(new Runnable { override def run(): Unit = { val fastFetchLimiterHandler = fastFetchLimiter.acquire(bytesNeed) + if (!checkMaxWaitMs()) { + fastFetchLimiterHandler.close() + return + } try { ReadHint.markReadAll() ReadHint.markFastRead() @@ -1117,6 +1144,10 @@ class ReplicaManager(val config: KafkaConfig, slowFetchExecutor.submit(new Runnable { override def run(): Unit = { val slowFetchLimiterHandler = slowFetchLimiter.acquire(bytesNeed) + if (!checkMaxWaitMs()) { + slowFetchLimiterHandler.close() + return + } try { ReadHint.markReadAll() fetchMessages0(params, fetchInfos, quota, response => { diff --git a/docker/scripts/start.sh b/docker/scripts/start.sh index 8adbd25708..f93df0b56b 100644 --- a/docker/scripts/start.sh +++ b/docker/scripts/start.sh @@ -178,21 +178,6 @@ turn_on_auto_balancer() { fi } -add_settings_for_s3() { - role=$1 - file_name=$2 - auto_balancer_setting_for_all "${file_name}" - if [[ "${role}" == "broker" || "${role}" == "server" ]]; then - add_or_setup_value "s3.wal.capacity" "4294967296" "${file_name}" - add_or_setup_value "s3.wal.cache.size" "1073741824" "${file_name}" - add_or_setup_value "s3.wal.upload.threshold" "536870912" "${file_name}" - add_or_setup_value "s3.stream.object.split.size" "16777216" "${file_name}" - add_or_setup_value "s3.object.block.size" "16777216" "${file_name}" - add_or_setup_value "s3.object.part.size" "33554432" "${file_name}" - add_or_setup_value "s3.block.cache.size" "1073741824" "${file_name}" - add_or_setup_value "stream.set.object.compaction.cache.size" "536870912" "${file_name}" - fi -} # monitor and change advertised ip for kafka kafka_monitor_ip() { @@ -290,8 +275,6 @@ kafka_up() { sed -i "s|Environment='KAFKA_S3_SECRET_KEY.*$|Environment='KAFKA_S3_SECRET_KEY=${s3_secret_key}'|" "${start_dir}/kafka.service" # turn on auto_balancer turn_on_auto_balancer "${role}" "${kafka_dir}/config/kraft/${role}.properties" - # change s3 related settings - add_settings_for_s3 "${role}" "${kafka_dir}/config/kraft/${role}.properties" done if [[ -n "${KAFKA_HEAP_OPTS}" ]]; then