Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 8260] Support reset cursor to a batch index of the batching message #8285

Merged
merged 30 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f8c7c97
store ackSet when reset cursor in batch indexes
Renkai Oct 17, 2020
f723317
carry ack info in command seek
Renkai Oct 17, 2020
77aa4f2
make ackSet by batch index from client
Renkai Oct 17, 2020
6c28676
fix timeout bug
Renkai Oct 17, 2020
76101ff
use different parameter for different message impl
Renkai Oct 18, 2020
0a84bd8
use different parameter for different message impl
Renkai Oct 18, 2020
2c4ce62
polish ackSet
Renkai Oct 18, 2020
18c8acc
polish
Renkai Oct 18, 2020
d365243
polish code
Renkai Oct 18, 2020
0076999
pass test
Renkai Oct 19, 2020
2323491
add some comment
Renkai Oct 19, 2020
2f0cad6
fix ci error
Renkai Oct 19, 2020
f2134ec
replace * with specific imports
Renkai Oct 20, 2020
deb5772
use MessageIdData carried ack set instead of generated by batchIndex
Renkai Oct 20, 2020
7af4e32
use MessageIdData carried ack set instead of generated by batchIndex
Renkai Oct 20, 2020
f989ac3
use MessageIdData carried ack set instead of generated by batchIndex
Renkai Oct 20, 2020
be2c79d
use MessageIdData carried ack set instead of generated by batchIndex
Renkai Oct 20, 2020
3befed3
use MessageIdData carried ack set instead of generated by batchIndex
Renkai Oct 20, 2020
e7d08f2
remove redundant map put
Renkai Oct 20, 2020
b1c90f1
use recycleable bit set
Renkai Oct 20, 2020
68b6215
use recycleable bit set
Renkai Oct 20, 2020
200d3f9
remove unnecessary client parameter
Renkai Oct 20, 2020
0c2e6e4
polish code
Renkai Oct 20, 2020
519ebb9
start message id inclusive
Renkai Oct 21, 2020
8d356ac
start message id inclusive
Renkai Oct 21, 2020
32645c8
leave ack set null when command not include a ack set
Renkai Oct 21, 2020
33399a4
remove unnecessary try catch
Renkai Oct 21, 2020
9b4241d
drop unnecessary sleep
Renkai Oct 21, 2020
5aaa206
enhance test
Renkai Oct 21, 2020
c55e8bf
remove unnecessary import
Renkai Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,11 @@ public void operationComplete() {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newPosition.ackSet;
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newPosition, ackSet);
}
}

PositionImpl oldReadPosition = readPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
Expand Down Expand Up @@ -1350,7 +1352,13 @@ protected void handleSeek(CommandSeek seek) {
Subscription subscription = consumer.getSubscription();
MessageIdData msgIdData = seek.getMessageId();

Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
long[] ackSet = null;
if (msgIdData.getAckSetCount() > 0) {
ackSet = SafeCollectionUtils.longListToArray(msgIdData.getAckSetList());
}

Position position = new PositionImpl(msgIdData.getLedgerId(),
msgIdData.getEntryId(), ackSet);


subscription.resetCursor(position).thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -54,6 +59,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
super.baseSetup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}

@AfterClass
Expand Down Expand Up @@ -116,6 +122,62 @@ public void testSeek() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}

@Test
public void testSeekForBatch() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
String subscriptionName = "my-subscription-batch";

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
.topic(topicName).create();


List<MessageId> messageIds = new ArrayList<>();
List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();

List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
messages.add(message);
CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
futureMessageIds.add(messageIdCompletableFuture);
}

for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
MessageId messageId = futureMessageId.get();
messageIds.add(messageId);
}

producer.close();


org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
.startMessageIdInclusive()
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);

assertEquals(topicRef.getSubscriptions().size(), 1);

consumer.seek(MessageId.earliest);
Message<String> receiveBeforEarliest = consumer.receive();
assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
consumer.seek(MessageId.latest);
Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
assertNull(receiveAfterLatest);

for (MessageId messageId : messageIds) {
consumer.seek(messageId);
MessageId receiveId = consumer.receive().getMessageId();
assertEquals(receiveId, messageId);
}
}


@Test
public void testConcurrentResetCursor() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,12 @@ public boolean isPrevBatchCumulativelyAcked() {
return prevBatchCumulativelyAcked;
}

@Override
public String toString() {
return "BatchMessageAcker{" +
"batchSize=" + batchSize +
", bitSet=" + bitSet +
", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1922,15 +1922,29 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

if (!isConnected()) {
return FutureUtil.failedFuture(new PulsarClientException(
String.format("The client is not connected to the broker when seeking the subscription %s of the " +
"topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
String.format("The client is not connected to the broker when seeking the subscription %s of the " +
"topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
}

final CompletableFuture<Void> seekFuture = new CompletableFuture<>();

long requestId = client.newRequestId();
MessageIdImpl msgId = (MessageIdImpl) messageId;
ByteBuf seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
ByteBuf seek = null;
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
// Initialize ack set
BitSetRecyclable ackSet = BitSetRecyclable.create();
ackSet.set(0, msgId.getBatchSize());
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
long[] ackSetArr = ackSet.toLongArray();
ackSet.recycle();

seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
} else {
MessageIdImpl msgId = (MessageIdImpl) messageId;
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
}

ClientCnx cnx = cnx();

log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
Expand Down Expand Up @@ -2470,4 +2484,4 @@ protected OpForAckCallBack newObject(Handle<OpForAckCallBack> handle) {

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -760,24 +760,27 @@ public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) {
CommandActiveConsumerChange.Builder changeBuilder = CommandActiveConsumerChange.newBuilder()
.setConsumerId(consumerId)
.setIsActive(isActive);
.setIsActive(isActive);

CommandActiveConsumerChange change = changeBuilder.build();
ByteBuf res = serializeWithSize(
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
changeBuilder.recycle();
change.recycle();
return res;
}

public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) {
public static ByteBuf newSeek(long consumerId, long requestId,
long ledgerId, long entryId, long[] ackSet) {
CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
seekBuilder.setConsumerId(consumerId);
seekBuilder.setRequestId(requestId);

MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
messageIdBuilder.setLedgerId(ledgerId);
messageIdBuilder.setEntryId(entryId);
messageIdBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));

MessageIdData messageId = messageIdBuilder.build();
seekBuilder.setMessageId(messageId);

Expand Down