Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update replication cluster but not update replicator. #14570

Merged
merged 18 commits into from Mar 17, 2022
Expand Up @@ -3039,6 +3039,10 @@ public void onUpdate(TopicPolicies policies) {
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));

if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
}

checkDeduplicationStatus();

preCreateSubscriptionForCompactionIfNeeded();
Expand Down
Expand Up @@ -717,7 +717,7 @@ public void testReplicatorProducerClosing() throws Exception {
assertNull(producer);
}

@Test(priority = 5, timeOut = 30000)
@Test(priority = 4, timeOut = 30000)
public void testReplicatorProducerName() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatorProducerName ---");
final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicatorProducerName");
Expand All @@ -739,7 +739,7 @@ public void testReplicatorProducerName() throws Exception {
});
}

@Test(priority = 5, timeOut = 30000)
@Test(priority = 4, timeOut = 30000)
public void testReplicatorProducerNameWithUserDefinedReplicatorPrefix() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatorProducerNameWithUserDefinedReplicatorPrefix ---");
final String topicName = BrokerTestUtil.newUniqueName(
Expand Down Expand Up @@ -1427,4 +1427,32 @@ public void testReplicatorWithFailedAck() throws Exception {

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

@Test
public void testWhenUpdateReplicationCluster() throws Exception {
log.info("--- testWhenUpdateReplicationCluster ---");
String namespace = "pulsar/ns2";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName dest = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWhenUpdateReplicationCluster"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

producer1.produce(2);

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
.getNow(null).get();
Awaitility.await().untilAsserted(() -> {
assertTrue(topic.getReplicators().containsKey("r2"));
});

admin1.topics().setReplicationClusters(dest.toString(), Lists.newArrayList("r1"));

Awaitility.await().untilAsserted(() -> {
Set<String> replicationClusters = admin1.topics().getReplicationClusters(dest.toString(), false);
assertTrue(replicationClusters != null && replicationClusters.size() == 1);
assertTrue(topic.getReplicators().isEmpty());
});
}
}