KAFKA-15083: add config with "remote.log.metadata" prefix #14151

Merged 3 commits on Aug 11, 2023
Changes from all commits
12 changes: 7 additions & 5 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -256,15 +256,17 @@ public void onEndPointCreated(EndPoint endpoint) {
}

private void configureRLMM() {
final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);
final Map<String, Object> rlmmProps = new HashMap<>();
endpoint.ifPresent(e -> {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", e.host() + ":" + e.port());
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
});
// apply remoteLogMetadataManagerProps last so that user-provided values override the endpoint-derived config, if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);

remoteLogMetadataManager.configure(rlmmProps);
}
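
The reordering in configureRLMM() fixes the precedence: endpoint-derived defaults are seeded into the map first, and the user-supplied remote.log.metadata.* properties are applied on top via putAll, so explicit configuration wins on any key collision. A minimal standalone sketch of that merge order (the keys and values below are illustrative, not the broker's actual configuration):

import java.util.HashMap;
import java.util.Map;

public class RlmmPropsPrecedenceSketch {
    public static void main(String[] args) {
        // Seed defaults derived from the broker endpoint, as configureRLMM() does first.
        Map<String, Object> rlmmProps = new HashMap<>();
        rlmmProps.put("remote.log.metadata.common.client.security.protocol", "PLAINTEXT");
        rlmmProps.put("remote.log.metadata.common.client.bootstrap.servers", "localhost:1234");

        // Properties the user configured under the RLMM prefix are applied last,
        // so they override the endpoint-derived values on key collisions.
        Map<String, Object> userSupplied = new HashMap<>();
        userSupplied.put("remote.log.metadata.common.client.security.protocol", "SSL");
        rlmmProps.putAll(userSupplied);

        // Prints SSL: the explicit setting wins over the endpoint default.
        System.out.println(rlmmProps.get("remote.log.metadata.common.client.security.protocol"));
    }
}
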
73 changes: 71 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -93,6 +93,11 @@
import java.util.concurrent.CountDownLatch;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -122,6 +127,17 @@ public class RemoteLogManagerTest {
int brokerId = 0;
String logDir = TestUtils.tempDirectory("kafka-").toString();
String clusterId = "dummyId";
String remoteLogStorageTestProp = "remote.log.storage.test";
String remoteLogStorageTestVal = "storage.test";
String remoteLogMetadataTestProp = "remote.log.metadata.test";
String remoteLogMetadataTestVal = "metadata.test";
String remoteLogMetadataCommonClientTestProp = REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
String remoteLogMetadataCommonClientTestVal = "common.test";
String remoteLogMetadataProducerTestProp = REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
String remoteLogMetadataProducerTestVal = "producer.test";
String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
String remoteLogMetadataConsumerTestVal = "consumer.test";
String remoteLogMetadataTopicPartitionsNum = "1";

RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
@@ -248,16 +264,59 @@ void testRemoteLogMetadataManagerWithEndpointConfig() {
assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
}

@Test
void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
Properties props = new Properties();
// override the common client security.protocol by prefixing it with both the RLMM config prefix and the remote log metadata common client prefix
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
try (RemoteLogManager remoteLogManager = new RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {

String host = "localhost";
String port = "1234";
String securityProtocol = "PLAINTEXT";
EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
SecurityProtocol.PLAINTEXT);
remoteLogManager.onEndPointCreated(endPoint);
remoteLogManager.startup();

ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
assertEquals(host + ":" + port, capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers"));
// should be overridden to SSL
assertEquals("SSL", capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol"));
assertEquals(clusterId, capture.getValue().get("cluster.id"));
assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
}
}
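
With the default RLMM config prefix introduced by this change, the property the test concatenates above corresponds to a single literal key in the broker configuration. A small illustrative sketch spelling that key out (assuming the default "rlmm.config." prefix is left unchanged):

import java.util.Properties;

public class OverrideKeySketch {
    public static void main(String[] args) {
        // Equivalent to DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
        // + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol".
        Properties brokerProps = new Properties();
        brokerProps.put("rlmm.config.remote.log.metadata.common.client.security.protocol", "SSL");
        System.out.println(brokerProps);
    }
}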



@Test
void testStartup() {
remoteLogManager.startup();
ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
verify(remoteStorageManager, times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
assertEquals(remoteLogStorageTestVal, capture.getValue().get(remoteLogStorageTestProp));

verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
assertEquals(logDir, capture.getValue().get("log.dir"));

// verify that configs starting with "remote.log.metadata", "remote.log.metadata.common.client.",
// "remote.log.metadata.producer.", and "remote.log.metadata.consumer." are passed through correctly
assertEquals(remoteLogMetadataTopicPartitionsNum, capture.getValue().get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP));
assertEquals(remoteLogMetadataTestVal, capture.getValue().get(remoteLogMetadataTestProp));
assertEquals(remoteLogMetadataConsumerTestVal, capture.getValue().get(remoteLogMetadataConsumerTestProp));
assertEquals(remoteLogMetadataProducerTestVal, capture.getValue().get(remoteLogMetadataProducerTestProp));
assertEquals(remoteLogMetadataCommonClientTestVal, capture.getValue().get(remoteLogMetadataCommonClientTestProp));
}

// This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.
@@ -726,13 +785,15 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData,
@Test
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogManager remoteLogManager =
try (RemoteLogManager remoteLogManager =
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
};
assertEquals(rsmManager, remoteLogManager.storageManager());
) {
assertEquals(rsmManager, remoteLogManager.storageManager());
}
}

private void verifyInCache(TopicIdPartition... topicIdPartitions) {
@@ -1007,6 +1068,14 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
// add configs under the remote log metadata manager config prefix
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, remoteLogMetadataTopicPartitionsNum);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataTestProp, remoteLogMetadataTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);

AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
return new RemoteLogManagerConfig(config);
}
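
createRLMConfig() above registers the test values under the "rsm.config." and "rlmm.config." prefixes, while testStartup() reads them back without the prefix, so RemoteLogManagerConfig is expected to strip the prefix before handing the properties to the plugins. A sketch of one way that collection step can be done with Kafka's AbstractConfig.originalsWithPrefix; this is an assumption about the mechanism, and the actual implementation inside RemoteLogManagerConfig may differ:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class PrefixStrippingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("rlmm.config.remote.log.metadata.topic.partitions", "1");
        props.put("rlmm.config.remote.log.metadata.test", "metadata.test");

        // Unknown keys are kept as "originals", so an empty ConfigDef is enough here.
        AbstractConfig config = new AbstractConfig(new ConfigDef(), props);

        // Returns the matching entries with the "rlmm.config." prefix stripped, e.g.
        // {remote.log.metadata.topic.partitions=1, remote.log.metadata.test=metadata.test}
        Map<String, Object> rlmmProps = config.originalsWithPrefix("rlmm.config.");
        System.out.println(rlmmProps);
    }
}
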
org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -42,15 +42,18 @@ public final class RemoteLogManagerConfig {
*/
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = "remote.log.storage.manager.impl.prefix";
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteStorageManager " +
"implementation. For example this value can be `rsm.s3.`.";
"implementation. For example this value can be `rsm.config.`.";
public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = "rsm.config.";

/**
* Prefix used for properties to be passed to {@link RemoteLogMetadataManager} implementation. The remote log subsystem collects all the properties having
* this prefix and passes them to {@code RemoteLogMetadataManager} using {@link RemoteLogMetadataManager#configure(Map)}.
*/
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP = "remote.log.metadata.manager.impl.prefix";
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteLogMetadataManager " +
"implementation. For example this value can be `rlmm.s3.`.";
"implementation. For example this value can be `rlmm.config.`.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";


public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tier storage functionality in a broker or not. Valid values " +
@@ -152,13 +155,13 @@ public final class RemoteLogManagerConfig {
REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
STRING,
null,
DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
STRING,
null,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
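
Since the two prefix properties now default to "rsm.config." and "rlmm.config." instead of null, plugin properties can be prefixed without first declaring a prefix. A short sketch reading the new defaults back, mirroring the way createRLMConfig() builds the config in the test above; the manager implementation class names used here are hypothetical placeholders:

import java.util.Properties;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;

public class DefaultPrefixSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
        // Placeholder class names, set only to mirror the test; config parsing does not load these classes.
        props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.example.MyRemoteStorageManager");
        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.example.MyRemoteLogMetadataManager");

        // Neither *.impl.prefix property is set, so the new defaults apply.
        AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);

        // Expected output with this change: "rsm.config." and "rlmm.config."
        System.out.println(config.getString(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP));
        System.out.println(config.getString(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP));
    }
}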