Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix setReplicatedSubscriptionStatus incorrect behavior #21510

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ public boolean setReplicated(boolean replicated) {

if (this.cursor != null) {
if (replicated) {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
+ "configuration enableReplicatedSubscriptions", topicName, subName, replicated);
} else {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -50,11 +51,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -621,6 +624,27 @@ public void testReplicatedSubscriptionRestApi2() throws Exception {
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}

@Test(timeOut = 30000)
public void testReplicatedSubscriptionRestApi3() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("geo/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api3";
final String subName = "sub";
admin4.tenants().createTenant("geo",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4)));
admin4.namespaces().createNamespace(namespace);
admin4.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4));
admin4.topics().createPartitionedTopic(topicName, 2);

@Cleanup
final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString())
.statsInterval(0, TimeUnit.SECONDS).build();

Consumer<byte[]> consumer4 = client4.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
Assert.expectThrows(PulsarAdminException.class, () ->
admin4.topics().setReplicatedSubscriptionStatus(topicName, subName, true));
consumer4.close();
}

/**
* Tests replicated subscriptions when replicator producer is closed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
PulsarAdmin admin3;
LocalBookkeeperEnsemble bkEnsemble3;

URL url4;
URL urlTls4;
ServiceConfiguration config4 = new ServiceConfiguration();
PulsarService pulsar4;
PulsarAdmin admin4;
LocalBookkeeperEnsemble bkEnsemble4;

ZookeeperServerTest globalZkS;

ExecutorService executor;
Expand Down Expand Up @@ -111,6 +118,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
protected final String cluster1 = "r1";
protected final String cluster2 = "r2";
protected final String cluster3 = "r3";
protected final String cluster4 = "r4";

// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
Expand Down Expand Up @@ -178,6 +186,21 @@ protected void setup() throws Exception {
urlTls3 = new URL(pulsar3.getWebServiceAddressTls());
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();

// Start region 4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Start region 4
// Start region 4, the Broker of cluster-4 will disable the feature "ReplicatedSubscriptions"


// Start zk & bks
bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble4.start();

setConfig4DefaultValue();
pulsar4 = new PulsarService(config4);
pulsar4.start();

url4 = new URL(pulsar4.getWebServiceAddress());
urlTls4 = new URL(pulsar4.getWebServiceAddressTls());
admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build();


// Provision the global namespace
admin1.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
Expand Down Expand Up @@ -230,6 +253,23 @@ protected void setup() throws Exception {
.brokerClientTlsTrustStorePassword(keyStorePassword)
.brokerClientTlsTrustStoreType(keyStoreType)
.build());
admin4.clusters().createCluster(cluster4, ClusterData.builder()
.serviceUrl(url4.toString())
.serviceUrlTls(urlTls4.toString())
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(true)
.brokerClientCertificateFilePath(clientCertFilePath)
.brokerClientKeyFilePath(clientKeyFilePath)
.brokerClientTrustCertsFilePath(caCertFilePath)
.brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
.brokerClientTlsKeyStore(clientKeyStorePath)
.brokerClientTlsKeyStorePassword(keyStorePassword)
.brokerClientTlsKeyStoreType(keyStoreType)
.brokerClientTlsTrustStore(clientTrustStorePath)
.brokerClientTlsTrustStorePassword(keyStorePassword)
.brokerClientTlsTrustStoreType(keyStoreType)
.build());

admin1.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
Expand Down Expand Up @@ -257,7 +297,7 @@ protected void setup() throws Exception {
}

public void setConfig3DefaultValue() {
setConfigDefaults(config3, "r3", bkEnsemble3);
setConfigDefaults(config3, cluster3, bkEnsemble3);
config3.setTlsEnabled(true);
}

Expand All @@ -269,6 +309,11 @@ public void setConfig2DefaultValue() {
setConfigDefaults(config2, cluster2, bkEnsemble2);
}

public void setConfig4DefaultValue() {
setConfigDefaults(config4, cluster4, bkEnsemble4);
config4.setEnableReplicatedSubscriptions(false);
}

private void setConfigDefaults(ServiceConfiguration config, String clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble) {
config.setClusterName(clusterName);
Expand Down Expand Up @@ -316,6 +361,11 @@ public void resetConfig3() {
setConfig3DefaultValue();
}

public void resetConfig4() {
config4 = new ServiceConfiguration();
setConfig4DefaultValue();
}

private int inSec(int time, TimeUnit unit) {
return (int) TimeUnit.SECONDS.convert(time, unit);
}
Expand All @@ -332,7 +382,11 @@ protected void cleanup() throws Exception {
admin1.close();
admin2.close();
admin3.close();
admin4.close();

if (pulsar4 != null) {
pulsar4.close();
}
if (pulsar3 != null) {
pulsar3.close();
}
Expand All @@ -346,11 +400,13 @@ protected void cleanup() throws Exception {
bkEnsemble1.stop();
bkEnsemble2.stop();
bkEnsemble3.stop();
bkEnsemble4.stop();
globalZkS.stop();

resetConfig1();
resetConfig2();
resetConfig3();
resetConfig4();
}

static class MessageProducer implements AutoCloseable {
Expand Down
Loading