diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 4805eb5404957..3bcb4920b5f8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -465,17 +465,13 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets); + ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position); + ackedCount = batchSize; } - if (msgId.hasBatchIndex()) { - positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); - } else { - positionsAcked.add(new MutablePair<>(position, 0)); - } + positionsAcked.add(new MutablePair<>(position, (int) batchSize)); addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); @@ -519,7 +515,7 @@ private long getBatchSize(MessageIdData msgId) { } private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) { - if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { + if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); @@ -548,6 +544,13 @@ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long return ackedCount; } + private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) { + BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets); + long ackedCount = batchSize - bitset.cardinality(); + bitset.recycle(); + return ackedCount; + } + private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) { long unAckedCount = batchSize; if (isAcknowledgmentAtBatchIndexLevelEnabled) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 1c0b10fbd57f6..6fafc686169d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -674,13 +674,18 @@ private void individualAckAbortCommon(TxnID txnID, HashMap consumer = (ConsumerImpl) pulsarClient + .newConsumer(Schema.BYTES) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName(subscriptionName) + .subscriptionType(subType) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient + .newProducer() + .enableBatching(enableBatch) + .topic(topicName) + .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> { + log.info("Published message with msgId: {}", msgId); + countDownLatch.countDown(); + }); + // To generate batch message with different batch size + // 31 total batches, 5 batches with 3 messages, 8 batches with 2 messages and 37 batches with 1 message + if (((i / 3) % (i % 3 + 1)) == 0) { + producer.flush(); + } + } + + countDownLatch.await(); + + for (int i = 0; i < messageCount; i++) { + Message message = consumer.receive(); + if (enableBatch) { + // only ack messages which batch index < 2, which means we will not to ack the + // whole batch for the batch that with more than 2 messages + if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) { + consumer.acknowledgeAsync(message).get(); + } + } else { + if (i % 2 == 0) { + consumer.acknowledgeAsync(message).get(); + } + } + } + + String topic = TopicName.get(topicName).toString(); + PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topic, false).get().get().getSubscription(subscriptionName); + + Awaitility.await().untilAsserted(() -> { + if (subType == SubscriptionType.Shared) { + if (enableBatch) { + if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 1); + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 3); + } + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + } + }); + } + private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 5e09def401663..c3785c7d3cb40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -19,28 +19,24 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Lists; -import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; -import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; + import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import static org.testng.Assert.assertEquals; @Test(groups = "broker") @@ -111,66 +107,4 @@ public void testBatchMessageAck() { assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); }); } - - @DataProvider(name = "testSubTypeAndEnableBatch") - public Object[][] testSubTypeAndEnableBatch() { - return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE }, - { SubscriptionType.Failover, Boolean.TRUE }, - { SubscriptionType.Shared, Boolean.FALSE }, - { SubscriptionType.Failover, Boolean.FALSE }}; - } - - - @Test(dataProvider="testSubTypeAndEnableBatch") - private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType, - boolean enableBatch) throws Exception { - - final int messageCount = 50; - final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID(); - final String subscriptionName = "sub-batch-1"; - @Cleanup - ConsumerImpl consumer = (ConsumerImpl) pulsarClient - .newConsumer(Schema.BYTES) - .topic(topicName) - .isAckReceiptEnabled(true) - .subscriptionName(subscriptionName) - .subscriptionType(subType) - .enableBatchIndexAcknowledgment(true) - .subscribe(); - - @Cleanup - Producer producer = pulsarClient - .newProducer() - .enableBatching(enableBatch) - .topic(topicName) - .batchingMaxMessages(10) - .create(); - - CountDownLatch countDownLatch = new CountDownLatch(messageCount); - for (int i = 0; i < messageCount; i++) { - producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); - } - - countDownLatch.await(); - - for (int i = 0; i < messageCount; i++) { - Message message = consumer.receive(); - // wait for receipt - if (i < messageCount / 2) { - consumer.acknowledgeAsync(message.getMessageId()).get(); - } - } - - String topic = TopicName.get(topicName).toString(); - PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() - .getTopic(topic, false).get().get().getSubscription(subscriptionName); - - Awaitility.await().untilAsserted(() -> { - if (subType == SubscriptionType.Shared) { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); - } else { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); - } - }); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index ab248a2ba017e..de1ffdb593763 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -84,6 +84,7 @@ public abstract class TransactionTestBase extends TestRetrySupport { public static final String TENANT = "tnx"; protected static final String NAMESPACE1 = TENANT + "/ns1"; + protected ServiceConfiguration conf = new ServiceConfiguration(); public void internalSetup() throws Exception { incrementSetupNumber(); @@ -145,7 +146,6 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int protected void startBroker() throws Exception { for (int i = 0; i < brokerCount; i++) { - ServiceConfiguration conf = new ServiceConfiguration(); conf.setClusterName(CLUSTER_NAME); conf.setAdvertisedAddress("localhost"); conf.setManagedLedgerCacheSizeMB(8); @@ -155,7 +155,6 @@ protected void startBroker() throws Exception { conf.setConfigurationStoreServers("localhost:3181"); conf.setAllowAutoTopicCreationType("non-partitioned"); conf.setBookkeeperClientExposeStatsToPrometheus(true); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setBrokerShutdownTimeoutMs(0L); conf.setBrokerServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index bc22473e285f0..da2a3a940bd2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -62,6 +62,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 343791b7ae57f..bfa19b2496abb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -83,12 +83,13 @@ @Test(groups = "flaky") public class TransactionEndToEndTest extends TransactionTestBase { - private static final int TOPIC_PARTITION = 3; - private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; - private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; - private static final int NUM_PARTITIONS = 16; + protected static final int TOPIC_PARTITION = 3; + protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; + protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; + protected static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); } @@ -323,7 +324,7 @@ public void txnIndividualAckTestBatchAndFailoverSub() throws Exception { txnAckTest(true, 200, SubscriptionType.Failover); } - private void txnAckTest(boolean batchEnable, int maxBatchSize, + protected void txnAckTest(boolean batchEnable, int maxBatchSize, SubscriptionType subscriptionType) throws Exception { String normalTopic = NAMESPACE1 + "/normal-topic"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java new file mode 100644 index 0000000000000..1ef3998c3467d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * End to end transaction test. + */ +@Slf4j +@Test(groups = "flaky") +public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest { + + @BeforeMethod + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); + admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); + } + + // TODO need to fix which using transaction with individual ack for failover subscription + @Test + public void txnIndividualAckTestBatchAndFailoverSub() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + txnAckTest(true, 200, SubscriptionType.Failover); + } +}