Skip to content

Commit

Permalink
Update BrokerServiceTest.java
Browse files Browse the repository at this point in the history
- more tests
- test results
  • Loading branch information
180254 committed May 14, 2024
1 parent e429231 commit 20050a5
Showing 1 changed file with 264 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -79,20 +46,12 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
Expand All @@ -102,10 +61,8 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
Expand All @@ -121,6 +78,21 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;

@Slf4j
@Test(groups = "broker")
public class BrokerServiceTest extends BrokerTestBase {
Expand Down Expand Up @@ -1767,6 +1739,11 @@ public void testMetricsNonPersistentTopicLoadFails() throws Exception {
admin.namespaces().deleteNamespace(namespace);
}

/*
v3.0.1: ok
v3.0.2: fails
branch-3.0 (2da571e): fails
*/
@Test
public void test22657_1() throws Exception {
final String ns = "prop/ns-test";
Expand Down Expand Up @@ -1817,9 +1794,14 @@ public void test22657_1() throws Exception {

@DataProvider(name = "max_unacked_data")
public static Object[][] max_unacked_data() {
return new Object[][]{{1, 1}, {1, 2}, {1, 5}, {2, 2}, {2, 3}, {4, 4}, {4, 8},{10,20}};
return new Object[][]{{1, 1}, {1, 2}, {1, 5}, {2, 2}, {2, 3}, {4, 4}, {4, 8}, {10, 20}};
}

/*
v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
v3.0.2: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok
branch-3.0 (2da571e): fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok
*/
@Test(dataProvider = "max_unacked_data")
public void test22657_1_parameterized(int maxUnackedMsgPerConsumer, int maxUnackedMsgPerSubscription) throws Exception {
final String ns = "prop/ns-test-%d-%d".formatted(maxUnackedMsgPerConsumer, maxUnackedMsgPerSubscription);
Expand Down Expand Up @@ -1847,7 +1829,7 @@ public void test22657_1_parameterized(int maxUnackedMsgPerConsumer, int maxUnack
.subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

int msgs = Math.max(maxUnackedMsgPerConsumer, maxUnackedMsgPerSubscription)+1;
int msgs = Math.max(maxUnackedMsgPerConsumer, maxUnackedMsgPerSubscription) + 1;
for (int i = 0; i < msgs; i++) {
producer.send((("message " + i).getBytes(StandardCharsets.UTF_8)));
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
Expand All @@ -1873,6 +1855,11 @@ public void test22657_1_parameterized(int maxUnackedMsgPerConsumer, int maxUnack
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
}

/*
v3.0.1: ok
v3.0.2: fails (timeout on message2Future.get)
branch-3.0 (2da571e): fails (timeout on message2Future.get)
*/
@Test
public void test22657_2() throws Exception {
final String ns = "prop/ns-test";
Expand Down Expand Up @@ -1926,4 +1913,232 @@ public void test22657_2() throws Exception {
assertFalse(subscriptionStats.isBlockedSubscriptionOnUnackedMsgs());
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
}

/*
v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
v3.0.2: fails
branch-3.0 (2da571e): fails
*/
@Test(dataProvider = "max_unacked_data")
public void test22657_3(int maxUnackedMessagesPerConsumer, int maxUnackedMessagesPerSubscription) throws Exception {
final String ns = "prop/ns-test-%d-%d".formatted(maxUnackedMessagesPerConsumer, maxUnackedMessagesPerSubscription);

admin.namespaces().createNamespace(ns, 2);
admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, maxUnackedMessagesPerConsumer);
admin.namespaces().setMaxUnackedMessagesPerSubscription(ns, maxUnackedMessagesPerSubscription);

final String topicName = "persistent://" + ns + "/test-22657-3";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

String subscriptionName = "sub1";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(1)
.subscribe();

int numberOfMessages = maxUnackedMessagesPerSubscription * 5;

// Supervisor whether all the work has been done.
CountDownLatch supervisor = new CountDownLatch(numberOfMessages * 2);

// producer
for (int i = 0; i < numberOfMessages; i++) {
producer.sendAsync((("message " + i).getBytes(StandardCharsets.UTF_8)))
.whenCompleteAsync((messageId, throwable) -> {
if (throwable == null) {
supervisor.countDown();
}
}
);
}

// consumer
new Thread(() -> {
try {
while (true) {
Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}

consumer.acknowledge(message);
supervisor.countDown();
}
} catch (PulsarClientException e) {
log.error("Consumer failed", e);
throw new RuntimeException(e);
}
}).start();


// Wait until all messages are sent and later processed.
if (!supervisor.await(10, TimeUnit.SECONDS)) {
fail("The test is failed because of latch.await timeout.");
}

// Maybe something is still happening asynchronously? Maybe some time is needed to obtain the correct state?
// Therefore, awaitility "blob" instead of standard assertions, at least for now.
Awaitility.await()
.atMost(1, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> {
SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName);
ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0);

int currentReceiverQueueSize = ((ConsumerImpl<byte[]>) consumer).getCurrentReceiverQueueSize();
int numMessagesInQueue = ((ConsumerImpl<byte[]>) consumer).numMessagesInQueue();

long unackedMessagesSubscription = subscriptionStats.getUnackedMessages();
boolean blockedSubscriptionOnUnackedMsgs = subscriptionStats.isBlockedSubscriptionOnUnackedMsgs();
int unackedMessagesConsumer = consumerStats.getUnackedMessages();
boolean blockedConsumerOnUnackedMsgs = consumerStats.isBlockedConsumerOnUnackedMsgs();

log.info("----");
log.info("subscriptionStats: {}", subscriptionStats);
log.info("currentReceiverQueueSize: {}", currentReceiverQueueSize);
log.info("numMessagesInQueue: {}", numMessagesInQueue);
log.info("unackedMessagesSubscription: {}", unackedMessagesSubscription);
log.info("blockedSubscriptionOnUnackedMsgs: {}", blockedSubscriptionOnUnackedMsgs);
log.info("unackedMessagesConsumer: {}", unackedMessagesConsumer);
log.info("blockedConsumerOnUnackedMsgs: {}", blockedConsumerOnUnackedMsgs);
log.info("----");

return numMessagesInQueue == 0
&& unackedMessagesSubscription == 0
&& !blockedSubscriptionOnUnackedMsgs
&& unackedMessagesConsumer == 0
&& !blockedConsumerOnUnackedMsgs;
});
}

/*
v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
v3.0.2: fails
branch-3.0 (2da571e): fails
*/
@Test(dataProvider = "max_unacked_data")
public void test22657_3_moreconsumers(int maxUnackedMessagesPerConsumer, int maxUnackedMessagesPerSubscription) throws Exception {
final String ns = "prop/ns-test-%d-%d".formatted(maxUnackedMessagesPerConsumer, maxUnackedMessagesPerSubscription);

admin.namespaces().createNamespace(ns, 2);
admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, maxUnackedMessagesPerConsumer);
admin.namespaces().setMaxUnackedMessagesPerSubscription(ns, maxUnackedMessagesPerSubscription);

final String topicName = "persistent://" + ns + "/test-22657-3";

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

String subscriptionName = "sub1";

// numbersOfConsumers = each consumer can have their own max unack
int numbersOfConsumers = maxUnackedMessagesPerSubscription / maxUnackedMessagesPerConsumer;

List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < numbersOfConsumers; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(1)
.subscribe();
consumers.add(consumer);
}

int numberOfMessages = maxUnackedMessagesPerSubscription * 5;
CountDownLatch supervisor = new CountDownLatch(numberOfMessages * 2);

// producer
for (int i = 0; i < numberOfMessages; i++) {
producer.sendAsync((("message " + i).getBytes(StandardCharsets.UTF_8)))
.whenCompleteAsync((messageId, throwable) -> {
if (throwable == null) {
supervisor.countDown();
}
}
);
}

// consumers
for (int i = 0; i < consumers.size(); i++) {
int finalI = i;
Consumer<byte[]> consumer = consumers.get(i);
for (int k = 0; k < maxUnackedMessagesPerConsumer; k++) {
new Thread(() -> {
try {
while (true) {
Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}

consumer.acknowledge(message);
supervisor.countDown();
}
} catch (PulsarClientException e) {
log.error("Consumer {} failed", finalI, e);
throw new RuntimeException(e);
}
}).start();
}
}

// Wait until all messages are sent and later processed.
if (!supervisor.await(10, TimeUnit.SECONDS)) {
fail("The test is failed because of latch.await timeout.");
}

// Maybe something is still happening asynchronously? Maybe some time is needed to obtain the correct state?
// Therefore, awaitility "blob" instead of standard assertions, at least for now.
Awaitility.await()
.atMost(1, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> {
SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName);
ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0);

StringBuilder currentReceiverQueueSize = new StringBuilder();
StringBuilder numMessagesInQueue = new StringBuilder();
for (Consumer<byte[]> consumer : consumers) {
currentReceiverQueueSize.append(((ConsumerImpl<byte[]>) consumer).getCurrentReceiverQueueSize()).append("/");
numMessagesInQueue.append(((ConsumerImpl<byte[]>) consumer).numMessagesInQueue()).append("/");
}
currentReceiverQueueSize.deleteCharAt(currentReceiverQueueSize.length() - 1);
numMessagesInQueue.deleteCharAt(numMessagesInQueue.length() - 1);


long unackedMessagesSubscription = subscriptionStats.getUnackedMessages();
boolean blockedSubscriptionOnUnackedMsgs = subscriptionStats.isBlockedSubscriptionOnUnackedMsgs();
int unackedMessagesConsumer = consumerStats.getUnackedMessages();
boolean blockedConsumerOnUnackedMsgs = consumerStats.isBlockedConsumerOnUnackedMsgs();

log.info("----");
log.info("subscriptionStats: {}", subscriptionStats);
log.info("currentReceiverQueueSize: {}", currentReceiverQueueSize.toString());
log.info("numMessagesInQueue: {}", numMessagesInQueue.toString());
log.info("unackedMessagesSubscription: {}", unackedMessagesSubscription);
log.info("blockedSubscriptionOnUnackedMsgs: {}", blockedSubscriptionOnUnackedMsgs);
log.info("unackedMessagesConsumer: {}", unackedMessagesConsumer);
log.info("blockedConsumerOnUnackedMsgs: {}", blockedConsumerOnUnackedMsgs);
log.info("----");

return unackedMessagesSubscription == 0
&& !blockedSubscriptionOnUnackedMsgs
&& unackedMessagesConsumer == 0
&& !blockedConsumerOnUnackedMsgs;
});
}
}

0 comments on commit 20050a5

Please sign in to comment.