Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] local metadata sync topic contains configuration events causing all operations stuck #22695

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,15 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private String configurationMetadataSyncEventTopic = null;

@FieldContext(
dynamic = false,
category = CATEGORY_SERVER,
doc = "Create a separate configuration metadata store object in memory even if the URL of the configuration"
+ " metadata store is the same as the local metadata store. It is useful for some case, for"
+ " example: to enable Metadata Synchronizer dynamically."
)
private boolean forceUseSeparatedConfigurationStoreInMemory = false;

@FieldContext(
dynamic = true,
doc = "Factory class-name to create topic with custom workflow"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand Down Expand Up @@ -784,10 +785,17 @@ public void start() throws PulsarServerException {

coordinationService = new CoordinationServiceImpl(localMetadataStore);

if (config.isConfigurationStoreSeparated()) {
configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
: null;
configMetadataSynchronizer = StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getConfigurationMetadataSyncEventTopic())
: null;

// If "localMetadataStore" and "configurationMetadataStore" are the same object, "localMetadataSynchronizer"
// will receive both local metadata events and configuration metadata events, which is incorrect.
// To ensure that each synchronizer receives only the events it is interested in (in other words,
// that different types of events are sent to different topics), create a separate configuration
// metadata store.
if (config.isConfigurationStoreSeparated() || config.isForceUseSeparatedConfigurationStoreInMemory()
|| localMetadataSynchronizer != null || configMetadataSynchronizer != null) {
configurationMetadataStore = createConfigurationMetadataStore(configMetadataSynchronizer);
shouldShutdownConfigurationMetadataStore = true;
} else {
Expand Down Expand Up @@ -982,6 +990,17 @@ public void start() throws PulsarServerException {
}
}

/**
 * Checks whether the given configuration may currently be updated or deleted dynamically.
 *
 * <p>Only {@code configurationMetadataSyncEventTopic} and {@code metadataSyncEventTopic} are restricted:
 * they are rejected while the local metadata store and the configuration metadata store are the very
 * same object (reference comparison). Every other configuration name is always accepted.
 *
 * @param confName name of the dynamic configuration being changed
 * @return a pair whose left value is {@code true} when the change is allowed; when it is {@code false},
 *         the right value carries the explanation, otherwise the right value is an empty string
 */
public Pair<Boolean, String> hasConditionOfDynamicUpdateConf(String confName) {
    final boolean isSyncTopicConf = "configurationMetadataSyncEventTopic".equals(confName)
            || "metadataSyncEventTopic".equals(confName);
    // Nothing to restrict unless a synchronizer topic is being changed while both stores are one object.
    if (!isSyncTopicConf || localMetadataStore != configurationMetadataStore) {
        return Pair.of(true, "");
    }
    final String reason = String.format("Can not update conf %s dynamically, please enable"
            + " forceUseSeparatedConfigurationStoreInMemory if you want to enable or disable the metadata"
            + " synchronizer dynamically.", confName);
    return Pair.of(false, reason);
}

/**
 * Creates the broker interceptor by delegating to {@code BrokerInterceptors.load(config)}.
 *
 * @return the interceptor returned by {@code BrokerInterceptors.load(config)}
 * @throws IOException declared by the signature; presumably raised when loading fails (confirm against
 *         {@code BrokerInterceptors.load})
 */
protected BrokerInterceptor newBrokerInterceptor() throws IOException {
return BrokerInterceptors.load(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
Expand Down Expand Up @@ -203,7 +204,13 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName,
@PathParam("configValue") String configValue) {
validateSuperUserAccessAsync()
.thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue))
.thenApply(__ -> {
Pair<Boolean, String> pair = pulsar().hasConditionOfDynamicUpdateConf(configName);
if (!pair.getLeft()) {
throw new RestException(Status.BAD_REQUEST, pair.getRight());
}
return null;
}).thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue))
.thenAccept(__ -> {
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
asyncResponse.resume(Response.ok().build());
Expand All @@ -227,7 +234,13 @@ public void deleteDynamicConfiguration(
@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName) {
validateSuperUserAccessAsync()
.thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName))
.thenApply(__ -> {
Pair<Boolean, String> pair = pulsar().hasConditionOfDynamicUpdateConf(configName);
if (!pair.getLeft()) {
throw new RestException(Status.BAD_REQUEST, pair.getRight());
}
return null;
}).thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName))
.thenAccept(__ -> {
LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName);
asyncResponse.resume(Response.ok().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ public String getClusterName() {

private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
if (!isProducerStarted()) {
log.info("Producer is not started on {}, failed to publish {}", topicName, event);
log.warn("Producer is not started on {}, failed to publish {}", topicName, event);
future.completeExceptionally(new IllegalStateException("producer is not started yet"));
return;
}
producer.newMessage().value(event).sendAsync().thenAccept(__ -> {
log.info("successfully published metadata change event {}", event);
Expand Down Expand Up @@ -155,8 +156,8 @@ private void startProducer() {
State stateTransient = state;
log.info("[{}] Closing the new producer because the synchronizer state is {}", prod,
stateTransient);
CompletableFuture closeProducer = new CompletableFuture<>();
closeResource(() -> prod.closeAsync(), closeProducer);
CompletableFuture<Void> closeProducer = new CompletableFuture<>();
closeResource(prod::closeAsync, closeProducer);
closeProducer.thenRun(() -> {
log.info("[{}] Closed the new producer because the synchronizer state is {}", prod,
stateTransient);
Expand Down Expand Up @@ -220,11 +221,13 @@ private void startConsumer() {
log.info("successfully created consumer {}", topicName);
} else {
State stateTransient = state;
log.info("[{}] Closing the new consumer because the synchronizer state is {}", stateTransient);
CompletableFuture closeConsumer = new CompletableFuture<>();
closeResource(() -> consumer.closeAsync(), closeConsumer);
log.info("[{}] Closing the new consumer because the synchronizer state is {}", topicName,
stateTransient);
CompletableFuture<Void> closeConsumer = new CompletableFuture<>();
closeResource(consumer::closeAsync, closeConsumer);
closeConsumer.thenRun(() -> {
log.info("[{}] Closed the new consumer because the synchronizer state is {}", stateTransient);
log.info("[{}] Closed the new consumer because the synchronizer state is {}", topicName,
stateTransient);
});
}
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ public abstract class GeoReplicationWithConfigurationSyncTestBase extends TestRe

protected void startZKAndBK() throws Exception {
// Start ZK.
brokerConfigZk1 = new ZookeeperServerTest(0);
brokerConfigZk1.start();
brokerConfigZk2 = new ZookeeperServerTest(0);
brokerConfigZk2.start();
tryInitBrokerConfigZK();

// Start BK.
bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
Expand All @@ -76,6 +73,32 @@ protected void startZKAndBK() throws Exception {
bkEnsemble2.start();
}

/**
 * Creates and starts the two ZooKeeper test servers stored in {@code brokerConfigZk1} and
 * {@code brokerConfigZk2}.
 *
 * <p>This is a separate protected method so that a subclass can override it; when the fields are left
 * {@code null}, {@code setConfigDefaults} falls back to the BookKeeper ensemble's ZooKeeper port for the
 * configuration metadata store URL.
 *
 * @throws Exception if creating or starting either server fails
 */
protected void tryInitBrokerConfigZK() throws Exception {
// The constructor argument 0 presumably requests an ephemeral port (confirm in ZookeeperServerTest).
brokerConfigZk1 = new ZookeeperServerTest(0);
brokerConfigZk1.start();
brokerConfigZk2 = new ZookeeperServerTest(0);
brokerConfigZk2.start();
}

/**
 * Stops both BookKeeper ensembles and both broker-configuration ZooKeeper servers, then clears the
 * corresponding fields.
 *
 * <p>Every component that is non-null gets a stop attempt even when an earlier one fails, and its field
 * is always reset to {@code null}, so a single failing {@code stop()} cannot leak the remaining servers
 * into subsequent tests. Components that were never started (field is {@code null}) are ignored.
 *
 * @throws Exception the first failure raised while stopping; later failures are attached to it as
 *         suppressed exceptions
 */
protected void stopZKAndBK() throws Exception {
    Exception failure = null;
    if (bkEnsemble1 != null) {
        try {
            bkEnsemble1.stop();
        } catch (Exception e) {
            failure = mergeStopFailure(failure, e);
        } finally {
            bkEnsemble1 = null;
        }
    }
    if (bkEnsemble2 != null) {
        try {
            bkEnsemble2.stop();
        } catch (Exception e) {
            failure = mergeStopFailure(failure, e);
        } finally {
            bkEnsemble2 = null;
        }
    }
    if (brokerConfigZk1 != null) {
        try {
            brokerConfigZk1.stop();
        } catch (Exception e) {
            failure = mergeStopFailure(failure, e);
        } finally {
            brokerConfigZk1 = null;
        }
    }
    if (brokerConfigZk2 != null) {
        try {
            brokerConfigZk2.stop();
        } catch (Exception e) {
            failure = mergeStopFailure(failure, e);
        } finally {
            brokerConfigZk2 = null;
        }
    }
    // Surface the problem only after every component had its chance to shut down.
    if (failure != null) {
        throw failure;
    }
}

/** Keeps the first stop failure as the primary exception and attaches any later one as suppressed. */
private static Exception mergeStopFailure(Exception primary, Exception next) {
    if (primary == null) {
        return next;
    }
    primary.addSuppressed(next);
    return primary;
}

protected void startBrokers() throws Exception {
// Start brokers.
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
Expand Down Expand Up @@ -162,7 +185,11 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo");
if (brokerConfigZk != null) {
config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort());
} else {
config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
}
config.setBrokerDeleteInactiveTopicsEnabled(false);
config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
config.setBrokerShutdownTimeoutMs(0L);
Expand Down Expand Up @@ -210,22 +237,7 @@ protected void cleanup() throws Exception {
}

// Stop ZK and BK.
if (bkEnsemble1 != null) {
bkEnsemble1.stop();
bkEnsemble1 = null;
}
if (bkEnsemble2 != null) {
bkEnsemble2.stop();
bkEnsemble2 = null;
}
if (brokerConfigZk1 != null) {
brokerConfigZk1.stop();
brokerConfigZk1 = null;
}
if (brokerConfigZk2 != null) {
brokerConfigZk2.stop();
brokerConfigZk2 = null;
}
stopZKAndBK();

// Reset configs.
config1 = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Re-runs the {@link SyncConfigStore2ZKPerClusterTest} scenario with a single ZooKeeper per cluster:
 * no dedicated configuration ZooKeeper is started, so the configuration metadata store URL is the same
 * as the local metadata store URL.
 */
@Slf4j
@Test(groups = "broker")
public class SyncConfigStore1ZKPerClusterTest extends SyncConfigStore2ZKPerClusterTest {

    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Intentionally starts nothing: with no broker-config ZooKeeper available, the super class uses the
     * local metadata ZK URL as the config metadata ZK URL.
     */
    @Override
    protected void tryInitBrokerConfigZK() {
        // Init nothing, the super class will use the local metadata ZK URL as config metadata ZK URL.
    }

    /**
     * Asserts that neither broker reports a separated configuration store, yet both still allow the
     * two metadata-synchronizer topic settings to be changed dynamically.
     */
    @Override
    protected void verifyMetadataStores() {
        final String[] syncTopicConfNames = {"configurationMetadataSyncEventTopic", "metadataSyncEventTopic"};
        Awaitility.await().untilAsserted(() -> {
            // Verify: config metadata store url is the same as local metadata store url.
            assertFalse(pulsar1.getConfig().isConfigurationStoreSeparated());
            assertFalse(pulsar2.getConfig().isConfigurationStoreSeparated());
            // Verify: Pulsar initialized itself to update the metadata synchronizer dynamically.
            for (String confName : syncTopicConfNames) {
                assertTrue(pulsar1.hasConditionOfDynamicUpdateConf(confName).getLeft());
                assertTrue(pulsar2.hasConditionOfDynamicUpdateConf(confName).getLeft());
            }
        });
    }

    @Test
    public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception {
        super.testDynamicEnableConfigurationMetadataSyncEventTopic();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -44,10 +45,10 @@

@Slf4j
@Test(groups = "broker")
public class SyncConfigStoreTest extends GeoReplicationWithConfigurationSyncTestBase {
public class SyncConfigStore2ZKPerClusterTest extends GeoReplicationWithConfigurationSyncTestBase {

private static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic";
private static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE
protected static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic";
protected static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE
+ "/__sync_config_meta";

@Override
Expand All @@ -58,6 +59,7 @@ public void setup() throws Exception {
tenantInfo.setAllowedClusters(new HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(), tenantInfo);
admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace());
verifyMetadataStores();
}

@Override
Expand All @@ -66,23 +68,35 @@ public void cleanup() throws Exception {
super.cleanup();
}

/**
 * Applies the base-class defaults and then turns on
 * {@code forceUseSeparatedConfigurationStoreInMemory} for the broker under test.
 *
 * @param config broker configuration to populate
 * @param clusterName passed through unchanged to the super implementation
 * @param bookkeeperEnsemble passed through unchanged to the super implementation
 * @param brokerConfigZk passed through unchanged to the super implementation
 */
@Override
protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
// Set after the super call so the flag is the final value on this config object.
config.setForceUseSeparatedConfigurationStoreInMemory(true);
}

@Test
public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception {
// Verify the condition that supports synchronizer: the metadata store is a different one.
protected void verifyMetadataStores() {
Awaitility.await().untilAsserted(() -> {
boolean shouldShutdownConfigurationMetadataStore =
WhiteboxImpl.getInternalState(pulsar1, "shouldShutdownConfigurationMetadataStore");
assertTrue(shouldShutdownConfigurationMetadataStore);
// Verify: config metadata store url is not the same as local metadata store url.
assertTrue(pulsar1.getConfig().isConfigurationStoreSeparated());
assertTrue(pulsar2.getConfig().isConfigurationStoreSeparated());
// Verify: Pulsar initialized itself to update the metadata synchronizer dynamically.
assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic")
.getLeft());
assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("configurationMetadataSyncEventTopic")
.getLeft());
assertTrue(pulsar1.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic")
.getLeft());
assertTrue(pulsar2.hasConditionOfDynamicUpdateConf("metadataSyncEventTopic")
.getLeft());
});
}

@Test
public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception {
// Verify the synchronizer will be created dynamically.
admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC, SYNC_EVENT_TOPIC);
Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
assertEquals(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(), SYNC_EVENT_TOPIC);
PulsarMetadataEventSynchronizer synchronizer =
WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer");
Expand All @@ -100,7 +114,7 @@ public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Except

// Verify the synchronizer will be closed dynamically.
admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC);
Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
// The synchronizer that was started will be closed.
assertEquals(synchronizerStarted.getState(), PulsarMetadataEventSynchronizer.State.Closed);
assertTrue(synchronizerStarted.isClosingOrClosed());
Expand Down
Loading
Loading