Skip to content

Commit

Permalink
Add unit test for replication resumption on backlog exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
Nozomi Kurihara committed Feb 17, 2017
1 parent b8a99ea commit c6afe6e
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
Expand Up @@ -16,6 +16,7 @@
package com.yahoo.pulsar.broker.service;

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -56,11 +57,15 @@
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
import com.yahoo.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

Expand Down Expand Up @@ -165,7 +170,7 @@ public Void call() throws Exception {

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test
public void testConcurrentReplicator() throws Exception {

Expand All @@ -176,7 +181,7 @@ public void testConcurrentReplicator() throws Exception {
conf.setStatsInterval(0, TimeUnit.SECONDS);
Producer producer = PulsarClient.create(url1.toString(), conf).createProducer(dest.toString());
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString()).get();

PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
Expand Down Expand Up @@ -608,7 +613,7 @@ public Void call() throws Exception {
}
}
}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
Expand Down Expand Up @@ -671,6 +676,71 @@ public void testReplicatorProducerClosing() throws Exception {
assertNull(producer);
}

/**
* Issue #199
*
* It verifies that: if the remote cluster reaches backlog quota limit, replicator temporarily stops and once the
* backlog drains it should resume replication.
*
* @throws Exception
*/

@Test(enabled = true, priority = -1)
public void testResumptionAfterBacklogRelaxed() throws Exception {

List<RetentionPolicy> policies = Lists.newArrayList();
policies.add(RetentionPolicy.producer_exception);
policies.add(RetentionPolicy.producer_request_hold);

for (RetentionPolicy policy : policies) {

DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/%s", policy));

// Producer on r1
MessageProducer producer1 = new MessageProducer(url1, dest);

// Consumer on r1
MessageConsumer consumer1 = new MessageConsumer(url1, dest);

// Consumer on r2
MessageConsumer consumer2 = new MessageConsumer(url2, dest);

// Replicator for r1 -> r2
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString());
PersistentReplicator replicator = topic.getPersistentReplicator("r2");

// Restrict backlog quota limit to 1
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Produce a message to r1, then it will be replicated to r2 and fulfill the backlog.
producer1.produce(1);
consumer1.receive(1);
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// Produce 9 messages to r1, then it will be pended because of the backlog limit excess
producer1.produce(9);
consumer1.receive(9);
Thread.sleep(1000L);
assertEquals(replicator.getStats().replicationBacklog, 9);

// Relax backlog quota limit to 1G
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1024 * 1024 * 1024, policy));
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// The messages should be replicated to r2
assertEquals(replicator.getStats().replicationBacklog, 0);
consumer2.receive(1);
consumer2.receive(9);
if (!consumer2.drained()) {
throw new Exception("consumer2 - unexpected message in queue");
}

producer1.close();
consumer1.close();
consumer2.close();
}
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}
Expand Up @@ -77,6 +77,8 @@ public class ReplicatorTestBase {

ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;

// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
return 60;
Expand Down Expand Up @@ -111,6 +113,7 @@ void setup() throws Exception {
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(PortManager.nextFreePort());
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -135,6 +138,7 @@ void setup() throws Exception {
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(PortManager.nextFreePort());
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand Down

0 comments on commit c6afe6e

Please sign in to comment.