KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm #13828

Merged · 8 commits · Jun 16, 2023
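For context, a minimal sketch of how a broker might enable the new setting; the listener name and the remote-storage flag value below are illustrative assumptions, not taken from this PR:

import java.util.Properties;

// Sketch only: the key added by this PR names the broker listener whose
// endpoint the topic-based RemoteLogMetadataManager bootstraps against.
Properties brokerProps = new Properties();
brokerProps.put("remote.log.storage.system.enable", "true");
brokerProps.put("remote.log.metadata.manager.listener.name", "PLAINTEXT"); // assumed listener name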
12 changes: 12 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
@@ -137,6 +138,8 @@ public class RemoteLogManager implements Closeable {
// topic ids that are received on leadership changes, this map is cleared on stop partitions
private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new ConcurrentHashMap<>();

// The endpoint for the remote log metadata manager to connect to
private Optional<EndPoint> endpoint = Optional.empty();
private boolean closed = false;

/**
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
});
}

public void endPoint(Optional<EndPoint> endpoint) {
this.endpoint = endpoint;
}

private void configureRLMM() {
final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
endpoint.ifPresent(e -> {
rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
Reviewer (Contributor): please use the constant CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG

Author (Contributor): Yes, but that would add a dependency from the core module on the clients module; that's why I use a plain string instead.

Reviewer (Contributor): ok

rlmmProps.put("security.protocol", e.securityProtocol().name);
Reviewer (Contributor): please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

});

remoteLogMetadataManager.configure(rlmmProps);
}
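Net effect: configureRLMM() now hands the RLMM the broker id, the log dir, and, when an endpoint is set, the bootstrap servers and security protocol. A sketch of the resulting map, with broker id, log dir, host, and port assumed for illustration:

import java.util.HashMap;
import java.util.Map;

// Illustrative contents of rlmmProps for an endpoint localhost:9092 on a
// PLAINTEXT listener (all concrete values here are assumptions).
Map<String, Object> rlmmProps = new HashMap<>();
rlmmProps.put("broker.id", 0);                        // KafkaConfig.BrokerIdProp()
rlmmProps.put("log.dir", "/tmp/kafka-logs");          // KafkaConfig.LogDirProp()
rlmmProps.put("bootstrap.servers", "localhost:9092"); // e.host() + ":" + e.port()
rlmmProps.put("security.protocol", "PLAINTEXT");      // e.securityProtocol().name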

15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,6 +18,7 @@
package kafka.server

import kafka.cluster.Broker.ServerInfo
import kafka.cluster.EndPoint
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
@@ -51,6 +52,7 @@ import org.apache.kafka.storage.internals.log.LogDirFailureChannel

import java.net.InetAddress
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
@@ -197,7 +199,8 @@
logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)

remoteLogManager = createRemoteLogManager(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
@@ -473,6 +476,13 @@
// contain the original configuration values.
new KafkaConfig(config.originals(), true)

remoteLogManager.foreach(rlm => {
val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
val endpoint = endpoints.stream.filter(e => e.listenerName.equals(listenerName))
.findAny().orElse(endpoints.get(0))
rlm.endPoint(Optional.of(EndPoint.fromJava(endpoint)))
})
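The selection rule reads: prefer the endpoint whose listener matches remote.log.metadata.manager.listener.name, otherwise fall back to the first advertised endpoint. A generic Java sketch of that rule (the helper and its type parameter are illustrative, not part of the PR):

import java.util.List;
import java.util.function.Predicate;

// E stands in for the broker's endpoint class; matchesListener encapsulates
// the listener-name comparison done in BrokerServer.
static <E> E selectRlmmEndpoint(List<E> endpoints, Predicate<E> matchesListener) {
    return endpoints.stream()
            .filter(matchesListener)
            .findAny()
            .orElse(endpoints.get(0)); // fall back to the first endpoint
}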

// Start RemoteLogManager before the broker starts serving requests.
remoteLogManager.foreach(_.startup())

@@ -514,8 +524,7 @@
}
}

protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
protected def createRemoteLogManager(remoteLogManagerConfig: RemoteLogManagerConfig): Option[RemoteLogManager] = {
if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
if (config.logDirs.size > 1) {
throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -60,6 +60,7 @@ import org.apache.zookeeper.client.ZKClientConfig

import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException}
import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.{Map, Seq}
@@ -280,7 +281,8 @@
_brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())

remoteLogManager = createRemoteLogManager(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

if (config.migrationEnabled) {
kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
@@ -504,6 +506,13 @@
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

remoteLogManager.foreach(rlm => {
val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
.orElse(Some(brokerInfo.broker.endPoints.head))
rlm.endPoint(Optional.of(endpoint.get))
})
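Both servers normalise the configured listener name before comparing, so the match is case-insensitive. A small sketch of that behavior:

import org.apache.kafka.common.network.ListenerName;

// ListenerName.normalised upper-cases the raw value, so "plaintext" in the
// config matches a listener registered as PLAINTEXT.
ListenerName configured = ListenerName.normalised("plaintext");
// configured.value() => "PLAINTEXT"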

// Start RemoteLogManager before the broker starts serving requests.
remoteLogManager.foreach(_.startup())

@@ -596,8 +605,7 @@
}
}

protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
protected def createRemoteLogManager(remoteLogManagerConfig: RemoteLogManagerConfig): Option[RemoteLogManager] = {
if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
if (config.logDirs.size > 1) {
throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
33 changes: 33 additions & 0 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
@@ -24,10 +25,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
@@ -189,6 +192,36 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
}

@Test
void testRemoteStorageManagerWithUserDefinedConfigs() {
String key = "key";
String configPrefix = "config.prefix";
Properties props = new Properties();
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, configPrefix);
props.put(configPrefix + key, "world");
props.put("remote.storage.manager.y", "z");

Map<String, Object> remoteStorageManagerConfig = createRLMConfig(props).remoteStorageManagerProps();
assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key));
assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y"));
}
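The assertions above pin down the prefix handling: keys under the configured prefix reach the RemoteStorageManager with the prefix stripped, while unprefixed remote.storage.manager.* keys are dropped. A minimal re-implementation of that stripping rule for illustration (not the actual RemoteLogManagerConfig code):

import java.util.HashMap;
import java.util.Map;

// Keep only entries whose key starts with `prefix`, minus the prefix itself.
static Map<String, Object> stripPrefix(Map<String, ?> props, String prefix) {
    Map<String, Object> out = new HashMap<>();
    props.forEach((k, v) -> {
        if (k.startsWith(prefix)) {
            out.put(k.substring(prefix.length()), v);
        }
    });
    return out;
}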

@Test
void testRemoteLogMetadataManagerWithEndpointConfig() {
String host = "localhost";
String port = "1234";
String securityProtocol = "PLAINTEXT";
EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
SecurityProtocol.PLAINTEXT);
remoteLogManager.endPoint(Optional.of(endPoint));
remoteLogManager.startup();

ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
assertEquals(host + ":" + port, capture.getValue().get("bootstrap.servers"));
assertEquals(securityProtocol, capture.getValue().get("security.protocol"));
}

@Test
void testStartup() {
remoteLogManager.startup();