Skip to content

Commit

Permalink
KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (apac…
Browse files Browse the repository at this point in the history
…he#16199)

Reviewers: Greg Harris <gharris1727@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
muralibasani committed Jun 6, 2024
1 parent 0ed104c commit a41f7a4
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 354 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ object KafkaConfig {
}

/** ********* Remote Log Management Configuration *********/
RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))
RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key))

def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
Expand Down Expand Up @@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)

private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props)
def remoteLogManagerConfig = _remoteLogManagerConfig

private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -2754,8 +2753,7 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) {
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);

AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
return new RemoteLogManagerConfig(config);
return new RemoteLogManagerConfig(props);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.common.config.{TopicConfig}
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
Expand Down Expand Up @@ -4093,8 +4093,7 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
// set log reader threads number to 2
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString)
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
val remoteLogManager = new RemoteLogManager(
Expand Down Expand Up @@ -4193,8 +4192,7 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val dummyLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
val remoteLogManager = new RemoteLogManager(
Expand Down
Loading

0 comments on commit a41f7a4

Please sign in to comment.