Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
122 commits
Select commit Hold shift + click to select a range
cb6f22f
Reduce remote storage by checking local retention
jiafu1115 Nov 18, 2025
4f96275
correct the configure name to keep same style with existed configures…
jiafu1115 Nov 19, 2025
c410ae3
fix the unit test for null point protect
jiafu1115 Nov 19, 2025
b765f33
Merge branch 'apache:trunk' into storage
jiafu1115 Jan 5, 2026
c4a77b9
correct the desc according to kip's review
jiafu1115 Jan 7, 2026
1b0705e
Clarify remote log latest enable documentation
jiafu1115 Jan 7, 2026
2cb87c7
change the naming according to KIP review
jiafu1115 Jan 29, 2026
aefdac1
add broker level configure according to KIP review
jiafu1115 Jan 29, 2026
3c5131b
fix unit test
jiafu1115 Jan 29, 2026
44d74ac
improve the code
jiafu1115 Jan 29, 2026
791881e
remove duplicated code for check
jiafu1115 Jan 30, 2026
31ecadb
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
f2b2464
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Feb 5, 2026
5b059e1
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
4c108d3
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
05392d5
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
60060e2
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
997ecbd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
3398d49
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
2ae53dd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
7ef0a9e
KAFKA-19893: refactor the name and fix unit test
jiafu1115 Feb 6, 2026
b87f0af
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
f54c033
KAFKA-19893: add protected for the configures
jiafu1115 Feb 6, 2026
ef1ee28
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
1658a70
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
d99d7de
fix compile issue
jiafu1115 Feb 6, 2026
a5b6733
simple the code
jiafu1115 Feb 6, 2026
8b35773
Revert "simple the code"
jiafu1115 Feb 6, 2026
9ea9ac2
Don't change other codes
jiafu1115 Feb 7, 2026
7eefc4c
Keep the same style
jiafu1115 Feb 7, 2026
f18b95a
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 7, 2026
f0adf67
code refactor
jiafu1115 Feb 7, 2026
3d14856
Refactor remote copy lag validation methods
jiafu1115 Feb 7, 2026
b6c53b6
define the constants value
jiafu1115 Feb 8, 2026
e19efef
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 8, 2026
dea59a3
refactor
jiafu1115 Feb 8, 2026
b2d2079
Code format
jiafu1115 Feb 8, 2026
cf80f85
correct the total size calculate
jiafu1115 Feb 8, 2026
a747a1a
refactor
jiafu1115 Feb 8, 2026
a392516
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 10, 2026
71cdaf9
Clarify segment upload eligibility in documentation
jiafu1115 Feb 10, 2026
b5ad798
Update documentation for REMOTE_COPY_LAG_MS_CONFIG
jiafu1115 Feb 10, 2026
eaa2e40
Update documentation for remote copy lag configurations
jiafu1115 Feb 10, 2026
4ffe31f
Improve the log
jiafu1115 Feb 13, 2026
7ee6b12
Update retention checks to allow zero values
jiafu1115 Feb 13, 2026
b3b1d13
Fix comparison for remote copy lag checks
jiafu1115 Feb 13, 2026
a6bc552
Fix conditions for copy lag checks in RemoteLogManager
jiafu1115 Feb 13, 2026
1761b0a
refactor document
jiafu1115 Feb 13, 2026
31478c5
refactor document
jiafu1115 Feb 13, 2026
d493ba0
refactor document
jiafu1115 Feb 13, 2026
f994e15
correct the document
jiafu1115 Feb 13, 2026
21200a0
correct the document
jiafu1115 Feb 13, 2026
7aa8e44
correct the document again
jiafu1115 Feb 13, 2026
32ca138
Fix documentation for remote copy lag configs
jiafu1115 Feb 13, 2026
ee9e6ca
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
95039d4
Fix typos in TopicConfig documentation
jiafu1115 Feb 13, 2026
fc71a83
Fix documentation for remote log copy lag properties
jiafu1115 Feb 13, 2026
017deec
improve the document and implement
jiafu1115 Feb 13, 2026
5b223b1
Update LogConfig.java
jiafu1115 Feb 13, 2026
f458edd
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
2087e06
remove useless null protected judgement
jiafu1115 Feb 13, 2026
e550341
Merge branch 'trunk' into storage
jiafu1115 Feb 23, 2026
7574235
correct the judgement for upload
jiafu1115 Feb 23, 2026
f2ffe86
correct the format
jiafu1115 Feb 23, 2026
95d096b
correct the format
jiafu1115 Feb 23, 2026
b5aa4a5
correct the format
jiafu1115 Feb 23, 2026
0c4c7c3
code refactor
jiafu1115 Feb 23, 2026
f9afae2
code refactor
jiafu1115 Feb 23, 2026
1c251f0
correct the code
jiafu1115 Feb 23, 2026
7148ac5
improve the document
jiafu1115 Feb 23, 2026
47a14ab
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 25, 2026
c89f5ed
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 28, 2026
cf52590
refactor the code using new definition for delay configure
jiafu1115 Mar 31, 2026
7bbed2e
Correct multiple log-related imports to RemoteLogManager
jiafu1115 Mar 31, 2026
7a4ef68
Update copy lag condition checks in RemoteLogManager
jiafu1115 Mar 31, 2026
fa34452
Refactor copy lag checks in RemoteLogManager
jiafu1115 Mar 31, 2026
8f2c427
Fix the check style issue
jiafu1115 Mar 31, 2026
ab81e29
correct the configure item's default value for delay upload
jiafu1115 Apr 1, 2026
2b5fede
refactor the method position
jiafu1115 Apr 1, 2026
4050a5b
Refine documentation for retention configuration options
jiafu1115 Apr 1, 2026
a1adb06
Update RemoteLogManagerConfig.java
jiafu1115 Apr 1, 2026
f2a1949
add more validation
jiafu1115 Apr 1, 2026
4dfa05b
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Apr 1, 2026
4dbd1de
Fix return value in copy lag condition
jiafu1115 Apr 1, 2026
5573462
Fix documentation typos in RemoteLogManagerConfig
jiafu1115 Apr 1, 2026
a5dbf91
Fix documentation for remote copy lag configuration
jiafu1115 Apr 1, 2026
3ac973d
Fix documentation for LOG_REMOTE_COPY_LAG_MS_PROP
jiafu1115 Apr 1, 2026
075ea73
add more description
jiafu1115 Apr 1, 2026
6aa8c91
improve the documents
jiafu1115 Apr 1, 2026
7fe96a4
fix code issue which cause unit test failed
jiafu1115 Apr 2, 2026
571280a
Merge branch 'apache:trunk' into storage
jiafu1115 Apr 2, 2026
bfc7530
remove example line
jiafu1115 Apr 2, 2026
a06de2b
add more debug log
jiafu1115 Apr 3, 2026
9115afd
refactor the basic log
jiafu1115 Apr 3, 2026
a23d2ea
Refactor copy lag checks in RemoteLogManager
jiafu1115 Apr 3, 2026
3d966a1
Add log configure unit test
jiafu1115 Apr 3, 2026
fba4cea
Add log configure unit test
jiafu1115 Apr 3, 2026
0a7038b
Add log configure unit test
jiafu1115 Apr 3, 2026
f377844
Fix one bug and add more tests
jiafu1115 Apr 3, 2026
02819b2
Using real constants instead of magic number
jiafu1115 Apr 4, 2026
3d7c5c9
Add more unit tests
jiafu1115 Apr 4, 2026
79d9587
improve the unit test
jiafu1115 Apr 4, 2026
d85551e
Fix typos in TopicConfig documentation
jiafu1115 May 5, 2026
bc5b395
Fix typos in RemoteLogManagerConfig documentation
jiafu1115 May 5, 2026
eec8220
Merge branch 'trunk' into storage
jiafu1115 May 5, 2026
1122220
judge logger.isDebugEnabled() before print debug
jiafu1115 May 5, 2026
2fa65e7
change debug to trace
jiafu1115 May 5, 2026
e1a9c99
improve the code for config according to code review
jiafu1115 May 5, 2026
6d481c5
add more validation configure
jiafu1115 May 7, 2026
b302dac
add document for Constraint
jiafu1115 May 7, 2026
c32d37b
Merge branch 'apache:trunk' into storage
jiafu1115 May 7, 2026
31c686a
update document
jiafu1115 May 7, 2026
8036c81
remove the logic for -1 and -2
jiafu1115 May 7, 2026
abd0563
change to 0
jiafu1115 May 7, 2026
33944d4
remove useless test
jiafu1115 May 7, 2026
ec8ff4f
correct the unit test
jiafu1115 May 7, 2026
06a93df
correct the unit test
jiafu1115 May 7, 2026
a20e4d2
correct the unit test
jiafu1115 May 7, 2026
3cd8b26
correct the unit test
jiafu1115 May 7, 2026
8a0a031
correct the implement
jiafu1115 May 7, 2026
f26f152
remove the logic for -1 and -2
jiafu1115 May 7, 2026
5a68341
Merge remote-tracking branch 'origin/storageV2' into storageV2
jiafu1115 May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public class TopicConfig {
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";

public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0, immediate upload when time-based retention is used; otherwise no time-based delay check. " +
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms. " +
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";

public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0, immediate upload when size-based retention is used; otherwise no size-based delay check. " +
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
"The value should not exceed the real local retention bytes. " +
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,30 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE
}
}

def validateLogRemoteCopyLagMs(): Unit = {
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
}
}

def validateLogRemoteCopyLagBytes(): Unit = {
val logRetentionBytes: Long = newConfig.logRetentionBytes
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
}
}

def validateCordonedLogDirs(): Unit = {
val logDirs = newConfig.logDirs()
val cordonedLogDirs = newConfig.cordonedLogDirs()
Expand All @@ -592,6 +616,8 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE

validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
validateLogRemoteCopyLagMs()
validateLogRemoteCopyLagBytes()
validateCordonedLogDirs()
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
logProps
}
}
47 changes: 47 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1")
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1")
case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op

Expand Down Expand Up @@ -258,6 +260,51 @@ class LogConfigTest {
doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L)
}

@Test
def testInvalidRemoteCopyLagMsWhenGreaterThanEffectiveLocalRetentionMs(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG))
}

@Test
def testInvalidRemoteCopyLagBytesWhenGreaterThanEffectiveLocalRetentionBytes(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG))
}

@Test
def testValidRemoteCopyLagWhenBothLagChecksAreDisabled(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "0")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "0")

validateTopicLogConfig(props)
}

private def validateTopicLogConfig(props: util.Map[String, String]): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}

private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long,
localRetentionBytes: Int,
retentionBytes: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,55 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
// remote copy lag ms cannot exceed effective local retention ms
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 1001L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 100L
)

// remote copy lag bytes cannot exceed effective local retention bytes
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 100L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 1001L
)

}

def verifyIncorrectRemoteCopyLagProps(retentionMs: Long,
logLocalRetentionMs: Long,
remoteCopyLagMs: Long,
retentionBytes: Long,
logLocalRetentionBytes: Long,
remoteCopyLagBytes: Long): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
val config = KafkaConfig(props)
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)

val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, remoteCopyLagMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, remoteCopyLagBytes.toString)
// validate default config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
// validate per broker config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
}

@Test
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")

/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Expand Down Expand Up @@ -1200,6 +1202,10 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10017L, () => config.remoteLogManagerConfig.logRemoteCopyLagMs)
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10018L, () => config.remoteLogManagerConfig.logRemoteCopyLagBytes)
// not dynamically updatable
case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
// topic only config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public final class ServerTopicConfigSynonyms {
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
Expand Down Expand Up @@ -916,6 +917,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
* committed/acked messages
* 3) Segment has exceeded copy lag by time or size when configured (remote.copy.lag.ms, remote.copy.lag.bytes)
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be copied
* @param lastStableOffset The last stable offset of the log
Expand All @@ -925,10 +927,17 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
if (!segments.isEmpty()) {
long currentTimeMs = time.milliseconds();
long totalLogSize = UnifiedLog.sizeInBytes(segments);
long cumulativeSize = 0;
for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() <= lastStableOffset) {
cumulativeSize += previousSeg.size();
if (delayCopy(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
break;
}
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
}
}
Expand All @@ -937,6 +946,61 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
return candidateLogSegments;
}

private boolean delayCopy(LogConfig logConfig, LogSegment previousSeg, long currentTimeMs, long totalLogSize, long cumulativeSize) {
if (logConfig == null) {
return false;
}

long copyLagMs = logConfig.remoteCopyLagMs();
long copyLagBytes = logConfig.remoteCopyLagBytes();
if (logger.isTraceEnabled()) {
logger.trace("delayCopy check for segment {}: copyLagMs={}, copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={}, sizeLagBytes={}",
previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
}

boolean needCheckCopyLagMs = copyLagMs > 0;
boolean needCheckCopyLagBytes = copyLagBytes > 0;

if (!needCheckCopyLagMs && !needCheckCopyLagBytes) {
return false;
}

if (needCheckCopyLagMs && needCheckCopyLagBytes) {
return notExceededCopyLagTime(previousSeg, currentTimeMs, copyLagMs) && notExceededCopyLagSize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
}

if (needCheckCopyLagBytes) {
return notExceededCopyLagTime(previousSeg, currentTimeMs, copyLagMs);
}

return notExceededCopyLagSize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
}

private boolean notExceededCopyLagTime(LogSegment segment, long currentTimeMs, long copyLagMs) {
try {
long segmentAgeMs = currentTimeMs - segment.largestTimestamp();
boolean exceeded = segmentAgeMs >= copyLagMs;
if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by time? {} (segment age {} ms, copy lag {} ms)",
segment, exceeded, segmentAgeMs, copyLagMs);
}
return !exceeded;
} catch (IOException e) {
logger.warn("Failed to get largest timestamp for segment {}, take it as eligible for upload based on time", segment, e);
return false;
}
}

private boolean notExceededCopyLagSize(LogSegment segment, long totalLogSize, long cumulativeSize, long copyLagBytes) {
long sizeLagBytes = totalLogSize - cumulativeSize;
boolean exceeded = sizeLagBytes >= copyLagBytes;
if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by size? {} (size lag {} bytes, copy lag {} bytes, totalLogSize={}, cumulativeSize={})",
segment, exceeded, sizeLagBytes, copyLagBytes, totalLogSize, cumulativeSize);
}
return !exceeded;
}

public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
if (isCancelled())
return;
Expand Down
Loading