New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests #15719
KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests #15719
Changes from 1 commit
37987bf
de7eecb
b41ab1a
97768f3
cdf4e06
0cd7dcd
70c99bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,6 +183,7 @@ object KafkaConfig { | |
val LogFlushOffsetCheckpointIntervalMsProp = LogConfigPrefix + "flush.offset.checkpoint.interval.ms" | ||
val LogFlushStartOffsetCheckpointIntervalMsProp = LogConfigPrefix + "flush.start.offset.checkpoint.interval.ms" | ||
val LogPreAllocateProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.PREALLOCATE_CONFIG) | ||
val LogInitialTaskDelayMsProp = LogConfigPrefix + "initial.task.delay.ms" | ||
|
||
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ | ||
@deprecated("3.0") | ||
|
@@ -835,6 +836,7 @@ object KafkaConfig { | |
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) | ||
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc) | ||
.define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc) | ||
.defineInternal(LogInitialTaskDelayMsProp, LONG, LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS, LOW) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add a doc at the last parameter for other developers know what this config is doing for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, we can set |
||
|
||
/** ********* Replication configuration ***********/ | ||
.define(ControllerSocketTimeoutMsProp, INT, Defaults.CONTROLLER_SOCKET_TIMEOUT_MS, MEDIUM, ControllerSocketTimeoutMsDoc) | ||
|
@@ -1410,6 +1412,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami | |
def logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) | ||
def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) | ||
def logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) | ||
def logInitialTaskDelayMs: java.lang.Long = Option(getLong(KafkaConfig.LogInitialTaskDelayMsProp)).getOrElse(LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS) | ||
|
||
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0` | ||
// is passed, `0.10.0-IV0` may be picked) | ||
|
@@ -1820,6 +1823,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami | |
} | ||
} | ||
} | ||
require(logInitialTaskDelayMs >= 0, s"`${KafkaConfig.LogInitialTaskDelayMsProp}` must be greater than or equal to 0") | ||
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1") | ||
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") | ||
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -413,7 +413,7 @@ class LogManagerTest { | |
assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.") | ||
|
||
// this cleanup shouldn't find any expired segments but should delete some to reduce size | ||
time.sleep(logManager.InitialTaskDelayMs) | ||
time.sleep(logManager.initialTaskDelayMs) | ||
assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments") | ||
time.sleep(log.config.fileDeleteDelayMs + 1) | ||
Comment on lines
-416
to
420
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we create a test in LogManagerTest to verify the logManager will start these tasks after customized There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Huge thanks ! I'm a little confused about the comment above, the test in LogManagerTest itself verify that tasks like log cleanup, flush logs are triggered after sleeping initialTaskDelayMs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, and currently, all of them are setting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation !
I decided to follow the comment as you mentioned earlier, and updated the initialTaskDelayMs to 10s in LogManagerTests. Please take another look 😃 |
||
|
||
|
@@ -482,7 +482,7 @@ class LogManagerTest { | |
val set = TestUtils.singletonRecords("test".getBytes()) | ||
log.appendAsLeader(set, leaderEpoch = 0) | ||
} | ||
time.sleep(logManager.InitialTaskDelayMs) | ||
time.sleep(logManager.initialTaskDelayMs) | ||
assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered") | ||
} | ||
|
||
|
@@ -637,9 +637,9 @@ class LogManagerTest { | |
fileInIndex.get.getAbsolutePath) | ||
} | ||
|
||
time.sleep(logManager.InitialTaskDelayMs) | ||
time.sleep(logManager.initialTaskDelayMs) | ||
assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early") | ||
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs) | ||
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.initialTaskDelayMs) | ||
assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted") | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging { | |
logDirFailureChannel = new LogDirFailureChannel(logDirs.size), | ||
keepPartitionMetadataFile = true, | ||
interBrokerProtocolVersion = interBrokerProtocolVersion, | ||
remoteStorageSystemEnable = remoteStorageSystemEnable) | ||
remoteStorageSystemEnable = remoteStorageSystemEnable, | ||
initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the goal is to speed up tests, shouldn't we use a lower value here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your comment @soarez 😃 ! If I understand correctly TestUtils#createLogManager use MockTime and the clock would advance immediately after invoking the time#sleep method in the test, pointing to the corresponding sleep time thereafter. The goal in this pr is to introduce a new internal config to speed up for tests like e2e/integration tests which can't use MockTime as above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Thanks for clarifying There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No problem, thanks for reviewing the pr ! |
||
|
||
if (log.isDefined) { | ||
val spyLogManager = Mockito.spy(logManager) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know currently we don't use LogManagerBuilder in the tests, but I still think we should add a
initialTaskDelayMs
setting and set default value toLogConfig.DEFAULT_INITIAL_TASK_DELAY_MS
.