Skip to content

Commit

Permalink
[fix][test] Enable testMaxPendingChunkMessages (#18542)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Nov 22, 2022
1 parent 5df8b50 commit 49ff0f8
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,18 @@ public void testPublishWithFailure() throws Exception {
producer.close();
}

@Test(enabled = false)
@Test
public void testMaxPendingChunkMessages() throws Exception {

log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(10);
this.conf.setMaxMessageSize(100);
final int totalMessages = 25;
final String topicName = "persistent://my-property/my-ns/maxPending";
final int totalProducers = 25;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(totalProducers);

@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.maxPendingChunkedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true)
Expand All @@ -334,12 +335,11 @@ public void testMaxPendingChunkMessages() throws Exception {
producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
int index = i;
executor.submit(() -> {
futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes()));
futures.add(producers[index].sendAsync(createMessagePayload(450).getBytes()));
});
}

FutureUtil.waitForAll(futures).get();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();

Message<byte[]> msg = null;
Set<String> messageSet = new HashSet<>();
Expand All @@ -354,8 +354,12 @@ public void testMaxPendingChunkMessages() throws Exception {
consumer.acknowledge(msg);
}

log.info("messageSet size: {}, totalPublishedMessages: {}", messageSet.size(), totalPublishedMessages);
assertNotEquals(messageSet.size(), totalPublishedMessages);

for (int i = 0; i < totalProducers; i++) {
producers[i].close();
}
}

/**
Expand Down

0 comments on commit 49ff0f8

Please sign in to comment.