Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support start from separate MessageId for each topic/ partition #10033

Merged
merged 4 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,30 @@
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -358,6 +363,52 @@ public void testSeekTime() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
}

@Test
public void testSeekTimeByFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
int partitionNum = 4;
int msgNum = 20;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();
long now = System.currentTimeMillis();
consumer.seek((topic) -> now);
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.seek((topic) -> {
TopicName name = TopicName.get(topic);
switch (name.getPartitionIndex()) {
case 0:
return MessageId.latest;
case 1:
return MessageId.earliest;
case 2:
return now;
case 3:
return now - 999999;
default:
return null;
}
});
int count = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
}
int msgNumInPartition0 = 0;
int msgNumInPartition1 = msgNum / partitionNum;
int msgNumInPartition2 = 0;
int msgNumInPartition3 = msgNum / partitionNum;

assertEquals(count, msgNumInPartition0 + msgNumInPartition1 + msgNumInPartition2 + msgNumInPartition3);

}

@Test
public void testSeekTimeOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
Expand Down Expand Up @@ -493,4 +544,156 @@ public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek
}
assertTrue(hasConsumerNotDisconnected);
}

@Test
public void testSeekByFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
int partitionNum = 4;
int msgNum = 160;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();

TopicName partitionedTopic = TopicName.get(topicName);
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.startMessageId(MessageId.earliest)
.topic(partitionedTopic.getPartition(0).toString()).create();
List<MessageId> list = new ArrayList<>();
while (reader.hasMessageAvailable()) {
list.add(reader.readNext().getMessageId());
}
// get middle msg from partition-0
MessageId middleMsgIdInPartition0 = list.get(list.size() / 2);
List<MessageId> msgNotIn = list.subList(0, list.size() / 2 - 1);
// get last msg from partition-1
MessageId lastMsgInPartition1 = admin.topics().getLastMessageId(partitionedTopic.getPartition(1).toString());
reader.close();
reader = pulsarClient.newReader(Schema.STRING)
.startMessageId(MessageId.earliest)
.topic(partitionedTopic.getPartition(2).toString()).create();
// get first msg from partition-2
MessageId firstMsgInPartition2 = reader.readNext().getMessageId();

consumer.seek((topic) -> {
int index = TopicName.get(topic).getPartitionIndex();
if (index == 0) {
return middleMsgIdInPartition0;
} else if (index == 1) {
return lastMsgInPartition1;
} else if (index == 2) {
return firstMsgInPartition2;
}
return null;
});
Set<MessageId> received = new HashSet<>();
while (true) {
Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
if (message == null) {
break;
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
received.add(topicMessageId.getInnerMessageId());
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
int msgNumFromPartition3 = msgNum / partitionNum;
assertEquals(received.size(), msgNumFromPartition1 + msgNumFromPartition2 + msgNumFromPartition3);
assertTrue(received.contains(middleMsgIdInPartition0));
assertTrue(received.contains(lastMsgInPartition1));
assertTrue(received.contains(firstMsgInPartition2));
for (MessageId messageId : msgNotIn) {
assertFalse(received.contains(messageId));
}
reader.close();
consumer.close();
}

private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws Exception {
List<MessageId> messageIds = new ArrayList<>();
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.topic(topic).create();
for (int i = 0; i < msgNum; i++) {
messageIds.add(producer.send("msg" + i));
}
producer.close();
return messageIds;
}

@Test
public void testSeekByFunctionAndMultiTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
final String topicName2 = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
int partitionNum = 3;
int msgNum = 15;
admin.topics().createPartitionedTopic(topicName, partitionNum);
admin.topics().createPartitionedTopic(topicName2, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
creatProducerAndSendMsg(topicName2, msgNum);
TopicName topic = TopicName.get(topicName);
TopicName topic2 = TopicName.get(topicName2);
MessageId msgIdInTopic1Partition0 = admin.topics().getLastMessageId(topic.getPartition(0).toString());
MessageId msgIdInTopic1Partition2 = admin.topics().getLastMessageId(topic.getPartition(2).toString());
MessageId msgIdInTopic2Partition0 = admin.topics().getLastMessageId(topic2.getPartition(0).toString());
MessageId msgIdInTopic2Partition2 = admin.topics().getLastMessageId(topic2.getPartition(2).toString());

org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
consumer.seek((partitionedTopic) -> {
if (partitionedTopic.equals(topic.getPartition(0).toString())) {
return msgIdInTopic1Partition0;
}
if (partitionedTopic.equals(topic.getPartition(2).toString())) {
return msgIdInTopic1Partition2;
}
if (partitionedTopic.equals(topic2.getPartition(0).toString())) {
return msgIdInTopic2Partition0;
}
if (partitionedTopic.equals(topic2.getPartition(2).toString())) {
return msgIdInTopic2Partition2;
}
return MessageId.earliest;
});
int count = 0;
while (true) {
Message message = consumer.receive(2, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
}
int msgInTopic1Partition0 = 1;
int msgInTopic1Partition1 = msgNum / partitionNum;
int msgInTopic1Partition2 = 1;
assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2);
}

@Test
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
creatProducerAndSendMsg(topicName,10);
org.apache.pulsar.client.api.Consumer consumer = pulsarClient
.newConsumer()
.topic(topicName).subscriptionName("my-sub").subscribe();
try {
consumer.seek((Function<String, MessageId>) null);
fail("should fail");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
assertTrue(e.getMessage().contains("Function must be set"));
}
assertNull(consumer.seekAsync((topic)-> null).get());
try {
assertNull(consumer.seekAsync((topic)-> new Object()).get());
fail("should fail");
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException);
assertTrue(e.getCause().getMessage().contains("Only support seek by messageId or timestamp"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
Expand Down Expand Up @@ -142,6 +143,10 @@ public void seek(long arg0) throws PulsarClientException {
consumer.seek(arg0);
}

public void seek(Function<String, Object> function) throws PulsarClientException {
consumer.seek(function);
}

public CompletableFuture<Void> seekAsync(long arg0) {
return consumer.seekAsync(arg0);
}
Expand All @@ -150,6 +155,10 @@ public CompletableFuture<Void> seekAsync(MessageId arg0) {
return consumer.seekAsync(arg0);
}

public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
return consumer.seekAsync(function);
}

public void unsubscribe() throws PulsarClientException {
consumer.unsubscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand Down Expand Up @@ -598,6 +599,32 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
void seek(long timestamp) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message id.
* <p>
* The Function input is topic+partition.
* <p>
* The return value is the seek position/timestamp of the current partition.
* <p>
* If returns null, the current partition will not do any processing.
* @param function
* @throws PulsarClientException
*/
void seek(Function<String, Object> function) throws PulsarClientException;

/**
* Reset the subscription associated with this consumer to a specific message id asynchronously.
* <p>
* The Function input is topic+partition.
* <p>
* The return value is the seek position/timestamp of the current partition.
* <p>
* If returns null, the current partition will not do any processing.
* @param function
* @return
*/
CompletableFuture<Void> seekAsync(Function<String, Object> function);

/**
* Reset the subscription associated with this consumer to a specific message id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1746,6 +1747,34 @@ public void seek(long timestamp) throws PulsarClientException {
}
}

@Override
public void seek(Function<String, Object> function) throws PulsarClientException {
try {
seekAsync(function).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
if (function == null) {
return FutureUtil.failedFuture(new PulsarClientException("Function must be set"));
}
Object seekPosition = function.apply(topic);
if (seekPosition == null) {
return CompletableFuture.completedFuture(null);
}
if (seekPosition instanceof MessageId) {
return seekAsync((MessageId) seekPosition);
} else if (seekPosition.getClass().getTypeName()
.equals(Long.class.getTypeName())) {
return seekAsync((long) seekPosition);
}
return FutureUtil.failedFuture(
new PulsarClientException("Only support seek by messageId or timestamp"));
}

private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
if (getState() == State.Closing || getState() == State.Closed) {
return Optional.of(FutureUtil
Expand Down
Loading