Skip to content

Commit

Permalink
Fix update replication cluster but not update replicator. (#14570)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Mar 17, 2022
1 parent b06dac6 commit bcbeb53
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Expand Up @@ -3039,6 +3039,10 @@ public void onUpdate(TopicPolicies policies) {
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));

if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
}

checkDeduplicationStatus();

preCreateSubscriptionForCompactionIfNeeded();
Expand Down
Expand Up @@ -717,7 +717,7 @@ public void testReplicatorProducerClosing() throws Exception {
assertNull(producer);
}

@Test(priority = 5, timeOut = 30000)
@Test(priority = 4, timeOut = 30000)
public void testReplicatorProducerName() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatorProducerName ---");
final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicatorProducerName");
Expand All @@ -739,7 +739,7 @@ public void testReplicatorProducerName() throws Exception {
});
}

@Test(priority = 5, timeOut = 30000)
@Test(priority = 4, timeOut = 30000)
public void testReplicatorProducerNameWithUserDefinedReplicatorPrefix() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatorProducerNameWithUserDefinedReplicatorPrefix ---");
final String topicName = BrokerTestUtil.newUniqueName(
Expand Down Expand Up @@ -1427,4 +1427,32 @@ public void testReplicatorWithFailedAck() throws Exception {

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

@Test
public void testWhenUpdateReplicationCluster() throws Exception {
log.info("--- testWhenUpdateReplicationCluster ---");
String namespace = "pulsar/ns2";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName dest = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWhenUpdateReplicationCluster"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

producer1.produce(2);

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false)
.getNow(null).get();
Awaitility.await().untilAsserted(() -> {
assertTrue(topic.getReplicators().containsKey("r2"));
});

admin1.topics().setReplicationClusters(dest.toString(), Lists.newArrayList("r1"));

Awaitility.await().untilAsserted(() -> {
Set<String> replicationClusters = admin1.topics().getReplicationClusters(dest.toString(), false);
assertTrue(replicationClusters != null && replicationClusters.size() == 1);
assertTrue(topic.getReplicators().isEmpty());
});
}
}

0 comments on commit bcbeb53

Please sign in to comment.