diff --git a/build.gradle b/build.gradle
index 8011589c3f55b..e12b411aeff16 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2577,6 +2577,7 @@ project(':shell') {
implementation project(':core')
implementation project(':metadata')
implementation project(':raft')
+ implementation project(':server')
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJakartarsJsonProvider
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b1ef62ca3a26b..acb689091128b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,6 +284,7 @@
+
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index ad3eaead31261..c8e15162a9fb7 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -1982,7 +1982,9 @@ public void testOnNewMetadataImage() {
verify(coordinator0).onLoaded(MetadataImage.EMPTY);
// Publish a new image.
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
runtime.onNewMetadataImage(newImage, delta);
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index e8427fa7e5380..573892af86301 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -242,6 +242,7 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
+ setSupportedConfigChecker(sharedServer.supportedConfigChecker).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 69d2353fb833a..53ab8114104d4 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
-import org.apache.kafka.metadata.ListenerInfo
-import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.{ListenerInfo, MetadataRecordSerde, SupportedConfigChecker}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics}
@@ -112,6 +112,7 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+ val supportedConfigChecker: SupportedConfigChecker = new DefaultSupportedConfigChecker()
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
@@ -315,7 +316,8 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
- setMetrics(metadataLoaderMetrics)
+ setMetrics(metadataLoaderMetrics).
+ setSupportedConfigChecker(supportedConfigChecker)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(nodeId).
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index c0ce96dd67244..2382ad071f407 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -78,7 +78,9 @@ class LocalLeaderEndPointTest extends Logging {
alterPartitionManager = alterPartitionManager
)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -254,7 +256,9 @@ class LocalLeaderEndPointTest extends Logging {
}
private def bumpLeaderEpoch(): Unit = {
- val delta = new MetadataDelta(image)
+ val delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
delta.replay(new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partition)
diff --git a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index fa1209c917eb5..f73f7aa381b85 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -38,7 +38,9 @@ class DefaultApiVersionManagerTest {
private val brokerFeatures = BrokerFeatures.createDefault(true)
private val metadataCache = {
val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a6c2658963515..3c12844250599 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -227,7 +227,9 @@ class KafkaApisTest extends Logging {
def initializeMetadataCacheWithShareGroupsEnabled(enableShareGroups: Boolean = true): MetadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -6258,7 +6260,9 @@ class KafkaApisTest extends Logging {
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -6486,7 +6490,9 @@ class KafkaApisTest extends Logging {
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10066,7 +10072,9 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10200,7 +10208,9 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10708,7 +10718,9 @@ class KafkaApisTest extends Logging {
expectedResponse.groups.add(expectedDescribedGroup)
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -10871,7 +10883,9 @@ class KafkaApisTest extends Logging {
expectedResponse.groups.add(expectedDescribedGroup)
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
- val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 9a7c18ad079e5..6575bda72df3e 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -214,7 +214,9 @@ class BrokerMetadataPublisherTest {
)
val topicId = Uuid.randomUuid()
- var delta = new MetadataDelta(MetadataImage.EMPTY)
+ var delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build()
delta.replay(new TopicRecord()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setTopicId(topicId)
@@ -231,7 +233,9 @@ class BrokerMetadataPublisherTest {
)
val image = delta.apply(MetadataProvenance.EMPTY)
- delta = new MetadataDelta(image)
+ delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
delta.replay(new RemoveTopicRecord()
.setTopicId(topicId)
)
@@ -339,7 +343,9 @@ class BrokerMetadataPublisherTest {
)
// Share version 1 is getting passed to features delta.
- val delta = new MetadataDelta(image)
+ val delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
delta.replay(new FeatureLevelRecord().setName(ShareVersion.FEATURE_NAME).setFeatureLevel(1))
metadataPublisher.onMetadataUpdate(
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 938809c5e7b2b..2db3ef27157f8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -1089,7 +1089,9 @@ private void cancelGroupSizeCounter() {
*/
@Override
public void onLoaded(MetadataImage newImage) {
- MetadataDelta emptyDelta = new MetadataDelta(newImage);
+ MetadataDelta emptyDelta = new MetadataDelta.Builder()
+ .setImage(newImage)
+ .build();
groupMetadataManager.onNewMetadataImage(newImage, emptyDelta);
coordinatorMetrics.activateMetricsShard(metricsShard);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 01c87696053a2..bd2fb846615c9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -3152,7 +3152,12 @@ public void testOnPartitionsDeleted() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();
- service.onNewMetadataImage(image, new MetadataDelta(image));
+ service.onNewMetadataImage(
+ image,
+ new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
+ );
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
@@ -3210,7 +3215,12 @@ public void testOnPartitionsDeletedCleanupShareGroupState() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();
- service.onNewMetadataImage(image, new MetadataDelta(image));
+ service.onNewMetadataImage(
+ image,
+ new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
+ );
// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e9069f12f8c2c..673d41b0b709b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -815,7 +815,9 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage))
context.groupMetadataManager.onNewMetadataImage(
newMetadataImage,
- new MetadataDelta(newMetadataImage)
+ new MetadataDelta.Builder()
+ .setImage(newMetadataImage)
+ .build()
);
// If a topic is updated, related topic hash is cleanup.
assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -900,13 +902,17 @@ public void testRemoveTopicCleanupTopicHash() {
));
// Remove foo topic from metadata image.
- MetadataDelta delta = new MetadataDelta(metadataImage);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(metadataImage)
+ .build();
delta.replay(new RemoveTopicRecord().setTopicId(fooTopicId));
MetadataImage newMetadataImage = delta.apply(MetadataProvenance.EMPTY);
context.groupMetadataManager.onNewMetadataImage(
newMetadataImage,
- new MetadataDelta(newMetadataImage)
+ new MetadataDelta.Builder()
+ .setImage(newMetadataImage)
+ .build()
);
// If a topic is removed, related topic hash is cleanup.
assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -3401,7 +3407,9 @@ public void testOnNewMetadataImageWithEmptyDelta() {
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new MockPartitionAssignor("range")))
.build();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
context.groupMetadataManager.onNewMetadataImage(image, delta);
@@ -3458,7 +3466,9 @@ public void testOnNewMetadataImage() {
Uuid topicE = Uuid.randomUuid();
// Create a first base image with topic a, b, c and d.
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0));
delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
@@ -3470,7 +3480,9 @@ public void testOnNewMetadataImage() {
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
// Create a delta which updates topic B, deletes topic D and creates topic E.
- delta = new MetadataDelta(image);
+ delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2));
delta.replay(new RemoveTopicRecord().setTopicId(topicD));
delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
@@ -18535,7 +18547,9 @@ public void testStreamsOnNewMetadataImage() {
Uuid topicE = Uuid.randomUuid();
// Create a first base image with topic a, b, c and d.
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0));
delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
@@ -18547,7 +18561,9 @@ public void testStreamsOnNewMetadataImage() {
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
// Create a delta which updates topic B, deletes topic D and creates topic E.
- delta = new MetadataDelta(image);
+ delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2));
delta.replay(new RemoveTopicRecord().setTopicId(topicD));
delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
@@ -20724,7 +20740,9 @@ public void testConsumerGroupMemberJoinRefreshesExpiredRegexes() {
context.groupMetadataManager.onNewMetadataImage(
newImage,
- new MetadataDelta(newImage)
+ new MetadataDelta.Builder()
+ .setImage(newImage)
+ .build()
);
// A member heartbeats.
@@ -21915,7 +21933,9 @@ public void testShareGroupDeleteRequestNoDeletingTopics() {
.addTopic(t2Uuid, t2Name, 2)
.build();
- MetadataDelta delta = new MetadataDelta(image);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
@@ -21982,7 +22002,9 @@ public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
.addTopic(t3Uuid, t3Name, 2)
.build();
- MetadataDelta delta = new MetadataDelta(image);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
@@ -23074,7 +23096,12 @@ topicId, new InitMapValue(topicName, Set.of(0), timeNow)
.addTopic(t3Id, t3Name, 3)
.build();
- context.groupMetadataManager.onNewMetadataImage(metadataImage, new MetadataDelta(metadataImage));
+ context.groupMetadataManager.onNewMetadataImage(
+ metadataImage,
+ new MetadataDelta.Builder()
+ .setImage(metadataImage)
+ .build()
+ );
// Since t1 is initializing and t2 is initialized due to replay above.
timeNow = timeNow + 2 * offsetWriteTimeout + 1;
@@ -23120,7 +23147,9 @@ public void testUninitializeTopics() {
.addTopic(t2Id, t2Name, 3)
.build();
- MetadataDelta delta = new MetadataDelta(image);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
// Cleanup happens from initialzing state only.
@@ -23203,7 +23232,9 @@ public void testMaybeCleanupShareGroupStateInitDeletedTopicsPresent() {
.addTopic(t5Id, t5Name, 3)
.build();
- MetadataDelta delta = new MetadataDelta(image);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.groupMetadataManager.replay(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
index 23a01a6024176..0d5a26a8dc0a7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java
@@ -34,7 +34,9 @@ public MetadataImageBuilder() {
}
public MetadataImageBuilder(MetadataImage image) {
- this.delta = new MetadataDelta(image);
+ this.delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build();
}
public MetadataImageBuilder addTopic(
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index eff9cd10f8ce1..932fd68dd418c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -100,7 +100,9 @@ public static MetadataImage createMetadataImage(
List allTopicNames,
int partitionsPerTopic
) {
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
for (String topicName : allTopicNames) {
AssignorBenchmarkUtils.addTopic(
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
index 35ee42836c820..79d16ab2c556a 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java
@@ -95,7 +95,9 @@ public class RegexResolutionBenchmark {
public void setup() {
Random random = new Random();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
for (int i = 0; i < topicCount; i++) {
String topicName =
WORDS.get(random.nextInt(WORDS.size())) + "_" +
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index b1986f639a739..fed08a4ebe340 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -82,7 +82,9 @@ public class TransactionalOffsetFetchBenchmark {
@Setup(Level.Trial)
public void setup() {
LogContext logContext = new LogContext();
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
delta.replay(new TopicRecord()
.setTopicId(Uuid.randomUuid())
.setName(TOPIC_NAME));
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 5064269938207..440e10ac3fd14 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -138,7 +138,9 @@ public void setup() {
}
private void initializeMetadataCache() {
- MetadataDelta buildupMetadataDelta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta buildupMetadataDelta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
IntStream.range(0, 5).forEach(brokerId -> {
RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection();
endpoints(brokerId).forEach(endpoint -> endpoints.add(endpoint));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index d64695bd39b52..81471e8e3fc8b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -31,6 +31,7 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.mutable.BoundedList;
@@ -76,6 +77,7 @@ public class ConfigurationControlManager {
private final Map staticConfig;
private final ConfigResource currentController;
private final FeatureControlManager featureControl;
+ private final SupportedConfigChecker supportedConfigChecker;
static class Builder {
private LogContext logContext = null;
@@ -87,6 +89,7 @@ static class Builder {
private Map staticConfig = Map.of();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
+ private SupportedConfigChecker supportedConfigChecker = SupportedConfigChecker.TRUE;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -133,6 +136,11 @@ Builder setFeatureControl(FeatureControlManager featureControl) {
return this;
}
+ Builder setSupportedConfigChecker(SupportedConfigChecker supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
@@ -151,7 +159,8 @@ ConfigurationControlManager build() {
validator,
staticConfig,
nodeId,
- featureControl);
+ featureControl,
+ supportedConfigChecker);
}
}
@@ -163,7 +172,8 @@ private ConfigurationControlManager(LogContext logContext,
ConfigurationValidator validator,
Map staticConfig,
int nodeId,
- FeatureControlManager featureControl
+ FeatureControlManager featureControl,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
@@ -176,6 +186,7 @@ private ConfigurationControlManager(LogContext logContext,
this.staticConfig = Map.copyOf(staticConfig);
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.featureControl = featureControl;
+ this.supportedConfigChecker = supportedConfigChecker;
}
SnapshotRegistry snapshotRegistry() {
@@ -511,6 +522,13 @@ private List getParts(String value, String key, ConfigResource configRes
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());
+
+ if (!supportedConfigChecker.isSupported(configResource.type(), record.name())) {
+ // We skip unsupported configs during replay. This can happen when the config was
+ // deprecated and removed, but old records still exist in the log.
+ log.info("Skipping unsupported config {} for resource {} during replay", record.name(), configResource);
+ return;
+ }
TimelineHashMap configs = configData.get(configResource);
if (configs == null) {
configs = new TimelineHashMap<>(snapshotRegistry, 0);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7dee1e3cd3de1..a3e51301d3eb1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -100,6 +100,7 @@
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
@@ -209,6 +210,7 @@ public static class Builder {
private Optional createTopicPolicy = Optional.empty();
private Optional alterConfigPolicy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
+ private SupportedConfigChecker supportedConfigChecker = SupportedConfigChecker.TRUE;
private Map staticConfig = Map.of();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
@@ -345,6 +347,11 @@ public Builder setConfigurationValidator(ConfigurationValidator configurationVal
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public Builder setStaticConfig(Map staticConfig) {
this.staticConfig = staticConfig;
return this;
@@ -429,6 +436,7 @@ public QuorumController build() throws Exception {
createTopicPolicy,
alterConfigPolicy,
configurationValidator,
+ supportedConfigChecker,
staticConfig,
bootstrapMetadata,
maxRecordsPerBatch,
@@ -1473,6 +1481,7 @@ private QuorumController(
Optional createTopicPolicy,
Optional alterConfigPolicy,
ConfigurationValidator configurationValidator,
+ SupportedConfigChecker supportedConfigChecker,
Map staticConfig,
BootstrapMetadata bootstrapMetadata,
int maxRecordsPerBatch,
@@ -1535,6 +1544,7 @@ private QuorumController(
setStaticConfig(staticConfig).
setNodeId(nodeId).
setFeatureControl(featureControl).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index dc550d8c72aa1..63e6c2b270cfe 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -17,7 +17,9 @@
package org.apache.kafka.image;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import java.util.HashMap;
import java.util.Map;
@@ -29,11 +31,14 @@
* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationDelta {
+
private final ConfigurationImage image;
private final Map> changes = new HashMap<>();
+ private final SupportedConfigChecker supportedConfigChecker;
- public ConfigurationDelta(ConfigurationImage image) {
+ public ConfigurationDelta(ConfigurationImage image, SupportedConfigChecker supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public void finishSnapshot() {
@@ -45,6 +50,12 @@ public void finishSnapshot() {
}
public void replay(ConfigRecord record) {
+ ConfigResource.Type type = ConfigResource.Type.forId(record.resourceType());
+ if (!supportedConfigChecker.isSupported(type, record.name())) {
+ // We skip unsupported configs during replay. This can happen when the config was
+ // deprecated and removed, but old records still exist in the log.
+ return;
+ }
changes.put(record.name(), Optional.ofNullable(record.value()));
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index 0b3fcbb386722..4d50abbbb2f17 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@@ -34,9 +35,11 @@
public final class ConfigurationsDelta {
private final ConfigurationsImage image;
private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>();
+ private final SupportedConfigChecker supportedConfigChecker;
- public ConfigurationsDelta(ConfigurationsImage image) {
+ public ConfigurationsDelta(ConfigurationsImage image, SupportedConfigChecker supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public Map<ConfigResource, ConfigurationDelta> changes() {
@@ -48,7 +51,7 @@ public void finishSnapshot() {
ConfigResource resource = entry.getKey();
ConfigurationImage configImage = entry.getValue();
ConfigurationDelta configDelta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage, supportedConfigChecker));
configDelta.finishSnapshot();
}
}
@@ -63,7 +66,7 @@ public void replay(ConfigRecord record) {
ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
new ConfigurationImage(resource, Map.of()));
ConfigurationDelta delta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage, supportedConfigChecker));
delta.replay(record);
}
@@ -73,7 +76,7 @@ public void replay(RemoveTopicRecord record, String topicName) {
if (image.resourceData().containsKey(resource)) {
ConfigurationImage configImage = image.resourceData().get(resource);
ConfigurationDelta delta = changes.computeIfAbsent(resource,
- __ -> new ConfigurationDelta(configImage));
+ __ -> new ConfigurationDelta(configImage, supportedConfigChecker));
delta.deleteAll();
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index b934d10f6d10d..36d68dff5a278 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -40,6 +40,7 @@
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Optional;
@@ -51,19 +52,27 @@
public final class MetadataDelta {
public static class Builder {
private MetadataImage image = MetadataImage.EMPTY;
+ private SupportedConfigChecker supportedConfigChecker = SupportedConfigChecker.TRUE;
public Builder setImage(MetadataImage image) {
this.image = image;
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public MetadataDelta build() {
- return new MetadataDelta(image);
+ return new MetadataDelta(image, supportedConfigChecker);
}
}
private final MetadataImage image;
+ private final SupportedConfigChecker supportedConfigChecker;
+
private FeaturesDelta featuresDelta = null;
private ClusterDelta clusterDelta = null;
@@ -82,8 +91,9 @@ public MetadataDelta build() {
private DelegationTokenDelta delegationTokenDelta = null;
- public MetadataDelta(MetadataImage image) {
+ private MetadataDelta(MetadataImage image, SupportedConfigChecker supportedConfigChecker) {
this.image = image;
+ this.supportedConfigChecker = supportedConfigChecker;
}
public MetadataImage image() {
@@ -122,7 +132,7 @@ public ConfigurationsDelta configsDelta() {
}
public ConfigurationsDelta getOrCreateConfigsDelta() {
- if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs());
+ if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs(), supportedConfigChecker);
return configsDelta;
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index c4b6286f2a902..f8cfa2005ee8b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -23,6 +23,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -58,6 +59,7 @@ public interface MetadataUpdater {
private final Time time;
private final FaultHandler faultHandler;
private final MetadataUpdater callback;
+ private final SupportedConfigChecker supportedConfigChecker;
private MetadataImage image;
private MetadataDelta delta;
@@ -74,12 +76,14 @@ public MetadataBatchLoader(
LogContext logContext,
Time time,
FaultHandler faultHandler,
- MetadataUpdater callback
+ MetadataUpdater callback,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(MetadataBatchLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.callback = callback;
+ this.supportedConfigChecker = supportedConfigChecker;
this.resetToImage(MetadataImage.EMPTY);
this.hasSeenRecord = false;
}
@@ -101,7 +105,10 @@ public boolean hasSeenRecord() {
public final void resetToImage(MetadataImage image) {
this.image = image;
this.hasSeenRecord = !image.isEmpty();
- this.delta = new MetadataDelta.Builder().setImage(image).build();
+ this.delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .setSupportedConfigChecker(supportedConfigChecker)
+ .build();
this.transactionState = TransactionState.NO_TRANSACTION;
this.lastOffset = image.provenance().lastContainedOffset();
this.lastEpoch = image.provenance().lastContainedEpoch();
@@ -199,7 +206,10 @@ public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch, boolean isOffsetBat
log.debug("handleCommit: publishing empty delta between {} and {} from {} batch(es) " +
"since a transaction was aborted", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches());
- applyDeltaAndUpdate(new MetadataDelta.Builder().setImage(image).build(), manifest);
+ applyDeltaAndUpdate(new MetadataDelta.Builder()
+ .setImage(image)
+ .setSupportedConfigChecker(supportedConfigChecker)
+ .build(), manifest);
break;
case ENDED_TRANSACTION:
case NO_TRANSACTION:
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 4fabd1863b637..4562238916eb7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -26,6 +26,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
@@ -76,6 +77,7 @@ public static class Builder {
private FaultHandler faultHandler = FaultHandlerException::new;
private MetadataLoaderMetrics metrics = null;
private Supplier<OptionalLong> highWaterMarkAccessor = null;
+ private SupportedConfigChecker supportedConfigChecker = SupportedConfigChecker.TRUE;
public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
@@ -107,6 +109,11 @@ public Builder setMetrics(MetadataLoaderMetrics metrics) {
return this;
}
+ public Builder setSupportedConfigChecker(SupportedConfigChecker supportedConfigChecker) {
+ this.supportedConfigChecker = supportedConfigChecker;
+ return this;
+ }
+
public MetadataLoader build() {
if (logContext == null) {
logContext = new LogContext("[MetadataLoader id=" + nodeId + "] ");
@@ -126,7 +133,8 @@ public MetadataLoader build() {
threadNamePrefix,
faultHandler,
metrics,
- highWaterMarkAccessor);
+ highWaterMarkAccessor,
+ supportedConfigChecker);
}
}
@@ -190,19 +198,26 @@ public MetadataLoader build() {
*/
private final KafkaEventQueue eventQueue;
+ /**
+ * Config checker for filtering unsupported configurations.
+ */
+ private final SupportedConfigChecker supportedConfigChecker;
+
private MetadataLoader(
Time time,
LogContext logContext,
String threadNamePrefix,
FaultHandler faultHandler,
MetadataLoaderMetrics metrics,
- Supplier<OptionalLong> highWaterMarkAccessor
+ Supplier<OptionalLong> highWaterMarkAccessor,
+ SupportedConfigChecker supportedConfigChecker
) {
this.log = logContext.logger(MetadataLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.metrics = metrics;
this.highWaterMarkAccessor = highWaterMarkAccessor;
+ this.supportedConfigChecker = supportedConfigChecker;
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
@@ -210,7 +225,8 @@ private MetadataLoader(
logContext,
time,
faultHandler,
- this::maybePublishMetadata);
+ this::maybePublishMetadata,
+ supportedConfigChecker);
this.eventQueue = new KafkaEventQueue(
Time.SYSTEM,
logContext,
@@ -289,6 +305,7 @@ void initializeNewPublishers() {
// haven't seen anything previously.
MetadataDelta delta = new MetadataDelta.Builder().
setImage(MetadataImage.EMPTY).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder(image.features().metadataVersionOrThrow()).
@@ -381,6 +398,7 @@ public void handleLoadSnapshot(SnapshotReader reader) {
snapshotName, numLoaded);
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
new file mode 100644
index 0000000000000..df9bf984d7d86
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/SupportedConfigChecker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+/**
+ * Interface for checking if a configuration name is supported for a given resource type.
+ */
+@FunctionalInterface
+public interface SupportedConfigChecker {
+ /**
+ * Check if a configuration name is supported for the given resource type.
+ *
+ * @param resourceType the type of resource (broker, topic, user, etc.)
+ * @param configName the name of the configuration
+ * @return true if the configuration is supported for the resource type, false otherwise
+ */
+ boolean isSupported(ConfigResource.Type resourceType, String configName);
+
+ /**
+ * A SupportedConfigChecker that always returns true, accepting all configurations.
+ */
+ SupportedConfigChecker TRUE = (resourceType, configName) -> true;
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2c93d1100ecae..81d8a30c65a6b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -561,4 +562,63 @@ private FeatureControlManager createFeatureControlManager() {
setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
return featureControlManager;
}
+
+ @Test
+ public void testValidateAlterConfigWithInvalidExistingConfigs() {
+ Set<String> validConfigs = Set.of("abc", "def");
+ SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> validConfigs.contains(configName);
+
+ ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
+ setKafkaConfigSchema(SCHEMA).
+ setSupportedConfigChecker(supportedConfigChecker).
+ build();
+
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("value1")); // valid
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("invalid.config").setValue("should-be-filtered")); // invalid, filtered in replay()
+
+ Map<String, String> configs = manager.getConfigs(MYTOPIC);
+ assertTrue(configs.containsKey("abc"), "Valid config should be in configData");
+ assertFalse(configs.containsKey("invalid.config"), "Invalid config should be filtered out in replay()");
+
+ ControllerResult<ApiError> result = manager.incrementalAlterConfig(
+ MYTOPIC,
+ toMap(entry("def", entry(SET, "newValue"))),
+ false);
+
+ assertEquals(ApiError.NONE, result.response());
+ }
+
+ @Test
+ public void testReplayFiltersInvalidConfigs() {
+ Set<String> validConfigs = Set.of("abc", "def", "ghi");
+ SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> validConfigs.contains(configName);
+
+ ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA).
+ setSupportedConfigChecker(supportedConfigChecker).
+ build();
+
+ // Replay valid configs
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue("value1"));
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("def").setValue("value2"));
+
+ manager.replay(new ConfigRecord().
+ setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("invalid.config").setValue("should-be-filtered"));
+
+ Map<String, String> configs = manager.getConfigs(MYTOPIC);
+ assertEquals(2, configs.size(), "Should only have valid configs");
+ assertTrue(configs.containsKey("abc"));
+ assertTrue(configs.containsKey("def"));
+ assertFalse(configs.containsKey("invalid.config"), "Invalid config should not be in configData");
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4a2e823242566..56744788ff87c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -97,6 +97,7 @@
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
@@ -1651,7 +1652,7 @@ private static void testToImages(List<ApiMessageAndVersion> fromRecords) {
new ImageDeltaPair<>(() -> AclsImage.EMPTY, AclsDelta::new),
new ImageDeltaPair<>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new),
new ImageDeltaPair<>(() -> ClusterImage.EMPTY, ClusterDelta::new),
- new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new),
+ new ImageDeltaPair<>(() -> ConfigurationsImage.EMPTY, image -> new ConfigurationsDelta(image, SupportedConfigChecker.TRUE)),
new ImageDeltaPair<>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new),
new ImageDeltaPair<>(() -> FeaturesImage.EMPTY, FeaturesDelta::new),
new ImageDeltaPair<>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
index ffde7b86be91e..6a743b026eba6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
@@ -144,7 +144,9 @@ static LoaderManifest fakeManifest(boolean isSnapshot) {
@Test
public void testLoadSnapshot() {
try (TestEnv env = new TestEnv()) {
- MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataDelta delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
ImageReWriter writer = new ImageReWriter(delta);
IMAGE1.write(writer, new ImageWriterOptions.Builder(MetadataVersion.MINIMUM_VERSION).build());
env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 6e01c4dbf0384..a5345608d3327 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -33,10 +34,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -82,7 +86,7 @@ public class ConfigurationsImageTest {
setResourceName("2").setName("foo").setValue("bar"),
CONFIG_RECORD.highestSupportedVersion()));
- DELTA1 = new ConfigurationsDelta(IMAGE1);
+ DELTA1 = new ConfigurationsDelta(IMAGE1, SupportedConfigChecker.TRUE);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
Map map2 = new HashMap<>();
@@ -118,6 +122,43 @@ public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
+ @Test
+ public void testConfigurationDeltaFiltering() {
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of("foo", "value1"); // valid
+ ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image, supportedConfigChecker);
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("bar").setValue("value2"));
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("qux").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertFalse(result.data().containsKey("qux"));
+ }
+
+ @Test
+ public void testConfigurationDeltaWithoutFiltering() {
+ Map<String, String> initialConfigs = Map.of("foo", "value1", "bar", "value2");
+ ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image, SupportedConfigChecker.TRUE);
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("baz").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertTrue(result.data().containsKey("baz"));
+ }
+
private static void testToImage(ConfigurationsImage image) {
testToImage(image, Optional.empty());
}
@@ -130,7 +171,7 @@ private static void testToImage(ConfigurationsImage image, List<ApiMessageAndVersion> fromRecords) {
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ConfigurationsImage.EMPTY,
- ConfigurationsDelta::new
+ img -> new ConfigurationsDelta(img, SupportedConfigChecker.TRUE)
).test(image, fromRecords);
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index d4a091da809cb..5ee4ad79ec08b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -140,7 +140,7 @@ private static void testToImage(MetadataImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> MetadataImage.EMPTY,
- MetadataDelta::new
+ img -> new MetadataDelta.Builder().setImage(img).build()
) {
@Override
public MetadataImage createImageByApplyingDelta(MetadataDelta delta) {
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
index 3df1844ca9cad..ebe4365b27fd5 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
@@ -18,8 +18,10 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -28,6 +30,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -40,6 +43,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
+import java.util.Properties;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -148,7 +152,8 @@ public void testAlignedTransactionBatches() {
new LogContext(),
new MockTime(),
new MockFaultHandler("testAlignedTransactionBatches"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -184,7 +189,8 @@ public void testSingletonBeginAndEnd() {
new LogContext(),
new MockTime(),
new MockFaultHandler("testSingletonBeginAndEnd"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// All in one commit
@@ -233,7 +239,8 @@ public void testUnexpectedBeginTransaction() {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
Batch batch1 = Batch.data(
@@ -263,7 +270,8 @@ public void testUnexpectedEndTransaction() {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// First batch gets loaded fine
@@ -296,7 +304,8 @@ public void testUnexpectedAbortTransaction() {
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
// First batch gets loaded fine
@@ -333,7 +342,8 @@ private MetadataBatchLoader loadSingleBatch(
new LogContext(),
new MockTime(),
faultHandler,
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -417,7 +427,8 @@ public void testOneTransactionInMultipleBatches(boolean abortTxn) {
new LogContext(),
new MockTime(),
new MockFaultHandler("testOneTransactionInMultipleBatches"),
- updater
+ updater,
+ SupportedConfigChecker.TRUE
);
batchLoader.resetToImage(MetadataImage.EMPTY);
@@ -449,6 +460,48 @@ public void testOneTransactionInMultipleBatches(boolean abortTxn) {
}
}
+ @Test
+ public void testUnsupportedConfigFilteredInBatch() {
+ SupportedConfigChecker checker = (type, name) ->
+ !(type == ConfigResource.Type.TOPIC && name.equals("unsupported.config"));
+
+ MockMetadataUpdater updater = new MockMetadataUpdater();
+ MetadataBatchLoader batchLoader = new MetadataBatchLoader(
+ new LogContext(),
+ new MockTime(),
+ new MockFaultHandler("testUnsupportedConfigFilteredInBatch"),
+ updater,
+ checker
+ );
+
+ List<ApiMessageAndVersion> records = List.of(
+ new ApiMessageAndVersion(new TopicRecord()
+ .setName("foo")
+ .setTopicId(TOPIC_FOO), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.TOPIC.id())
+ .setResourceName("foo")
+ .setName("unsupported.config")
+ .setValue("some-value"), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.TOPIC.id())
+ .setResourceName("foo")
+ .setName("retention.ms")
+ .setValue("1000"), (short) 0)
+ );
+
+ Batch batch = Batch.data(10, 42, 0, 100, records);
+ batchLoader.resetToImage(MetadataImage.EMPTY);
+ batchLoader.loadBatch(batch, LEADER_AND_EPOCH);
+ batchLoader.maybeFlushBatches(LEADER_AND_EPOCH, true);
+
+ assertEquals(1, updater.updates);
+ ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
+ Properties props = updater.latestImage.configs().configProperties(resource);
+ assertFalse(props.containsKey("unsupported.config"));
+ assertEquals("1000", props.getProperty("retention.ms"));
+ }
+
@Test
public void testTransactionAlignmentOnBatchBoundary() {
List<ApiMessageAndVersion> batchRecords = new ArrayList<>();
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 31960252bd815..22e9a749b12bb 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,9 +18,11 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -31,6 +33,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
@@ -854,4 +857,76 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, Loader
}
faultHandler.maybeRethrowFirstException();
}
+
+ @Test
+ public void testUnsupportedConfigFilteredInCommit() throws Exception {
+ // Create a checker that rejects "message.format.version"
+ SupportedConfigChecker checker = (type, name) ->
+ !name.equals("message.format.version");
+
+ MockFaultHandler faultHandler = new MockFaultHandler("testUnsupportedConfigFilteredInCommit");
+ MockPublisher publisher = new MockPublisher("testUnsupportedConfigFilteredInCommit");
+
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
+ setSupportedConfigChecker(checker).
+ build()) {
+ loader.installPublishers(List.of(publisher)).get();
+ loadTestSnapshot(loader, 100);
+ publisher.firstPublish.get(10, TimeUnit.SECONDS);
+
+ // Commit a batch containing ConfigRecord with message.format.version
+ loader.handleCommit(MockBatchReader.newSingleBatchReader(200, 100, List.of(
+ new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(ConfigResource.Type.TOPIC.id()).
+ setResourceName("test-topic").
+ setName("message.format.version").
+ setValue("2.8"), (short) 0)
+ )));
+ loader.waitForAllEventsToBeHandled();
+
+ // Verify config was filtered out
+ assertTrue(publisher.latestImage.configs().configMapForResource(
+ new ConfigResource(ConfigResource.Type.TOPIC, "test-topic")).isEmpty());
+ }
+ faultHandler.maybeRethrowFirstException();
+ }
+
+ @Test
+ public void testUnsupportedConfigFilteredInSnapshot() throws Exception {
+ // Create a checker that rejects "message.format.version"
+ SupportedConfigChecker checker = (type, name) ->
+ !name.equals("message.format.version");
+
+ MockFaultHandler faultHandler = new MockFaultHandler("testUnsupportedConfigFilteredInSnapshot");
+ MockPublisher publisher = new MockPublisher("testUnsupportedConfigFilteredInSnapshot");
+
+ try (MetadataLoader loader = new MetadataLoader.Builder().
+ setFaultHandler(faultHandler).
+ setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+ setSupportedConfigChecker(checker).
+ build()) {
+ loader.installPublishers(List.of(publisher)).get();
+
+ // Load a snapshot containing ConfigRecord with message.format.version
+ loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
+ new MetadataProvenance(200, 100, 4000, true), List.of(
+ List.of(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MINIMUM_VERSION.featureLevel()), (short) 0)),
+ List.of(new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(ConfigResource.Type.TOPIC.id()).
+ setResourceName("test-topic").
+ setName("message.format.version").
+ setValue("2.8"), (short) 0))
+ )));
+ loader.waitForAllEventsToBeHandled();
+
+ // Verify config was filtered out
+ assertTrue(publisher.latestImage.configs().configMapForResource(
+ new ConfigResource(ConfigResource.Type.TOPIC, "test-topic")).isEmpty());
+ }
+ faultHandler.maybeRethrowFirstException();
+ }
}
diff --git a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
new file mode 100644
index 0000000000000..301d9e1d6bfec
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.metadata.SupportedConfigChecker;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Default implementation of SupportedConfigChecker that checks if a configuration name
+ * is supported for a given resource type based on the actual config definitions.
+ *
+ * This class maintains a predicate per resource type:
+ * - TOPIC: Configurations defined in LogConfig
+ * - BROKER: All config names are accepted. Broker configs include listener-specific
+ * prefixed configs (e.g., listener.name.<name>.ssl.keystore.location) whose names
+ * are user-defined at runtime and cannot be pre-enumerated. They also include
+ * plugin-defined configs (e.g., custom authorizer or quota callback configs) with
+ * arbitrary names. For these reasons BROKER configs are not filtered by name.
+ * - CLIENT_METRICS: Configurations defined in ClientMetricsConfigs
+ * - GROUP: Configurations defined in GroupConfig
+ *
+ * Config names for resource types not in this map are considered unsupported.
+ */
+public final class DefaultSupportedConfigChecker implements SupportedConfigChecker {
+    /**
+     * A predicate that accepts exactly the config names contained in a fixed set.
+     */
+    static final class SetContainsPredicate implements Predicate<String> {
+        private final Set<String> keys;
+
+        SetContainsPredicate(Set<String> keys) {
+            this.keys = keys;
+        }
+
+        @Override
+        public boolean test(String key) {
+            return keys.contains(key);
+        }
+    }
+
+    // Maps each supported resource type to the predicate deciding whether a config
+    // name is recognized for that type. Resource types absent from this map are
+    // always reported as unsupported.
+    private final Map<ConfigResource.Type, Predicate<String>> validConfigsByType;
+
+    public DefaultSupportedConfigChecker() {
+        this.validConfigsByType = Map.of(
+            ConfigResource.Type.TOPIC, new SetContainsPredicate(new HashSet<>(LogConfig.configNames())),
+            // BROKER config names cannot be pre-enumerated (listener-prefixed and
+            // plugin-defined configs have user-chosen names), so accept everything.
+            ConfigResource.Type.BROKER, ignore -> true,
+            ConfigResource.Type.CLIENT_METRICS, new SetContainsPredicate(ClientMetricsConfigs.configDef().names()),
+            ConfigResource.Type.GROUP, new SetContainsPredicate(GroupConfig.configDef().names())
+        );
+    }
+
+    /**
+     * @param resourceType  the type of the resource the config applies to.
+     * @param configName    the config name to check.
+     * @return              true if the name is recognized for the resource type.
+     */
+    @Override
+    public boolean isSupported(ConfigResource.Type resourceType, String configName) {
+        Predicate<String> predicate = validConfigsByType.get(resourceType);
+        return predicate != null && predicate.test(configName);
+    }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
new file mode 100644
index 0000000000000..b62626e42a6e8
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/config/DefaultSupportedConfigCheckerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.server.metrics.ClientMetricsConfigs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.common.config.ConfigResource.Type.CLIENT_METRICS;
+import static org.apache.kafka.common.config.ConfigResource.Type.GROUP;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+// Unit tests for DefaultSupportedConfigChecker covering each supported resource type.
+class DefaultSupportedConfigCheckerTest {
+    private final DefaultSupportedConfigChecker checker = new DefaultSupportedConfigChecker();
+
+    @Test
+    void testIsSupported() {
+        // Test valid topic configs
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.SEGMENT_BYTES_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.SEGMENT_MS_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG));
+        assertFalse(checker.isSupported(TOPIC, "invalid.topic.config"));
+
+        // BROKER allows all config names, including listener-specific prefixed configs
+        // (e.g., listener.name.<name>.ssl.keystore.location) and plugin-defined configs
+        // (e.g., custom authorizer or quota callback configs) that cannot be pre-enumerated.
+        assertTrue(checker.isSupported(BROKER, "log.cleaner.threads"));
+        assertTrue(checker.isSupported(BROKER, "num.network.threads"));
+        assertTrue(checker.isSupported(BROKER, "log.segment.bytes"));
+        assertTrue(checker.isSupported(BROKER, "listener.name.EXTERNAL.ssl.keystore.location"));
+        assertTrue(checker.isSupported(BROKER, "fake.configurable.authorizer.foobar.config"));
+
+        // Test valid client metrics configs
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.INTERVAL_MS_CONFIG));
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.METRICS_CONFIG));
+        assertTrue(checker.isSupported(CLIENT_METRICS, ClientMetricsConfigs.MATCH_CONFIG));
+        assertFalse(checker.isSupported(CLIENT_METRICS, "invalid.client.metrics.config"));
+
+        // Test valid group configs
+        assertTrue(checker.isSupported(GROUP, GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+        assertTrue(checker.isSupported(GROUP, GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
+        assertFalse(checker.isSupported(GROUP, "invalid.group.config"));
+
+        // Test that topic replication throttled replicas are supported for TOPIC
+        // (QuotaConfig resolves without an import: same package org.apache.kafka.server.config)
+        assertTrue(checker.isSupported(TOPIC, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+        assertTrue(checker.isSupported(TOPIC, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+
+        // Test that broker replication throttled rates are supported for BROKER
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG));
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG));
+        assertTrue(checker.isSupported(BROKER, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG));
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index 7af9557381dad..0614d268f542c 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -22,7 +22,9 @@
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.loader.MetadataLoader;
+import org.apache.kafka.metadata.SupportedConfigChecker;
import org.apache.kafka.metadata.util.SnapshotFileReader;
+import org.apache.kafka.server.config.DefaultSupportedConfigChecker;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.LoggingFaultHandler;
import org.apache.kafka.server.util.FileLock;
@@ -148,10 +150,13 @@ public MetadataShell(
private void initializeWithSnapshotFileReader() throws Exception {
this.fileLock = takeDirectoryLockIfExists(parentParent(new File(snapshotPath)));
+ SupportedConfigChecker supportedConfigChecker = new DefaultSupportedConfigChecker();
+
this.loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setNodeId(-1).
setHighWaterMarkAccessor(() -> snapshotFileReader.highWaterMark()).
+ setSupportedConfigChecker(supportedConfigChecker).
build();
snapshotFileReader = new SnapshotFileReader(snapshotPath, loader);
snapshotFileReader.startup();