Skip to content

Commit

Permalink
MINOR: AbstractConfig cleanup Part 2 (#15639)
Browse files Browse the repository at this point in the history
Reviewers:  Manikumar Reddy <anikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
gharris1727 committed Apr 3, 2024
1 parent ba665a4 commit cfc97a1
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
config.extractLogConfigMap
config.originals()
)

partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,13 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
newConfig.extractLogConfigMap.forEach { (k, v) =>
newConfig.valuesFromThisConfig.forEach { (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
if (v == null)
newBrokerDefaults.remove(configName)
else
newBrokerDefaults.put(configName, v)
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class PartitionStateMachineTest {
.thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))

when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap))
when(mockZkClient.getLogConfigs(Set.empty, config.originals()))
.thenReturn((Map(partition.topic -> new LogConfig(new Properties)), Map.empty[String, Exception]))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
Expand Down Expand Up @@ -434,7 +434,7 @@ class PartitionStateMachineTest {
}
prepareMockToGetTopicPartitionsStatesRaw()
def prepareMockToGetLogConfigs(): Unit = {
when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap)).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
when(mockZkClient.getLogConfigs(Set.empty, config.originals())).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
}
prepareMockToGetLogConfigs()

Expand Down

0 comments on commit cfc97a1

Please sign in to comment.