From 921225d7d07ba238f1f2ccef62e885c6bd0b1c67 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 11 May 2024 03:08:32 +0800 Subject: [PATCH 1/3] [fix] [broker] local metadata sync topic contains configuration events causing all operations stuck --- .../pulsar/broker/ServiceConfiguration.java | 8 ++ .../apache/pulsar/broker/PulsarService.java | 27 ++++- .../pulsar/broker/admin/impl/BrokersBase.java | 17 ++- .../PulsarMetadataEventSynchronizer.java | 3 +- ...licationWithConfigurationSyncTestBase.java | 54 +++++---- .../SyncConfigStore1ZKPerClusterTest.java | 72 ++++++++++++ ... => SyncConfigStore2ZKPerClusterTest.java} | 36 ++++-- .../SyncConfigStoreWrongConfigTest.java | 109 ++++++++++++++++++ .../pulsar/zookeeper/ZookeeperServerTest.java | 2 + 9 files changed, 289 insertions(+), 39 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore1ZKPerClusterTest.java rename pulsar-broker/src/test/java/org/apache/pulsar/broker/service/{SyncConfigStoreTest.java => SyncConfigStore2ZKPerClusterTest.java} (76%) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2d2765287c0e0..1581b2a37e0dd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -582,6 +582,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private String configurationMetadataSyncEventTopic = null; + @FieldContext( + dynamic = false, + category = CATEGORY_SERVER, + doc = "If you want to enable or disable the metadata synchronizer dynamically, this value should be true." + + "Enabled: Pulsar will initialize itself to update the metadata synchronizer dynamically." + ) + private boolean mayEnableMetadataSynchronizer = false; + @FieldContext( dynamic = true, doc = "Factory class-name to create topic with custom workflow" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b23851a5ec464..dfea457aff24c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -80,6 +80,7 @@ import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.mledger.offload.OffloadersCache; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -867,10 +868,17 @@ public void start() throws PulsarServerException { coordinationService = new CoordinationServiceImpl(localMetadataStore); - if (config.isConfigurationStoreSeparated()) { - configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic()) - ? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic()) - : null; + configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic()) + ? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic()) + : null; + + // If "localMetadataStore" and "configurationMetadataStore" is the same object, "localMetadataSynchronizer" + // will receive both local metadata event and configuration metadata event, which is in-correct. + // In order to ensure that the two synchronizer receive events of interest to each other (in other words, + // ensure that different types of events are sent to different topics), create the single configuration + // metadata store. + if (config.isConfigurationStoreSeparated() || config.isMayEnableMetadataSynchronizer() + || localMetadataSynchronizer != null || configMetadataSynchronizer != null) { configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer, openTelemetry.getOpenTelemetryService().getOpenTelemetry()); shouldShutdownConfigurationMetadataStore = true; @@ -1082,6 +1090,17 @@ public void waitUntilReadyForIncomingRequests() throws ExecutionException, Inter readyForIncomingRequestsFuture.get(); } + public Pair hasConditionOfDynamicUpdateConf(String confName) { + if ("configurationMetadataSyncEventTopic".equals(confName) || "metadataSyncEventTopic".equals(confName)) { + if (localMetadataStore == configurationMetadataStore) { + return Pair.of(false, String.format("Can not update conf %s dynamically, please enable" + + " mayEnableMetadataSynchronizer if you want to enable or disable the metadata synchronizer" + + " dynamically.", confName)); + } + } + return Pair.of(true, ""); + } + protected BrokerInterceptor newBrokerInterceptor() throws IOException { return BrokerInterceptors.load(config); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 4d0b598a8e4f1..16e403d3d090f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -48,6 +48,7 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; @@ -203,7 +204,13 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam("configName") String configName, @PathParam("configValue") String configValue) { validateSuperUserAccessAsync() - .thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue)) + .thenApply(__ -> { + Pair pair = pulsar().hasConditionOfDynamicUpdateConf(configName); + if (!pair.getLeft()) { + throw new RestException(Status.BAD_REQUEST, pair.getRight()); + } + return null; + }).thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue)) .thenAccept(__ -> { LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue); asyncResponse.resume(Response.ok().build()); @@ -227,7 +234,13 @@ public void deleteDynamicConfiguration( @Suspended AsyncResponse asyncResponse, @PathParam("configName") String configName) { validateSuperUserAccessAsync() - .thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName)) + .thenApply(__ -> { + Pair pair = pulsar().hasConditionOfDynamicUpdateConf(configName); + if (!pair.getLeft()) { + throw new RestException(Status.BAD_REQUEST, pair.getRight()); + } + return null; + }).thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName)) .thenAccept(__ -> { LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName); asyncResponse.resume(Response.ok().build()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index 8b2ebf200537e..b1a679e30b584 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -118,8 +118,9 @@ public String getClusterName() { private void publishAsync(MetadataEvent event, CompletableFuture future) { if (!isProducerStarted()) { - log.info("Producer is not started on {}, failed to publish {}", topicName, event); + log.warn("Producer is not started on {}, failed to publish {}", topicName, event); future.completeExceptionally(new IllegalStateException("producer is not started yet")); + return; } producer.newMessage().value(event).sendAsync().thenAccept(__ -> { log.info("successfully published metadata change event {}", event); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java index 9b4dd5192e1ec..ca44975a6c979 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java @@ -64,10 +64,7 @@ public abstract class GeoReplicationWithConfigurationSyncTestBase extends TestRe protected void startZKAndBK() throws Exception { // Start ZK. - brokerConfigZk1 = new ZookeeperServerTest(0); - brokerConfigZk1.start(); - brokerConfigZk2 = new ZookeeperServerTest(0); - brokerConfigZk2.start(); + tryInitBrokerConfigZK(); // Start BK. bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); @@ -76,6 +73,32 @@ protected void startZKAndBK() throws Exception { bkEnsemble2.start(); } + protected void tryInitBrokerConfigZK() throws Exception { + brokerConfigZk1 = new ZookeeperServerTest(0); + brokerConfigZk1.start(); + brokerConfigZk2 = new ZookeeperServerTest(0); + brokerConfigZk2.start(); + } + + protected void stopZKAndBK() throws Exception { + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (brokerConfigZk1 != null) { + brokerConfigZk1.stop(); + brokerConfigZk1 = null; + } + if (brokerConfigZk2 != null) { + brokerConfigZk2.stop(); + brokerConfigZk2 = null; + } + } + protected void startBrokers() throws Exception { // Start brokers. setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1); @@ -162,7 +185,11 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setWebServicePort(Optional.of(0)); config.setWebServicePortTls(Optional.of(0)); config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort()); - config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo"); + if (brokerConfigZk != null) { + config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort()); + } else { + config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort()); + } config.setBrokerDeleteInactiveTopicsEnabled(false); config.setBrokerDeleteInactiveTopicsFrequencySeconds(60); config.setBrokerShutdownTimeoutMs(0L); @@ -210,22 +237,7 @@ protected void cleanup() throws Exception { } // Stop ZK and BK. - if (bkEnsemble1 != null) { - bkEnsemble1.stop(); - bkEnsemble1 = null; - } - if (bkEnsemble2 != null) { - bkEnsemble2.stop(); - bkEnsemble2 = null; - } - if (brokerConfigZk1 != null) { - brokerConfigZk1.stop(); - brokerConfigZk1 = null; - } - if (brokerConfigZk2 != null) { - brokerConfigZk2.stop(); - brokerConfigZk2 = null; - } + stopZKAndBK(); // Reset configs. config1 = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore1ZKPerClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore1ZKPerClusterTest.java new file mode 100644 index 0000000000000..870c159701a7b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore1ZKPerClusterTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class SyncConfigStore1ZKPerClusterTest extends SyncConfigStore2ZKPerClusterTest { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void tryInitBrokerConfigZK() { + // Init nothing, the super class will use the local metadata ZK URL as config metadata ZK URL. + } + + @Override + protected void verifyMetadataStores() { + Awaitility.await().untilAsserted(() -> { + // Verify: config metadata store url is the same as local metadata store url. + assertFalse(pulsar1.getConfig().isConfigurationStoreSeparated()); + assertFalse(pulsar2.getConfig().isConfigurationStoreSeparated()); + // Verify: Pulsar initialized itself to update the metadata synchronizer dynamically. + assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); + }); + } + + @Test + public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception { + super.testDynamicEnableConfigurationMetadataSyncEventTopic(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java similarity index 76% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java index 577725f96ed34..e1cf9491f9295 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import lombok.extern.slf4j.Slf4j; @@ -44,10 +45,10 @@ @Slf4j @Test(groups = "broker") -public class SyncConfigStoreTest extends GeoReplicationWithConfigurationSyncTestBase { +public class SyncConfigStore2ZKPerClusterTest extends GeoReplicationWithConfigurationSyncTestBase { - private static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic"; - private static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE + protected static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic"; + protected static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE + "/__sync_config_meta"; @Override @@ -58,6 +59,7 @@ public void setup() throws Exception { tenantInfo.setAllowedClusters(new HashSet<>(Arrays.asList(cluster1, cluster2))); admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(), tenantInfo); admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace()); + verifyMetadataStores(); } @Override @@ -66,23 +68,35 @@ public void cleanup() throws Exception { super.cleanup(); } + @Override protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setMayEnableMetadataSynchronizer(true); } - @Test - public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception { - // Verify the condition that supports synchronizer: the metadata store is a different one. + protected void verifyMetadataStores() { Awaitility.await().untilAsserted(() -> { - boolean shouldShutdownConfigurationMetadataStore = - WhiteboxImpl.getInternalState(pulsar1, "shouldShutdownConfigurationMetadataStore"); - assertTrue(shouldShutdownConfigurationMetadataStore); + // Verify: config metadata store url is not the same as local metadata store url. + assertTrue(pulsar1.getConfig().isConfigurationStoreSeparated()); + assertTrue(pulsar2.getConfig().isConfigurationStoreSeparated()); + // Verify: Pulsar initialized itself to update the metadata synchronizer dynamically. + assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); + assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); }); + } + @Test + public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception { // Verify the synchronizer will be created dynamically. admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC, SYNC_EVENT_TOPIC); - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> { assertEquals(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(), SYNC_EVENT_TOPIC); PulsarMetadataEventSynchronizer synchronizer = WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer"); @@ -100,7 +114,7 @@ public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Except // Verify the synchronizer will be closed dynamically. admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC); - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> { // The synchronizer that was started will be closed. assertEquals(synchronizerStarted.getState(), PulsarMetadataEventSynchronizer.State.Closed); assertTrue(synchronizerStarted.isClosingOrClosed()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java new file mode 100644 index 0000000000000..6486e12b02bcd --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/*** + * The test is used to test update the dynamic metadata store synchronizer will fail if the broker has not initialized + * itself to support yet. + */ +@Slf4j +@Test(groups = "broker") +public class SyncConfigStoreWrongConfigTest extends SyncConfigStore1ZKPerClusterTest { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setMayEnableMetadataSynchronizer(false); + } + + @Override + protected void verifyMetadataStores() { + Awaitility.await().untilAsserted(() -> { + // Verify: config metadata store url is the same as local metadata store url. + assertFalse(pulsar1.getConfig().isConfigurationStoreSeparated()); + assertFalse(pulsar2.getConfig().isConfigurationStoreSeparated()); + // Verify: Pulsar has not initialized itself to update the metadata synchronizer dynamically yet. + assertFalse(pulsar1.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertFalse(pulsar2.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic") + .getLeft()); + assertFalse(pulsar1.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); + assertFalse(pulsar2.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic") + .getLeft()); + }); + } + + @Test + public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception { + // 1. update configurationMetadataSyncEventTopic. + try { + admin1.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", "123"); + fail("Expected an 400 error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("please enable mayEnableMetadataSynchronizer")); + } + // 2. update metadataSyncEventTopic. + try { + admin1.brokers().updateDynamicConfiguration("metadataSyncEventTopic", "123"); + fail("Expected an 400 error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("please enable mayEnableMetadataSynchronizer")); + } + // 3. delete configurationMetadataSyncEventTopic. + try { + admin1.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic"); + fail("Expected an 400 error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("please enable mayEnableMetadataSynchronizer")); + } + // 4. delete metadataSyncEventTopic. + try { + admin1.brokers().deleteDynamicConfiguration("metadataSyncEventTopic"); + fail("Expected an 400 error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("please enable mayEnableMetadataSynchronizer")); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java index 355d2a0b1dbe1..e2433794a686e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import lombok.Getter; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.slf4j.Logger; @@ -29,6 +30,7 @@ public class ZookeeperServerTest implements Closeable { private final File zkTmpDir; + @Getter private ZooKeeperServer zks; private NIOServerCnxnFactory serverFactory; private int zkPort; From 20037ed8e47598e0a235140636b46e6dd883697b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 11 May 2024 16:12:13 +0800 Subject: [PATCH 2/3] address comments --- .../org/apache/pulsar/broker/ServiceConfiguration.java | 7 ++++--- .../main/java/org/apache/pulsar/broker/PulsarService.java | 6 +++--- .../broker/service/SyncConfigStore2ZKPerClusterTest.java | 2 +- .../broker/service/SyncConfigStoreWrongConfigTest.java | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1581b2a37e0dd..ef724717bd936 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -585,10 +585,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = false, category = CATEGORY_SERVER, - doc = "If you want to enable or disable the metadata synchronizer dynamically, this value should be true." - + "Enabled: Pulsar will initialize itself to update the metadata synchronizer dynamically." + doc = "Create a separate configuration metadata store object in memory even if the URL of the configuration" + + " metadata store is the same as the local metadata store. It is useful for some case, for" + + " example: to enable Metadata Synchronizer dynamically." ) - private boolean mayEnableMetadataSynchronizer = false; + private boolean forceUseSeparatedConfigurationStoreInMemory = false; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dfea457aff24c..6267f4cba20d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -877,7 +877,7 @@ public void start() throws PulsarServerException { // In order to ensure that the two synchronizer receive events of interest to each other (in other words, // ensure that different types of events are sent to different topics), create the single configuration // metadata store. - if (config.isConfigurationStoreSeparated() || config.isMayEnableMetadataSynchronizer() + if (config.isConfigurationStoreSeparated() || config.isForceUseSeparatedConfigurationStoreInMemory() || localMetadataSynchronizer != null || configMetadataSynchronizer != null) { configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer, openTelemetry.getOpenTelemetryService().getOpenTelemetry()); @@ -1094,8 +1094,8 @@ public Pair hasConditionOfDynamicUpdateConf(String confName) { if ("configurationMetadataSyncEventTopic".equals(confName) || "metadataSyncEventTopic".equals(confName)) { if (localMetadataStore == configurationMetadataStore) { return Pair.of(false, String.format("Can not update conf %s dynamically, please enable" - + " mayEnableMetadataSynchronizer if you want to enable or disable the metadata synchronizer" - + " dynamically.", confName)); + + " forceUseSeparatedConfigurationStoreInMemory if you want to enable or disable the metadata" + + " synchronizer dynamically.", confName)); } } return Pair.of(true, ""); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java index e1cf9491f9295..34869fa121feb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStore2ZKPerClusterTest.java @@ -72,7 +72,7 @@ public void cleanup() throws Exception { protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); - config.setMayEnableMetadataSynchronizer(true); + config.setForceUseSeparatedConfigurationStoreInMemory(true); } protected void verifyMetadataStores() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java index 6486e12b02bcd..5a4b10168fe06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreWrongConfigTest.java @@ -54,7 +54,7 @@ public void cleanup() throws Exception { protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); - config.setMayEnableMetadataSynchronizer(false); + config.setForceUseSeparatedConfigurationStoreInMemory(false); } @Override From 8a59c501ccd8a8fcab351907a8a8d406b3b1b1a4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 13 May 2024 15:03:14 +0800 Subject: [PATCH 3/3] address comment --- .../service/PulsarMetadataEventSynchronizer.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index b1a679e30b584..e18ffca3ab759 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -156,8 +156,8 @@ private void startProducer() { State stateTransient = state; log.info("[{}] Closing the new producer because the synchronizer state is {}", prod, stateTransient); - CompletableFuture closeProducer = new CompletableFuture<>(); - closeResource(() -> prod.closeAsync(), closeProducer); + CompletableFuture closeProducer = new CompletableFuture<>(); + closeResource(prod::closeAsync, closeProducer); closeProducer.thenRun(() -> { log.info("[{}] Closed the new producer because the synchronizer state is {}", prod, stateTransient); @@ -221,11 +221,13 @@ private void startConsumer() { log.info("successfully created consumer {}", topicName); } else { State stateTransient = state; - log.info("[{}] Closing the new consumer because the synchronizer state is {}", stateTransient); - CompletableFuture closeConsumer = new CompletableFuture<>(); - closeResource(() -> consumer.closeAsync(), closeConsumer); + log.info("[{}] Closing the new consumer because the synchronizer state is {}", topicName, + stateTransient); + CompletableFuture closeConsumer = new CompletableFuture<>(); + closeResource(consumer::closeAsync, closeConsumer); closeConsumer.thenRun(() -> { - log.info("[{}] Closed the new consumer because the synchronizer state is {}", stateTransient); + log.info("[{}] Closed the new consumer because the synchronizer state is {}", topicName, + stateTransient); }); } }).exceptionally(ex -> {