Skip to content

Commit

Permalink
Fix unack message count for transaction Ack while disabled batch inde…
Browse files Browse the repository at this point in the history
…x ack (#14071)

* Fix unack message count for transaction Ack while disabled batch index ack.

### Motivation

Fix unack message count for transaction Ack while disabled batch index ack.

Transaction Ack is different with normal message ack for a batch message.

For normal message, we are using a bitset to carry the batch index state, for example

```
1. Ack with `00111111` means acks batch index 0 and 1
2. For ack batch index 2 and 3, the client will send `00001111` to broker
3. After all the batch been acked, send `00000000` to broker
```

The following is for transaction ack:

```
1. `00111111` means acks batch index 0 and 1
1. `11001111` means acks batch index 2 and 3
```

### Verification

Enabled transaction e2e test for batch index ack disabled
  • Loading branch information
codelipenghui committed Feb 10, 2022
1 parent 025509c commit a640146
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,13 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
ackedCount = batchSize;
}

if (msgId.hasBatchIndex()) {
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
positionsAcked.add(new MutablePair<>(position, 0));
}
positionsAcked.add(new MutablePair<>(position, (int) batchSize));

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

Expand Down Expand Up @@ -519,7 +515,7 @@ private long getBatchSize(MessageIdData msgId) {
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
Expand Down Expand Up @@ -548,6 +544,13 @@ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long
return ackedCount;
}

private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) {
BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets);
long ackedCount = batchSize - bitset.cardinality();
bitset.recycle();
return ackedCount;
}

private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) {
long unAckedCount = batchSize;
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,18 @@ private void individualAckAbortCommon(TxnID txnID, HashMap<PositionImpl, Positio
&& individualAckPositions.containsKey(entry.getValue())) {
BitSetRecyclable thisBitSet =
BitSetRecyclable.valueOf(entry.getValue().getAckSet());
thisBitSet.flip(0, individualAckPositions.get(entry.getValue()).right);
int batchSize = individualAckPositions.get(entry.getValue()).right;
thisBitSet.flip(0, batchSize);
BitSetRecyclable otherBitSet =
BitSetRecyclable.valueOf(individualAckPositions
.get(entry.getValue()).left.getAckSet());
otherBitSet.or(thisBitSet);
individualAckPositions.get(entry.getKey())
.left.setAckSet(otherBitSet.toLongArray());
if (otherBitSet.cardinality() == batchSize) {
individualAckPositions.remove(entry.getValue());
} else {
individualAckPositions.get(entry.getKey())
.left.setAckSet(otherBitSet.toLongArray());
}
otherBitSet.recycle();
thisBitSet.recycle();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -49,9 +51,13 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -63,6 +69,8 @@
@Test(groups = "broker")
public class BatchMessageTest extends BrokerTestBase {

private static final Logger log = LoggerFactory.getLogger(BatchMessageTest.class);

@BeforeClass
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -95,6 +103,15 @@ public Object[][] containerBuilderProvider() {
};
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }
};
}

@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 50;
Expand Down Expand Up @@ -919,5 +936,80 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
consumer1.close();
}

@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {
final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> {
log.info("Published message with msgId: {}", msgId);
countDownLatch.countDown();
});
// To generate batch message with different batch size
// 31 total batches, 5 batches with 3 messages, 8 batches with 2 messages and 37 batches with 1 message
if (((i / 3) % (i % 3 + 1)) == 0) {
producer.flush();
}
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
if (enableBatch) {
// only ack messages which batch index < 2, which means we will not to ack the
// whole batch for the batch that with more than 2 messages
if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
consumer.acknowledgeAsync(message).get();
}
} else {
if (i % 2 == 0) {
consumer.acknowledgeAsync(message).get();
}
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
if (enableBatch) {
if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 1);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 3);
}
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
}
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}

private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,24 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

@Test(groups = "broker")
Expand Down Expand Up @@ -111,66 +107,4 @@ public void testBatchMessageAck() {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }};
}


@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {

final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxMessages(10)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown);
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
// wait for receipt
if (i < messageCount / 2) {
consumer.acknowledgeAsync(message.getMessageId()).get();
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {

public static final String TENANT = "tnx";
protected static final String NAMESPACE1 = TENANT + "/ns1";
protected ServiceConfiguration conf = new ServiceConfiguration();

public void internalSetup() throws Exception {
incrementSetupNumber();
Expand Down Expand Up @@ -145,7 +146,6 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int

protected void startBroker() throws Exception {
for (int i = 0; i < brokerCount; i++) {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setClusterName(CLUSTER_NAME);
conf.setAdvertisedAddress("localhost");
conf.setManagedLedgerCacheSizeMB(8);
Expand All @@ -155,7 +155,6 @@ protected void startBroker() throws Exception {
conf.setConfigurationStoreServers("localhost:3181");
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@
@Test(groups = "flaky")
public class TransactionEndToEndTest extends TransactionTestBase {

private static final int TOPIC_PARTITION = 3;
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
private static final int NUM_PARTITIONS = 16;
protected static final int TOPIC_PARTITION = 3;
protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
protected static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}
Expand Down Expand Up @@ -323,7 +324,7 @@ public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
txnAckTest(true, 200, SubscriptionType.Failover);
}

private void txnAckTest(boolean batchEnable, int maxBatchSize,
protected void txnAckTest(boolean batchEnable, int maxBatchSize,
SubscriptionType subscriptionType) throws Exception {
String normalTopic = NAMESPACE1 + "/normal-topic";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* End to end transaction test.
*/
@Slf4j
@Test(groups = "flaky")
public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {

@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}

// TODO need to fix which using transaction with individual ack for failover subscription
@Test
public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
txnAckTest(true, 200, SubscriptionType.Failover);
}
}

0 comments on commit a640146

Please sign in to comment.