
KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm #13828

Merged
8 commits merged into apache:trunk on Jun 16, 2023

Conversation

Contributor

@showuon showuon commented Jun 8, 2023

Add the "remote.log.metadata.manager.listener.name" config to RLMM to allow the producer/consumer to connect to the server. Also add tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

private void configureRLMM() {
    final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

    rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
    rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
    endpoint.ifPresent(e -> {
        rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
Contributor


please use the constant CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG

Contributor Author


Yes, but that would add a dependency from the core module on the clients module. That's why I use a plain string instead.

Contributor


ok

private void configureRLMM() {
    final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());

    rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
    rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
    endpoint.ifPresent(e -> {
        rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
        rlmmProps.put("security.protocol", e.securityProtocol().name);
Contributor


please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

core/src/main/scala/kafka/server/KafkaServer.scala — 3 outdated review comments, resolved
Contributor

@divijvaidya divijvaidya left a comment


One last question, Luke: should a unit test have failed when the cluster.id property was missing in the first revision? It probably hints at a gap in testing. Can we add that? (Or it's OK to say it will be added when Satish adds the rest of the tests.)

Contributor Author

showuon commented Jun 12, 2023

One last question, Luke: should a unit test have failed when the cluster.id property was missing in the first revision? It probably hints at a gap in testing. Can we add that? (Or it's OK to say it will be added when Satish adds the rest of the tests.)

The test is added in the last commit: 39de62c

Also, I reverted the Endpoint back to Optional because I re-read the javadoc of RemoteLogMetadataManager:

* <code>remote.log.metadata.manager.listener.name</code> property is about listener name of the local broker to which
* it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other
* required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener.
* </p>

It says this is not a required config; it's only used when needed by the RemoteLogMetadataManager implementation, so we should not throw an exception if it's not provided.
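To make the optional behavior concrete, here is a minimal standalone sketch (the `Endpoint` record below is a hypothetical stand-in for Kafka's `Endpoint` class, not the real API): client configs are only added when the listener endpoint is present, and the empty case simply yields no client configs rather than throwing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalEndpointSketch {
    // Hypothetical stand-in for Kafka's Endpoint type, for illustration only.
    record Endpoint(String host, int port, String securityProtocol) {}

    static Map<String, Object> buildRlmmProps(Optional<Endpoint> endpoint) {
        Map<String, Object> rlmmProps = new HashMap<>();
        // Client configs are only set when the listener endpoint exists;
        // an absent endpoint produces an empty map, never an exception.
        endpoint.ifPresent(e -> {
            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
            rlmmProps.put("security.protocol", e.securityProtocol());
        });
        return rlmmProps;
    }

    public static void main(String[] args) {
        System.out.println(buildRlmmProps(Optional.of(new Endpoint("localhost", 9092, "PLAINTEXT"))));
        System.out.println(buildRlmmProps(Optional.empty()));
    }
}
```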

Contributor

@divijvaidya divijvaidya left a comment


  1. We probably want to update KIP-405 here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs and specify that this config is optional

  2. From the javadoc of RLMM

When this is configured all other
 required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener

Can we please add a test to verify this? (asking because while constructing the rlmmProps, we don't pass any other configs with the listener prefix)

core/src/main/scala/kafka/server/BrokerServer.scala — outdated review comment, resolved
Member

@satishd satishd left a comment


Thanks @showuon for adding the missing integration of starting RLMM with the local endpoints of the broker. I did an initial pass of the PR and left a minor comment.

core/src/main/java/kafka/log/remote/RemoteLogManager.java — outdated review comment, resolved
Contributor Author

showuon commented Jun 13, 2023

@divijvaidya @satishd , PR updated. Thanks.

  1. We probably want to update KIP-405 here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs and specify that this config is optional

Updated.

  1. From the javadoc of RLMM
    When this is configured all other
    required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener
    Can we please add a test to verify this? (asking because while constructing the rlmmProps, we don't pass any other configs with the listener prefix)

I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created KAFKA-15083 for this issue.

Thanks.

Contributor

@divijvaidya divijvaidya left a comment


Thanks for patiently working through all the comments Luke! Looks good to me.

Member

@satishd satishd left a comment


Thanks @showuon for addressing the review comments. LGTM.

Member

satishd commented Jun 14, 2023

I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created KAFKA-15083 for this issue.

@showuon This is no longer valid; the KIP needs to be updated with the prefix-based configs for RSM and RLMM. I will update the KIP with those details.

Contributor Author

showuon commented Jun 14, 2023

I think we don't have this implemented. We should pass remote.log.metadata.* into RLMM based on KIP-405. Created KAFKA-15083 for this issue.

@showuon This is no longer valid; the KIP needs to be updated with the prefix-based configs for RSM and RLMM. I will update the KIP with those details.

Good to know, thanks Satish. I've assigned KAFKA-15083 to you. You can close it once you've updated the KIP.

@divijvaidya added the tiered-storage (Pull requests associated with KIP-405 (Tiered Storage)) label on Jun 15, 2023
Contributor Author

showuon commented Jun 16, 2023

Still cannot get healthy CI build results. Trying to rebase onto the latest trunk.

Contributor Author

showuon commented Jun 16, 2023

The failed tests are unrelated, and the failing testReplication also fails in the trunk build; I've identified that it's caused by this change: #13838. Going ahead to merge.

    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 17 and Scala 2.13 / kafka.admin.AddPartitionsTest.testIncrementPartitions(String).quorum=zk
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()

@showuon showuon merged commit 7423865 into apache:trunk Jun 16, 2023
1 check failed
Contributor

jeqo commented Jun 28, 2023

@showuon I'm trying to test this, but TBRLMM is still complaining about a missing bootstrap.servers, even when the listener name is provided:

kafka-ts  | [2023-06-28 10:19:04,131] INFO Initializing the resources. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
kafka-ts  | [2023-06-28 10:19:04,141] ERROR Uncaught exception in thread 'RLMMInitializationThread': (org.apache.kafka.common.utils.KafkaThread)
kafka-ts  | org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
kafka-ts  |     at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:496)
kafka-ts  |     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:486)
kafka-ts  |     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)
kafka-ts  |     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:145)
kafka-ts  |     at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:244)
kafka-ts  |     at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
kafka-ts  |     at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
kafka-ts  |     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.initializeResources(TopicBasedRemoteLogMetadataManager.java:366)
kafka-ts  |     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$configure$1(TopicBasedRemoteLogMetadataManager.java:352)
kafka-ts  |     at java.base/java.lang.Thread.run(Thread.java:829)

Looking at the code, I can see the listener name being passed:

kafka-ts  |     remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
kafka-ts  |     remote.log.metadata.manager.class.path = null
kafka-ts  |     remote.log.metadata.manager.impl.prefix = rlmm.config.
kafka-ts  |     remote.log.metadata.manager.listener.name = BROKER

but when initializing the resources, properties without the right prefix are ignored:

private void initializeProducerConsumerProperties(Map<String, ?> configs) {
    Map<String, Object> commonClientConfigs = new HashMap<>();
    Map<String, Object> producerOnlyConfigs = new HashMap<>();
    Map<String, Object> consumerOnlyConfigs = new HashMap<>();

    for (Map.Entry<String, ?> entry : configs.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX)) {
            commonClientConfigs.put(key.substring(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX.length()), entry.getValue());
        } else if (key.startsWith(REMOTE_LOG_METADATA_PRODUCER_PREFIX)) {
            producerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_PRODUCER_PREFIX.length()), entry.getValue());
        } else if (key.startsWith(REMOTE_LOG_METADATA_CONSUMER_PREFIX)) {
            consumerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_CONSUMER_PREFIX.length()), entry.getValue());
        }
    }

    HashMap<String, Object> allProducerConfigs = new HashMap<>(commonClientConfigs);
    allProducerConfigs.putAll(producerOnlyConfigs);
    producerProps = createProducerProps(allProducerConfigs);

    HashMap<String, Object> allConsumerConfigs = new HashMap<>(commonClientConfigs);
    allConsumerConfigs.putAll(consumerOnlyConfigs);
    consumerProps = createConsumerProps(allConsumerConfigs);
}

Let me know if I'm reading this properly and I'll create an issue; otherwise I may be missing something (or is this supposed to be handled by https://issues.apache.org/jira/browse/KAFKA-15083?). Many thanks!

cc @satishd @divijvaidya
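The filtering behavior described above can be sketched standalone; this is a simplified, hypothetical reduction of the loop (class name, prefix values, and the producer-only path are illustrative assumptions, not the real Kafka constants): a bare `bootstrap.servers` key carries no recognized prefix and is silently dropped, while prefixed keys survive with the prefix stripped.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixFilterSketch {
    // Assumed prefix values for illustration; the real values live in
    // TopicBasedRemoteLogMetadataManagerConfig.
    static final String COMMON_PREFIX = "remote.log.metadata.common.client.";
    static final String PRODUCER_PREFIX = "remote.log.metadata.producer.";

    static Map<String, Object> producerConfigs(Map<String, ?> configs) {
        Map<String, Object> common = new HashMap<>();
        Map<String, Object> producerOnly = new HashMap<>();
        for (Map.Entry<String, ?> e : configs.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(COMMON_PREFIX)) {
                common.put(key.substring(COMMON_PREFIX.length()), e.getValue());
            } else if (key.startsWith(PRODUCER_PREFIX)) {
                producerOnly.put(key.substring(PRODUCER_PREFIX.length()), e.getValue());
            }
            // Any other key, e.g. a bare "bootstrap.servers", is ignored.
        }
        Map<String, Object> all = new HashMap<>(common);
        all.putAll(producerOnly);
        return all;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");            // dropped
        configs.put(COMMON_PREFIX + "security.protocol", "PLAINTEXT"); // kept
        configs.put(PRODUCER_PREFIX + "acks", "all");                  // kept
        System.out.println(producerConfigs(configs));
    }
}
```

This is exactly why the unprefixed `bootstrap.servers` set by the broker never reaches the Admin/producer/consumer clients.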

Contributor Author

showuon commented Jun 30, 2023

@jeqo, thanks for raising it. Yes, I forgot to pass bootstrap.servers into commonClientConfigs, so it isn't passed into the producer/consumer. Are you available to file a small PR to fix it? I can work on it next week if you are busy. Thanks.
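A hedged sketch of the kind of fix being described (the prefix value is an assumption for illustration; the real constant lives in TopicBasedRemoteLogMetadataManagerConfig): instead of putting a bare "bootstrap.servers" into the RLMM props, where the prefix filter drops it, the broker-side code would add it under the common-client prefix so it gets forwarded to the RLMM's internal clients.

```java
import java.util.HashMap;
import java.util.Map;

public class RlmmBootstrapFix {
    // Assumed prefix value for illustration.
    static final String COMMON_PREFIX = "remote.log.metadata.common.client.";

    static Map<String, Object> rlmmProps(String host, int port) {
        Map<String, Object> props = new HashMap<>();
        // Prefixed so the RLMM's prefix-stripping loop keeps it and hands
        // "bootstrap.servers" to its producer/consumer/admin clients.
        props.put(COMMON_PREFIX + "bootstrap.servers", host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(rlmmProps("localhost", 9092));
    }
}
```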

Contributor

jeqo commented Jun 30, 2023

Sure! Created this: #13938

Labels: core, tiered-storage
4 participants