Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm #13828

Merged
merged 8 commits into from
Jun 16, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.kafka.common.network;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Locale;
import java.util.Objects;
Expand All @@ -36,6 +38,9 @@ public static ListenerName forSecurityProtocol(SecurityProtocol securityProtocol
* Create an instance with the provided value converted to uppercase.
*/
public static ListenerName normalised(String value) {
    // Reject a null/blank listener name up front so a misconfigured
    // "remote.log.metadata.manager.listener.name" surfaces as a ConfigException
    // instead of an obscure failure further downstream.
    if (!Utils.isBlank(value)) {
        return new ListenerName(value.toUpperCase(Locale.ROOT));
    }
    throw new ConfigException("The provided listener name is null or empty string");
}

Expand Down
18 changes: 17 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
Expand Down Expand Up @@ -136,7 +137,10 @@ public class RemoteLogManager implements Closeable {

// topic ids that are received on leadership changes, this map is cleared on stop partitions
private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new ConcurrentHashMap<>();
private final String clusterId;

// The endpoint for remote log metadata manager to connect to
private Optional<EndPoint> endpoint = Optional.empty();
showuon marked this conversation as resolved.
Show resolved Hide resolved
private boolean closed = false;

/**
Expand All @@ -146,17 +150,19 @@ public class RemoteLogManager implements Closeable {
* @param brokerId id of the current broker.
* @param logDir directory of Kafka log segments.
* @param time Time instance.
* @param clusterId The cluster id.
* @param fetchLog function to get UnifiedLog instance for a given topic.
*/
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
int brokerId,
String logDir,
String clusterId,
Time time,
Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {

this.rlmConfig = rlmConfig;
this.brokerId = brokerId;
this.logDir = logDir;
this.clusterId = clusterId;
this.time = time;
this.fetchLog = fetchLog;

Expand Down Expand Up @@ -220,11 +226,21 @@ public RemoteLogMetadataManager run() {
});
}

/**
 * Records the broker endpoint that the remote log metadata manager should use
 * to connect to this cluster. The stored endpoint is read later when the RLMM
 * is configured (see the bootstrap.servers / security.protocol properties).
 *
 * @param endpoint the endpoint selected for the RLMM listener; must be
 *                 non-null ({@code Optional.of} throws NPE otherwise)
 */
public void onEndPointCreated(EndPoint endpoint) {
    this.endpoint = Optional.of(endpoint);
}

private void configureRLMM() {
final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);
endpoint.ifPresent(e -> {
showuon marked this conversation as resolved.
Show resolved Hide resolved
rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use the constant CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that would add dependency with client module with core. That's why I use plain string instead

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

rlmmProps.put("security.protocol", e.securityProtocol().name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

});

remoteLogMetadataManager.configure(rlmmProps);
}

Expand Down
26 changes: 16 additions & 10 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server

import kafka.cluster.Broker.ServerInfo
import kafka.cluster.EndPoint
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
Expand All @@ -44,7 +45,6 @@ import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
Expand Down Expand Up @@ -99,7 +99,7 @@ class BrokerServer(

var logDirFailureChannel: LogDirFailureChannel = _
var logManager: LogManager = _
var remoteLogManager: Option[RemoteLogManager] = None
var remoteLogManagerOpt: Option[RemoteLogManager] = None

var tokenManager: DelegationTokenManager = _

Expand Down Expand Up @@ -197,7 +197,7 @@ class BrokerServer(
logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)

remoteLogManager = createRemoteLogManager(config)
remoteLogManagerOpt = createRemoteLogManager()

// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
Expand Down Expand Up @@ -261,7 +261,7 @@ class BrokerServer(
time = time,
scheduler = kafkaScheduler,
logManager = logManager,
remoteLogManager = remoteLogManager,
remoteLogManager = remoteLogManagerOpt,
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
Expand Down Expand Up @@ -474,7 +474,14 @@ class BrokerServer(
new KafkaConfig(config.originals(), true)

// Start RemoteLogManager before broker start serving the requests.
remoteLogManager.foreach(_.startup())
remoteLogManagerOpt.foreach(rlm => {
val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
endpoints.stream.filter(e => e.listenerName.equals(listenerName))
.findFirst()
.ifPresent(e => rlm.onEndPointCreated(EndPoint.fromJava(e)))

rlm.startup()
})

// If we are using a ClusterMetadataAuthorizer which stores its ACLs in the metadata log,
// notify it that the loading process is complete.
Expand Down Expand Up @@ -514,14 +521,13 @@ class BrokerServer(
}
}

protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
if (config.logDirs.size > 1) {
throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
}

Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head, time,
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava));
} else {
None
Expand Down Expand Up @@ -598,7 +604,7 @@ class BrokerServer(

// Close remote log manager to give a chance to any of its underlying clients
// (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.
CoreUtils.swallow(remoteLogManager.foreach(_.close()), this)
CoreUtils.swallow(remoteLogManagerOpt.foreach(_.close()), this)

if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
Expand Down
25 changes: 15 additions & 10 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
Expand Down Expand Up @@ -128,7 +127,7 @@ class KafkaServer(

var logDirFailureChannel: LogDirFailureChannel = _
@volatile private var _logManager: LogManager = _
var remoteLogManager: Option[RemoteLogManager] = None
var remoteLogManagerOpt: Option[RemoteLogManager] = None

@volatile private var _replicaManager: ReplicaManager = _
var adminManager: ZkAdminManager = _
Expand Down Expand Up @@ -280,7 +279,7 @@ class KafkaServer(
_brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())

remoteLogManager = createRemoteLogManager(config)
remoteLogManagerOpt = createRemoteLogManager()

if (config.migrationEnabled) {
kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
Expand Down Expand Up @@ -505,7 +504,14 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

// Start RemoteLogManager before broker start serving the requests.
remoteLogManager.foreach(_.startup())
remoteLogManagerOpt.foreach(rlm => {
val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
brokerInfo.broker.endPoints
.find(e => e.listenerName.equals(listenerName))
.foreach(e => rlm.onEndPointCreated(e))

rlm.startup()
})

/* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager)
Expand Down Expand Up @@ -596,14 +602,13 @@ class KafkaServer(
}
}

protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
if(config.logDirs.size > 1) {
throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
}

Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head, time,
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava));
} else {
None
Expand All @@ -621,7 +626,7 @@ class KafkaServer(
time = time,
scheduler = kafkaScheduler,
logManager = logManager,
remoteLogManager = remoteLogManager,
remoteLogManager = remoteLogManagerOpt,
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
Expand Down Expand Up @@ -932,7 +937,7 @@ class KafkaServer(
// Close remote log manager before stopping processing requests, to give a chance to any
// of its underlying clients (especially in RemoteStorageManager and RemoteLogMetadataManager)
// to close gracefully.
CoreUtils.swallow(remoteLogManager.foreach(_.close()), this)
CoreUtils.swallow(remoteLogManagerOpt.foreach(_.close()), this)

if (featureChangeListener != null)
CoreUtils.swallow(featureChangeListener.close(), this)
Expand Down
41 changes: 39 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
*/
package kafka.log.remote;

import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
Expand Down Expand Up @@ -102,6 +106,7 @@ public class RemoteLogManagerTest {
Time time = new MockTime();
int brokerId = 0;
String logDir = TestUtils.tempDirectory("kafka-").toString();
String clusterId = "dummyId";

RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
Expand Down Expand Up @@ -137,7 +142,7 @@ void setUp() throws Exception {
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = new Properties();
remoteLogManagerConfig = createRLMConfig(props);
remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, time, tp -> Optional.of(mockLog)) {
remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog)) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
Expand Down Expand Up @@ -189,6 +194,38 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
}

@Test
showuon marked this conversation as resolved.
Show resolved Hide resolved
void testRemoteStorageManagerWithUserDefinedConfigs() {
String key = "key";
String configPrefix = "config.prefix";
Properties props = new Properties();
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, configPrefix);
props.put(configPrefix + key, "world");
props.put("remote.storage.manager.y", "z");

Map<String, Object> remoteStorageManagerConfig = createRLMConfig(props).remoteStorageManagerProps();
assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key));
assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y"));
}

@Test
void testRemoteLogMetadataManagerWithEndpointConfig() {
    // Once an endpoint has been registered, startup() must propagate the
    // connection and identity properties into the RLMM configuration.
    EndPoint endPoint = new EndPoint("localhost", 1234, new ListenerName("PLAINTEXT"),
        SecurityProtocol.PLAINTEXT);
    remoteLogManager.onEndPointCreated(endPoint);
    remoteLogManager.startup();

    ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
    verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
    Map<String, Object> configuredProps = capture.getValue();
    assertEquals("localhost:1234", configuredProps.get("bootstrap.servers"));
    assertEquals("PLAINTEXT", configuredProps.get("security.protocol"));
    assertEquals(clusterId, configuredProps.get("cluster.id"));
    assertEquals(brokerId, configuredProps.get(KafkaConfig.BrokerIdProp()));
}

@Test
void testStartup() {
remoteLogManager.startup();
Expand Down Expand Up @@ -374,7 +411,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData,
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogManager remoteLogManager =
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, time, t -> Optional.empty()) {
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty()) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.brokers().forEach((_, server) => {
server.remoteLogManager match {
server.remoteLogManagerOpt match {
case Some(_) =>
case None => fail("RemoteLogManager should be initialized")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class KafkaServerTest extends QuorumTestHarness {
"org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")

val server = TestUtils.createServer(KafkaConfig.fromProps(props))
server.remoteLogManager match {
server.remoteLogManagerOpt match {
case Some(_) =>
case None => fail("RemoteLogManager should be initialized")
}
Expand Down
19 changes: 19 additions & 0 deletions core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package kafka.server

import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
Expand Down Expand Up @@ -61,6 +63,23 @@ class ServerStartupTest extends QuorumTestHarness {
assertThrows(classOf[IllegalArgumentException], () => TestUtils.createServer(KafkaConfig.fromProps(props2)))
}

@Test
def testRemoteStorageEnabled(): Unit = {
  // Enable tiered storage backed by the no-op RSM/RLMM plugins.
  val props = TestUtils.createBrokerConfig(0, zkConnect)
  props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
  props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
  props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)

  // Missing "remote.log.metadata.manager.listener.name" fails broker startup.
  assertThrows(classOf[NullPointerException], () => TestUtils.createServer(KafkaConfig.fromProps(props)))

  // A listener name that does not match any configured listener is rejected.
  props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "badListenerName")
  assertThrows(classOf[ConfigException], () => TestUtils.createServer(KafkaConfig.fromProps(props)))

  // With a valid listener name the broker starts successfully.
  props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT")
  server = TestUtils.createServer(KafkaConfig.fromProps(props))
}

@Test
def testConflictBrokerRegistration(): Unit = {
// Try starting a broker with a conflicting broker id.
Expand Down