Skip to content

Commit

Permalink
Use Awaitility instead Thread.sleep in InactiveTopicDeleteTest.java (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 authored Nov 21, 2020
1 parent 4ad150a commit 1b790ba
Showing 1 changed file with 74 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
Expand All @@ -36,6 +37,7 @@
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -82,14 +84,12 @@ public void testDeleteWhenNoSubscriptions() throws Exception {
consumer.close();
producer.close();

Thread.sleep(2000);
Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(topic));
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(topic)));

admin.topics().deleteSubscription(topic, "sub");
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getList("prop/ns-abc")
.contains(topic));
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
.contains(topic)));
}

@Test
Expand All @@ -104,14 +104,14 @@ public void testDeleteAndCleanZkNode() throws Exception {
pulsarClient.newProducer().topic(topic).create().close();
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();

Thread.sleep(2000);
Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
.contains(topic));
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
.contains(topic)));

admin.topics().deleteSubscription(topic, "sub");
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
.contains(topic));
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
.contains(topic)));
}

@Test
Expand Down Expand Up @@ -162,13 +162,14 @@ public void testNotEnabledDeleteZkNode() throws Exception {
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
pulsarClient.newConsumer().topic(topic2).subscriptionName("sub2").subscribe().close();

Thread.sleep(2000);
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertTrue(admin.topics().getList(namespace).contains(topic2)));
Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
Assert.assertTrue(admin.topics().getList(namespace).contains(topic2));

admin.topics().deleteSubscription(topic, "sub");
admin.topics().deleteSubscription(topic2, "sub2");
Thread.sleep(2000);
Awaitility.await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
// BrokerDeleteInactivePartitionedTopicMetaDataEnabled is not enabled, so only NonPartitionedTopic will be cleaned
Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
Expand Down Expand Up @@ -213,27 +214,24 @@ public void testTopicPolicyUpdateAndClean() throws Exception {

InactiveTopicPolicies policies;
//wait for zk
while (true) {
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
if (policies.isDeleteWhileInactive()) {
break;
}
Thread.sleep(1000);
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
return temp.isDeleteWhileInactive();
});
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;

Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));

admin.namespaces().removeInactiveTopicPolicies(namespace);
while (true) {
Thread.sleep(500);
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
if (policies.getMaxInactiveDurationSeconds() == 1000) {
break;
}
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
return temp.getMaxInactiveDurationSeconds() == 1000;
});
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
, defaultPolicy);

Expand All @@ -244,13 +242,11 @@ public void testTopicPolicyUpdateAndClean() throws Exception {
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));

admin.namespaces().removeInactiveTopicPolicies(namespace2);
while (true) {
Thread.sleep(500);
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
if (policies.getMaxInactiveDurationSeconds() == 1000) {
break;
}
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
return temp.getMaxInactiveDurationSeconds() == 1000;
});
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
, defaultPolicy);
}
Expand Down Expand Up @@ -297,14 +293,10 @@ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);

//wait for zk
while (true) {
InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic, false).get().get()).inactiveTopicPolicies;
if (policies.isDeleteWhileInactive()) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
return temp.isDeleteWhileInactive();
});

// topic should still exist
Thread.sleep(2000);
Expand All @@ -314,14 +306,14 @@ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {

// no backlog, trigger delete_when_subscriptions_caught_up
admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2)));
// delete subscription, trigger delete_when_no_subscriptions
for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
}
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic)));
Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
}

Expand Down Expand Up @@ -354,9 +346,8 @@ public void testDeleteWhenNoBacklogs() throws Exception {
.contains(topic));

admin.topics().skipAllMessages(topic, "sub");
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getList("prop/ns-abc")
.contains(topic));
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
}

@Test
Expand Down Expand Up @@ -393,9 +384,8 @@ public void testTopicLevelInActiveTopicApi() throws Exception {
admin.topics().createPartitionedTopic(topicName, 3);
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
while (!pulsar.getTopicPoliciesService().cacheIsInitialized(topic)) {
Thread.sleep(500);
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));

InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName);
assertNull(inactiveTopicPolicies);
Expand All @@ -406,21 +396,13 @@ public void testTopicLevelInActiveTopicApi() throws Exception {
policies.setMaxInactiveDurationSeconds(10);
admin.topics().setInactiveTopicPolicies(topicName, policies);

for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> admin.topics().getInactiveTopicPolicies(topicName) != null);
assertEquals(admin.topics().getInactiveTopicPolicies(topicName), policies);
admin.topics().removeInactiveTopicPolicies(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topicName) == null) {
break;
}
Thread.sleep(100);
}
assertNull(admin.topics().getInactiveTopicPolicies(topicName));

Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getInactiveTopicPolicies(topicName)));
}

@Test(timeOut = 30000)
Expand Down Expand Up @@ -461,12 +443,8 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);

//wait for cache
for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topic) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> admin.topics().getInactiveTopicPolicies(topic) != null);
InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic, false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
Expand All @@ -477,9 +455,9 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
admin.topics().removeInactiveTopicPolicies(topic);
//Only the broker-level policies is set, so after removing the topic-level policies
// , the topic will use the broker-level policies
Thread.sleep(1000);
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
, defaultPolicy);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
, defaultPolicy));

policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
Expand All @@ -491,14 +469,20 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
// , the topic will use the namespace level policies
admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
//wait for zk
Thread.sleep(1000);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
InactiveTopicPolicies tempPolicies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false)
.get().get()).inactiveTopicPolicies;
return inactiveTopicPolicies.equals(tempPolicies);
});
admin.topics().removeInactiveTopicPolicies(topic2);
// The cache has been updated, but the system-event may not be consumed yet
// ,so wait for topic-policies update event
Thread.sleep(1000);
InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic2, false).get().get()).inactiveTopicPolicies;
assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic2, false).get().get()).inactiveTopicPolicies;
assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
});

}

@Test(timeOut = 30000)
Expand All @@ -509,8 +493,6 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti
conf.setBrokerDeleteInactiveTopicsEnabled(true);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
//wait for cache init
Thread.sleep(2000);
final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
final String topic2 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
final String topic3 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
Expand All @@ -529,6 +511,9 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti
producer.close();
Thread.sleep(1);
}
//wait for cache init
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic3)));
// "topic" use delete_when_no_subscriptions, "topic2" use delete_when_subscriptions_caught_up
// "topic3" use default:delete_when_no_subscriptions
InactiveTopicPolicies inactiveTopicPolicies =
Expand All @@ -538,13 +523,8 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti
admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);

//wait for update
for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topic2) != null) {
break;
}
Thread.sleep(100);
}

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
-> admin.topics().getInactiveTopicPolicies(topic2) != null);
// topic should still exist
Thread.sleep(2000);
Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
Expand All @@ -553,14 +533,14 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti

// no backlog, trigger delete_when_subscriptions_caught_up
admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
Thread.sleep(2000);
Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
// delete subscription, trigger delete_when_no_subscriptions
for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
}
Thread.sleep(2000);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)));
Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
Assert.assertFalse(admin.topics().getList(namespace).contains(topic3));
}
}

0 comments on commit 1b790ba

Please sign in to comment.