Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM #14202

Merged
merged 4 commits into from Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -71,8 +71,7 @@ public final class RemoteLogManagerConfig {

public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = "remote.log.metadata.manager.class.name";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = "Fully qualified class name of `RemoteLogMetadataManager` implementation.";
//todo add the default topic based RLMM class name.
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager";

public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = "remote.log.metadata.manager.class.path";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogMetadataManager` implementation." +
Expand Down Expand Up @@ -176,7 +175,8 @@ public final class RemoteLogManagerConfig {
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
STRING, null,
STRING,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
Expand Down
Expand Up @@ -18,12 +18,16 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;

public class RemoteLogManagerConfigTest {

private static class TestConfig extends AbstractConfig {
Expand All @@ -32,21 +36,28 @@ public TestConfig(Map<?, ?> originals) {
}
}

@Test
public void testValidConfigs() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) {
String rsmPrefix = "__custom.rsm.";
String rlmmPrefix = "__custom.rlmm.";
Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class";
System.out.println(remoteLogMetadataManagerClass);
vamossagar12 marked this conversation as resolved.
Show resolved Hide resolved
RemoteLogManagerConfig expectedRemoteLogManagerConfig
= new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path",
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path",
vamossagar12 marked this conversation as resolved.
Show resolved Hide resolved
"listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps);

Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
// Removing remote.log.metadata.manager.class.name so that the default value gets picked up.
if (useDefaultRemoteLogMetadataManagerClass) {
props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
TestConfig config = new TestConfig(props);
RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config);
Assertions.assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig);
Expand Down