diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 2d368daef19b..081fc27e79bf 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -71,8 +71,7 @@ public final class RemoteLogManagerConfig { public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = "remote.log.metadata.manager.class.name"; public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = "Fully qualified class name of `RemoteLogMetadataManager` implementation."; - //todo add the default topic based RLMM class name. - public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = ""; + public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager"; public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = "remote.log.metadata.manager.class.path"; public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogMetadataManager` implementation." + @@ -176,7 +175,8 @@ public final class RemoteLogManagerConfig { MEDIUM, REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC) .defineInternal(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - STRING, null, + STRING, + DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME, new ConfigDef.NonEmptyString(), MEDIUM, REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index c8b428ca547f..8bef2e3de79e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -18,12 +18,16 @@ import org.apache.kafka.common.config.AbstractConfig; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; + public class RemoteLogManagerConfigTest { private static class TestConfig extends AbstractConfig { @@ -32,21 +36,27 @@ public TestConfig(Map originals) { } } - @Test - public void testValidConfigs() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { String rsmPrefix = "__custom.rsm."; String rlmmPrefix = "__custom.rlmm."; Map rsmProps = Collections.singletonMap("rsm.prop", "val"); Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); + String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class"; RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path", - "dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path", + remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path", "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100, rsmPrefix, rsmProps, rlmmPrefix, rlmmProps); Map props = extractProps(expectedRemoteLogManagerConfig); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); + // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. + if (useDefaultRemoteLogMetadataManagerClass) { + props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + } TestConfig config = new TestConfig(props); RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config); Assertions.assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig);