
KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests #15719

Merged (7 commits, Apr 20, 2024)
10 changes: 9 additions & 1 deletion core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -22,6 +22,7 @@
import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
@@ -55,6 +56,7 @@ public class LogManagerBuilder {
private Time time = Time.SYSTEM;
private boolean keepPartitionMetadataFile = true;
private boolean remoteStorageSystemEnable = false;
private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;

public LogManagerBuilder setLogDirs(List<File> logDirs) {
this.logDirs = logDirs;
@@ -151,6 +153,11 @@ public LogManagerBuilder setRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) {
return this;
}

public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) {
this.initialTaskDelayMs = initialTaskDelayMs;
return this;
}

public LogManager build() {
if (logDirs == null) throw new RuntimeException("you must set logDirs");
if (configRepository == null) throw new RuntimeException("you must set configRepository");
@@ -179,6 +186,7 @@ public LogManager build() {
logDirFailureChannel,
time,
keepPartitionMetadataFile,
remoteStorageSystemEnable);
remoteStorageSystemEnable,
initialTaskDelayMs);
}
}
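
A minimal sketch of the new builder option in use (hypothetical snippet; builder is assumed to be an otherwise fully configured LogManagerBuilder from a test fixture):

    // Start LogManager background tasks after 1 s instead of the 30 s default.
    val manager = builder
      .setInitialTaskDelayMs(1000L)
      .build()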
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -81,14 +81,13 @@ class LogManager(logDirs: Seq[File],
logDirFailureChannel: LogDirFailureChannel,
time: Time,
val keepPartitionMetadataFile: Boolean,
remoteStorageSystemEnable: Boolean) extends Logging {
remoteStorageSystemEnable: Boolean,
val initialTaskDelayMs: Long) extends Logging {

import LogManager._

private val metricsGroup = new KafkaMetricsGroup(this.getClass)

val InitialTaskDelayMs: Int = 30 * 1000

private val logCreationOrDeletionLock = new Object
private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
// Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
@@ -628,24 +627,24 @@
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
() => cleanupLogs(),
InitialTaskDelayMs,
initialTaskDelayMs,
retentionCheckMs)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
() => flushDirtyLogs(),
InitialTaskDelayMs,
initialTaskDelayMs,
flushCheckMs)
scheduler.schedule("kafka-recovery-point-checkpoint",
() => checkpointLogRecoveryOffsets(),
InitialTaskDelayMs,
initialTaskDelayMs,
flushRecoveryOffsetCheckpointMs)
scheduler.schedule("kafka-log-start-offset-checkpoint",
() => checkpointLogStartOffsets(),
InitialTaskDelayMs,
initialTaskDelayMs,
flushStartOffsetCheckpointMs)
scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
() => deleteLogs(),
InitialTaskDelayMs)
initialTaskDelayMs)
}
if (cleanerConfig.enableCleaner) {
_cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
@@ -1584,7 +1583,8 @@ object LogManager {
time = time,
keepPartitionMetadataFile = keepPartitionMetadataFile,
interBrokerProtocolVersion = config.interBrokerProtocolVersion,
remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem())
remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
initialTaskDelayMs = config.logInitialTaskDelayMs)
}

/**
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -573,6 +573,7 @@ object KafkaConfig {
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC)

/** ********* Replication configuration ***********/
.define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
@@ -1150,6 +1151,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def logFlushIntervalMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG))
def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG)
def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)

// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
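For illustration, a sketch of the wiring above (hypothetical Scala snippet; TestUtils.createBrokerConfig and TestUtils.MockZkConnect are assumed from Kafka's core test utilities):

    // Override the internal config and read it back through KafkaConfig.
    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
    props.setProperty(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100")
    val config = KafkaConfig.fromProps(props)
    assertEquals(100L, config.logInitialTaskDelayMs)
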
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -127,7 +127,8 @@ class LogLoaderTest {
logDirFailureChannel = logDirFailureChannel,
time = time,
keepPartitionMetadataFile = config.usesTopicId,
remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
initialTaskDelayMs = config.logInitialTaskDelayMs) {

override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
13 changes: 8 additions & 5 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -69,12 +69,14 @@ class LogManagerTest {
var logManager: LogManager = _
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
val initialTaskDelayMs: Long = 10 * 1000

@BeforeEach
def setUp(): Unit = {
logDir = TestUtils.tempDir()
logManager = createLogManager()
logManager.startup(Set.empty)
assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs)
}

@AfterEach
@@ -413,7 +415,7 @@
assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")

// this cleanup shouldn't find any expired segments but should delete some to reduce size
time.sleep(logManager.InitialTaskDelayMs)
time.sleep(logManager.initialTaskDelayMs)
assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments")
time.sleep(log.config.fileDeleteDelayMs + 1)
Comment on lines -416 to 420
Contributor:

Could we create a test in LogManagerTest to verify that the LogManager starts these tasks after a customized initialTaskDelayMs? We could add a simple test like the ones here, or maybe add the verification directly inside these existing tests?

Member Author:

> Could we create a test in LogManagerTest to verify that the LogManager starts these tasks after a customized initialTaskDelayMs?

Huge thanks! I'm a little confused by the comment above: the tests in LogManagerTest already verify that tasks like log cleanup and log flushing are triggered after sleeping initialTaskDelayMs.

Contributor:

> Huge thanks! I'm a little confused by the comment above: the tests in LogManagerTest already verify that tasks like log cleanup and log flushing are triggered after sleeping initialTaskDelayMs.

Yes, and currently all of them set initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS. I'm thinking we could add a test that sets initialTaskDelayMs to, say, 1000, and verify the change is adopted: if we sleep only 1000 ms, log cleanup should be triggered.

Member Author:

Thanks for the explanation!

> or maybe add the verification directly inside these existing tests?

I decided to follow your earlier suggestion and updated initialTaskDelayMs to 10 s in LogManagerTest. Please take another look 😃


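A sketch of the suggested verification (hypothetical test method; the named parameters of TestUtils.createLogManager, the MockTime field time, and startup follow the LogManagerTest code visible in this diff):

    @Test
    def testCustomizedInitialTaskDelay(): Unit = {
      val customDelayMs = 1000L
      val manager = TestUtils.createLogManager(
        logDirs = Seq(TestUtils.tempDir()),
        time = this.time,
        initialTaskDelayMs = customDelayMs)
      try {
        manager.startup(Set.empty)
        assertEquals(customDelayMs, manager.initialTaskDelayMs)
        // Advancing the mock clock by only the customized delay should be
        // enough for the first run of the scheduled tasks to fire.
        time.sleep(customDelayMs)
        // ... assert on cleanup/flush side effects here ...
      } finally {
        manager.shutdown()
      }
    }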
@@ -482,7 +484,7 @@
val set = TestUtils.singletonRecords("test".getBytes())
log.appendAsLeader(set, leaderEpoch = 0)
}
time.sleep(logManager.InitialTaskDelayMs)
time.sleep(logManager.initialTaskDelayMs)
assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered")
}

@@ -604,7 +606,8 @@
configRepository = configRepository,
logDirs = logDirs,
time = this.time,
recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
recoveryThreadsPerDataDir = recoveryThreadsPerDataDir,
initialTaskDelayMs = initialTaskDelayMs)
}

@Test
@@ -637,9 +640,9 @@
fileInIndex.get.getAbsolutePath)
}

time.sleep(logManager.InitialTaskDelayMs)
time.sleep(logManager.initialTaskDelayMs)
assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early")
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.initialTaskDelayMs)
assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted")
}

6 changes: 4 additions & 2 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1506,7 +1506,8 @@ object TestUtils extends Logging {
recoveryThreadsPerDataDir: Int = 4,
transactionVerificationEnabled: Boolean = false,
log: Option[UnifiedLog] = None,
remoteStorageSystemEnable: Boolean = false): LogManager = {
remoteStorageSystemEnable: Boolean = false,
initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
initialOfflineDirs = Array.empty[File],
configRepository = configRepository,
@@ -1526,7 +1527,8 @@
logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
remoteStorageSystemEnable = remoteStorageSystemEnable)
remoteStorageSystemEnable = remoteStorageSystemEnable,
initialTaskDelayMs = initialTaskDelayMs)

if (log.isDefined) {
val spyLogManager = Mockito.spy(logManager)
@@ -22,6 +22,7 @@
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
@@ -109,7 +110,7 @@ public void setup() {
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false);
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
final MetadataCache metadataCache =
@@ -185,4 +185,9 @@ public class ServerLogConfigs {
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
"does not apply to any message format conversion that might be required for replication to followers.";

public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms";
public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;
public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in milliseconds when initializing " +
"tasks in LogManager. This should be used for testing only.";
}
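
Usage note: since LOG_PREFIX resolves to "log.", the internal property name is log.initial.task.delay.ms, with a default of 30000 ms (30 s); tests can shorten the broker's startup-task delay by setting it to a small value such as 100.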