2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1570,7 +1570,7 @@ object KafkaConfig {
       .define(S3WALWindowMaxProp, LONG, 536870912L, MEDIUM, S3WALWindowMaxDoc)
       .define(S3WALUploadThresholdProp, LONG, 104857600L, MEDIUM, S3WALUploadThresholdDoc)
       .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
-      .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
+      .define(S3ObjectBlockSizeProp, INT, 1048576, MEDIUM, S3ObjectBlockSizeDoc)
       .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
       .define(S3BlockCacheSizeProp, LONG, 104857600L, MEDIUM, S3BlockCacheSizeDoc)
       .define(S3StreamObjectCompactionIntervalMinutesProp, INT, 60, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
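The only change here lowers the default for the S3 object block size from 8 MiB (8388608 bytes) to 1 MiB (1048576 bytes); the neighboring defaults are untouched. Deployments that relied on the old value can still pin it explicitly. A minimal sketch, assuming the usual `Properties`-based broker configuration (the property key is taken from the `add_settings_for_s3` function removed from `start.sh` below):

```scala
import java.util.Properties

// Sketch: keep the pre-change 8 MiB block size instead of the new 1 MiB
// default. The same key/value pair can equally go into server.properties.
val props = new Properties()
props.put("s3.object.block.size", "8388608") // 8 MiB, the old default
```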
39 changes: 35 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1071,12 +1071,11 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
 
-    def handleError(e: Throwable): Unit = {
-      error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
-      // convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback
+    def responseEmpty(e: Throwable): Unit = {
+      val error = if (e == null) { Errors.NONE } else { Errors.forException(e) }
       val fetchPartitionData = fetchInfos.map { case (tp, _) =>
         tp -> FetchPartitionData(
-          error = Errors.forException(e),
+          error = error,
           highWatermark = -1L,
           lastStableOffset = None,
           logStartOffset = -1L,
@@ -1089,6 +1088,30 @@
       responseCallback(fetchPartitionData)
     }
 
+    def handleError(e: Throwable): Unit = {
+      error(s"Unexpected error handling request ${params} ${fetchInfos} ", e)
+      // convert fetchInfos to error Seq[(TopicPartition, FetchPartitionData)] for callback
+      responseEmpty(e)
+    }
+
+    val start = System.nanoTime()
+
+    def checkMaxWaitMs(): Boolean = {
+      if (params.maxWaitMs <= 0) {
+        // If the max wait time is 0, then no need to check quota or linger.
+        true
+      } else {
+        val waitedTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
+        if (waitedTimeMs < params.maxWaitMs) {
+          return true
+        }
warn(s"Returning emtpy fetch response for fetch request ${fetchInfos} since the " +
s"wait time ${waitedTimeMs} exceed ${params.maxWaitMs} ms.")
responseEmpty(new TimeoutException(s"wait time ${waitedTimeMs}ms exceed ${params.maxWaitMs}ms."))
+        false
+      }
+    }
+
     // sum the sizes of topics to fetch from fetchInfos
     var bytesNeed = fetchInfos.foldLeft(0) { case (sum, (_, partitionData)) => sum + partitionData.maxBytes }
     bytesNeed = math.min(bytesNeed, params.maxBytes)
@@ -1097,6 +1120,10 @@
     fastFetchExecutor.submit(new Runnable {
       override def run(): Unit = {
         val fastFetchLimiterHandler = fastFetchLimiter.acquire(bytesNeed)
+        if (!checkMaxWaitMs()) {
+          fastFetchLimiterHandler.close()
+          return
+        }
         try {
           ReadHint.markReadAll()
           ReadHint.markFastRead()
@@ -1117,6 +1144,10 @@
     slowFetchExecutor.submit(new Runnable {
       override def run(): Unit = {
         val slowFetchLimiterHandler = slowFetchLimiter.acquire(bytesNeed)
+        if (!checkMaxWaitMs()) {
+          slowFetchLimiterHandler.close()
+          return
+        }
         try {
           ReadHint.markReadAll()
           fetchMessages0(params, fetchInfos, quota, response => {
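Worth noting in both executor paths: the limiter permit is acquired first (and `acquire` presumably may block, which is why the elapsed-time check runs after it), so the early-return path must close the handler or the permit would leak. A minimal sketch of that pattern, using hypothetical `Limiter`/`Handler` stand-ins rather than the real fastFetchLimiter/slowFetchLimiter types:

```scala
// Hypothetical stand-ins for the limiter API used in the diff above.
trait Handler extends AutoCloseable
trait Limiter { def acquire(bytes: Int): Handler }

// Sketch of the acquire-then-deadline-check pattern: every exit path,
// including the timeout bail-out, must release the acquired permit.
def runWithLimit(limiter: Limiter, bytesNeed: Int, deadlineOk: () => Boolean)(work: => Unit): Unit = {
  val handler = limiter.acquire(bytesNeed) // may block waiting for capacity
  if (!deadlineOk()) {
    handler.close() // bail-out path: release the permit before returning
    return
  }
  try work
  finally handler.close() // normal and error paths release it as well
}
```

This mirrors the diff, where the handler is closed explicitly before the early `return` because the `try` block has not been entered yet at that point.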
17 changes: 0 additions & 17 deletions docker/scripts/start.sh
@@ -178,21 +178,6 @@ turn_on_auto_balancer() {
   fi
 }
 
-add_settings_for_s3() {
-  role=$1
-  file_name=$2
-  auto_balancer_setting_for_all "${file_name}"
-  if [[ "${role}" == "broker" || "${role}" == "server" ]]; then
-    add_or_setup_value "s3.wal.capacity" "4294967296" "${file_name}"
-    add_or_setup_value "s3.wal.cache.size" "1073741824" "${file_name}"
-    add_or_setup_value "s3.wal.upload.threshold" "536870912" "${file_name}"
-    add_or_setup_value "s3.stream.object.split.size" "16777216" "${file_name}"
-    add_or_setup_value "s3.object.block.size" "16777216" "${file_name}"
-    add_or_setup_value "s3.object.part.size" "33554432" "${file_name}"
-    add_or_setup_value "s3.block.cache.size" "1073741824" "${file_name}"
-    add_or_setup_value "stream.set.object.compaction.cache.size" "536870912" "${file_name}"
-  fi
-}
 
 # monitor and change advertised ip for kafka
 kafka_monitor_ip() {
@@ -290,8 +275,6 @@ kafka_up() {
     sed -i "s|Environment='KAFKA_S3_SECRET_KEY.*$|Environment='KAFKA_S3_SECRET_KEY=${s3_secret_key}'|" "${start_dir}/kafka.service"
     # turn on auto_balancer
     turn_on_auto_balancer "${role}" "${kafka_dir}/config/kraft/${role}.properties"
-    # change s3 related settings
-    add_settings_for_s3 "${role}" "${kafka_dir}/config/kraft/${role}.properties"
   done
 
   if [[ -n "${KAFKA_HEAP_OPTS}" ]]; then
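With `add_settings_for_s3` removed, the generated `${role}.properties` files no longer carry these S3 overrides, so brokers started by this script fall back to the defaults compiled into `KafkaConfig` (including the new 1 MiB `s3.object.block.size` above) unless an operator sets the keys elsewhere.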