From 075dff4ffe57c482b39b3ca7f51b8ef94a21f6e5 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Thu, 13 Jan 2022 23:55:58 +0530 Subject: [PATCH 01/13] Fetch offsets from consumer interface instead of reader --- .../plugin/stream/pulsar/PulsarConfig.java | 8 ++++ .../pulsar/PulsarStreamMetadataProvider.java | 37 ++++++++++--------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index e605706ab27..5032306f78d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -26,6 +26,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; /** @@ -80,4 +81,11 @@ public String getBootstrapServers() { public MessageId getInitialMessageId() { return _initialMessageId; } + + public SubscriptionInitialPosition getInitialSubscriberPosition() { + if (_initialMessageId == MessageId.earliest) { + return SubscriptionInitialPosition.Earliest; + } + return SubscriptionInitialPosition.Latest; + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index a6b72bc72f1..64396c8c819 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -22,16 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +39,7 @@ /** * A {@link StreamMetadataProvider} implementation for the Pulsar stream */ -public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler - implements StreamMetadataProvider { +public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider { private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class); private final StreamConfig _streamConfig; @@ -80,16 +79,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset Preconditions.checkNotNull(offsetCriteria); try { MessageId offset = null; + Consumer consumer = + _pulsarClient.newConsumer().topic(_topic) + .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) + .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); + if (offsetCriteria.isLargest()) { - _reader.seek(MessageId.latest); - if (_reader.hasMessageAvailable()) { - offset = _reader.readNext().getMessageId(); - } + offset = consumer.getLastMessageId(); } else if (offsetCriteria.isSmallest()) { - _reader.seek(MessageId.earliest); - if (_reader.hasMessageAvailable()) { - offset = _reader.readNext().getMessageId(); - } + offset = consumer.receive().getMessageId(); } else { throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); } @@ -124,18 +122,21 @@ public List computePartitionGroupMetadata(String clientI for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) { - Reader reader = - _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId()) - .create(); + Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p)) + .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) + .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); - if (reader.hasMessageAvailable()) { - Message message = reader.readNext(); + if (!consumer.hasReachedEndOfTopic()) { + newPartitionGroupMetadataList.add( + new PartitionGroupMetadata(p, new MessageIdStreamOffset(consumer.receive().getMessageId()))); + } else { newPartitionGroupMetadataList.add( - new PartitionGroupMetadata(p, new MessageIdStreamOffset(message.getMessageId()))); + new PartitionGroupMetadata(p, new MessageIdStreamOffset(consumer.getLastMessageId()))); } } } } catch (Exception e) { + LOGGER.warn("Error encountered while calculating pulsar partition group metadata: " + e.getMessage(), e); // No partition found } From a835d2e2479209fc551642f587cf27895c55471c Mon Sep 17 00:00:00 2001 From: KKcorps Date: Fri, 14 Jan 2022 12:41:53 +0530 Subject: [PATCH 02/13] Add timeout for getLastMessageId --- .../pulsar/PulsarStreamMetadataProvider.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 64396c8c819..f9aa92a3db6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; @@ -30,6 +32,7 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.slf4j.Logger; @@ -39,7 +42,8 @@ /** * A {@link StreamMetadataProvider} implementation for the Pulsar stream */ -public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider { +public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler + implements StreamMetadataProvider { private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class); private final StreamConfig _streamConfig; @@ -126,18 +130,25 @@ public List computePartitionGroupMetadata(String clientI .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); - if (!consumer.hasReachedEndOfTopic()) { + + Message message = consumer.receive(3, TimeUnit.SECONDS); + if (message != null) { newPartitionGroupMetadataList.add( - new PartitionGroupMetadata(p, new MessageIdStreamOffset(consumer.receive().getMessageId()))); + new PartitionGroupMetadata(p, new MessageIdStreamOffset(message.getMessageId()))); } else { + MessageId lastMessageId; + try { + lastMessageId = (MessageId) consumer.getLastMessageIdAsync().get(3, TimeUnit.SECONDS); + } catch (TimeoutException t) { + lastMessageId = MessageId.latest; + } newPartitionGroupMetadataList.add( - new PartitionGroupMetadata(p, new MessageIdStreamOffset(consumer.getLastMessageId()))); + new PartitionGroupMetadata(p, new MessageIdStreamOffset(lastMessageId))); } } } } catch (Exception e) { LOGGER.warn("Error encountered while calculating pulsar partition group metadata: " + e.getMessage(), e); - // No partition found } return newPartitionGroupMetadataList; From 885b18ffb825e0cd2042a3f7b5825f73e2ba1bd1 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Fri, 14 Jan 2022 12:42:27 +0530 Subject: [PATCH 03/13] Add todo --- .../plugin/stream/pulsar/PulsarStreamMetadataProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index f9aa92a3db6..7603825bbe8 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -130,7 +130,7 @@ public List computePartitionGroupMetadata(String clientI .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); - + //TODO: Make timeout values configurable Message message = consumer.receive(3, TimeUnit.SECONDS); if (message != null) { newPartitionGroupMetadataList.add( From 7b1d8ce227f518de543869f0615484a6d63e397c Mon Sep 17 00:00:00 2001 From: KKcorps Date: Sun, 16 Jan 2022 02:19:45 +0530 Subject: [PATCH 04/13] Include start message id when computing data --- .../stream/pulsar/PulsarPartitionLevelConnectionHandler.java | 2 +- .../plugin/stream/pulsar/PulsarStreamMetadataProvider.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java index 43c61fc95b6..47dbd97ed49 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java @@ -53,7 +53,7 @@ public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig strea _pulsarClient = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()).build(); _reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition)) - .startMessageId(_config.getInitialMessageId()).create(); + .startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create(); LOGGER.info("Created consumer with id {} for topic {}", _reader, _config.getPulsarTopicName()); } catch (Exception e) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 7603825bbe8..1e561bdb829 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -130,15 +130,14 @@ public List computePartitionGroupMetadata(String clientI .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); - //TODO: Make timeout values configurable - Message message = consumer.receive(3, TimeUnit.SECONDS); + Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS); if (message != null) { newPartitionGroupMetadataList.add( new PartitionGroupMetadata(p, new MessageIdStreamOffset(message.getMessageId()))); } else { MessageId lastMessageId; try { - lastMessageId = (MessageId) consumer.getLastMessageIdAsync().get(3, TimeUnit.SECONDS); + lastMessageId = (MessageId) consumer.getLastMessageIdAsync().get(timeoutMillis, TimeUnit.MILLISECONDS); } catch (TimeoutException t) { lastMessageId = MessageId.latest; } From a4b95eae02ac91aa0d7147e2e3cc27161af12412 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Sun, 16 Jan 2022 03:22:03 +0530 Subject: [PATCH 05/13] Handle batch message ids as well --- .../plugin/stream/pulsar/PulsarMessageBatch.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java index ad756a793dc..cd54be2a46c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java @@ -25,6 +25,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.internal.DefaultImplementation; @@ -75,9 +76,16 @@ public int getMessageLengthAtIndex(int index) { @Override public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) { MessageIdImpl currentMessageId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId()); - MessageId nextMessageId = DefaultImplementation - .newMessageId(currentMessageId.getLedgerId(), currentMessageId.getEntryId() + 1, - currentMessageId.getPartitionIndex()); + MessageId nextMessageId; + if (currentMessageId instanceof BatchMessageIdImpl) { + nextMessageId = new BatchMessageIdImpl(currentMessageId.getLedgerId(), currentMessageId.getEntryId() + 1, + currentMessageId.getPartitionIndex(), ((BatchMessageIdImpl) currentMessageId).getBatchIndex(), + ((BatchMessageIdImpl) currentMessageId).getBatchSize(), ((BatchMessageIdImpl) currentMessageId).getAcker()); + } else { + nextMessageId = + DefaultImplementation.newMessageId(currentMessageId.getLedgerId(), currentMessageId.getEntryId() + 1, + currentMessageId.getPartitionIndex()); + } return new MessageIdStreamOffset(nextMessageId); } From a968f158fb354271e17dcfe531dc9dc16f45d03a Mon Sep 17 00:00:00 2001 From: KKcorps Date: Sun, 16 Jan 2022 03:26:27 +0530 Subject: [PATCH 06/13] Use pulsar util to generate random consumer name instead of pinot's logic --- .../plugin/stream/pulsar/PulsarStreamMetadataProvider.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 1e561bdb829..47389f464c6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.util.ConsumerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +129,7 @@ public List computePartitionGroupMetadata(String clientI Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p)) .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) - .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); + .subscriptionName(ConsumerName.generateRandomName()).subscribe(); Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS); if (message != null) { From 0aebfd255742978143cc5a72691d5d179a63dba5 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Fri, 4 Feb 2022 15:38:34 +0530 Subject: [PATCH 07/13] Increment batch id for batch message's next offset --- .../stream/pulsar/PulsarMessageBatch.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java index cd54be2a46c..514773719fe 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java @@ -77,14 +77,26 @@ public int getMessageLengthAtIndex(int index) { public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) { MessageIdImpl currentMessageId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId()); MessageId nextMessageId; + + long currentLedgerId = currentMessageId.getLedgerId(); + long currentEntryId = currentMessageId.getEntryId(); + int currentPartitionIndex = currentMessageId.getPartitionIndex(); + if (currentMessageId instanceof BatchMessageIdImpl) { - nextMessageId = new BatchMessageIdImpl(currentMessageId.getLedgerId(), currentMessageId.getEntryId() + 1, - currentMessageId.getPartitionIndex(), ((BatchMessageIdImpl) currentMessageId).getBatchIndex(), - ((BatchMessageIdImpl) currentMessageId).getBatchSize(), ((BatchMessageIdImpl) currentMessageId).getAcker()); + int currentBatchIndex = ((BatchMessageIdImpl) currentMessageId).getBatchIndex(); + int currentBatchSize = ((BatchMessageIdImpl) currentMessageId).getBatchSize(); + + if (currentBatchIndex < currentBatchSize - 1) { + nextMessageId = + new BatchMessageIdImpl(currentLedgerId, currentEntryId, currentPartitionIndex, currentBatchIndex + 1, + currentBatchSize, ((BatchMessageIdImpl) currentMessageId).getAcker()); + } else { + nextMessageId = + new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1, currentPartitionIndex, 0, currentBatchSize, + ((BatchMessageIdImpl) currentMessageId).getAcker()); + } } else { - nextMessageId = - DefaultImplementation.newMessageId(currentMessageId.getLedgerId(), currentMessageId.getEntryId() + 1, - currentMessageId.getPartitionIndex()); + nextMessageId = DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1, currentPartitionIndex); } return new MessageIdStreamOffset(nextMessageId); } From 5b421d6c5e5c943a3c400319a802cf54f126a1ab Mon Sep 17 00:00:00 2001 From: KKcorps Date: Sun, 20 Feb 2022 21:52:01 +0530 Subject: [PATCH 08/13] Add test for batch message ids --- .../stream/pulsar/PulsarConsumerTest.java | 117 ++++++++++++++++-- 1 file changed, 107 insertions(+), 10 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java index 40a33bf169a..53ad46a728b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; @@ -35,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.internal.DefaultImplementation; import org.testng.Assert; @@ -47,13 +50,18 @@ public class PulsarConsumerTest { public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME"; public static final String TEST_TOPIC = "test-topic"; - public static final int NUM_PARTITION = 1; + public static final String TEST_TOPIC_BATCH = "test-topic-batch"; public static final String MESSAGE_PREFIX = "sample_msg"; - public static final int NUM_RECORDS_PER_PARTITION = 1000; public static final String CLIENT_ID = "clientId"; + + public static final int NUM_PARTITION = 1; + public static final int NUM_RECORDS_PER_PARTITION = 1000; + public static final int BATCH_SIZE = 10; + private PulsarClient _pulsarClient; private PulsarStandaloneCluster _pulsarStandaloneCluster; private HashMap _partitionToFirstMessageIdMap = new HashMap<>(); + private HashMap _partitionToFirstMessageIdMapBatch = new HashMap<>(); @BeforeClass public void setUp() @@ -71,8 +79,10 @@ public void setUp() _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build(); admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION); + admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION); publishRecords(); + publishRecordsBatch(); } catch (Exception e) { if (_pulsarStandaloneCluster != null) { _pulsarStandaloneCluster.stop(); @@ -107,10 +117,37 @@ public int choosePartition(Message msg, TopicMetadata metadata) { _partitionToFirstMessageIdMap.put(partition, messageId); } } + + producer.flush(); } } - public StreamConfig getStreamConfig() { + public void publishRecordsBatch() + throws Exception { + for (int p = 0; p < NUM_PARTITION; p++) { + final int partition = p; + Producer producer = + _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH).messageRouter(new MessageRouter() { + @Override + public int choosePartition(Message msg, TopicMetadata metadata) { + return partition; + } + }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create(); + + for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { + CompletableFuture messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i); + messageIdCompletableFuture.thenAccept(messageId -> { + if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) { + _partitionToFirstMessageIdMapBatch.put(partition, messageId); + } + }); + } + + producer.flush(); + } + } + + public StreamConfig getStreamConfig(String topicName) { String streamType = "pulsar"; String streamPulsarBrokerList = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort(); String streamPulsarConsumerType = "simple"; @@ -119,7 +156,7 @@ public StreamConfig getStreamConfig() { Map streamConfigMap = new HashMap<>(); streamConfigMap.put("streamType", streamType); streamConfigMap.put("stream.pulsar.consumer.type", streamPulsarConsumerType); - streamConfigMap.put("stream.pulsar.topic.name", TEST_TOPIC); + streamConfigMap.put("stream.pulsar.topic.name", topicName); streamConfigMap.put("stream.pulsar.bootstrap.servers", streamPulsarBrokerList); streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest"); streamConfigMap.put("stream.pulsar.consumer.factory.class.name", getPulsarConsumerFactoryName()); @@ -140,8 +177,10 @@ protected String getPulsarConsumerFactoryName() { public void testPartitionLevelConsumer() throws Exception { - final StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(getStreamConfig()); - int numPartitions = new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig()).fetchPartitionCount(10000); + final StreamConsumerFactory streamConsumerFactory = + StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC)); + int numPartitions = + new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC)).fetchPartitionCount(10000); for (int partition = 0; partition < numPartitions; partition++) { PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = @@ -161,8 +200,9 @@ public void testPartitionLevelConsumer() totalMessagesReceived++; } - final MessageBatch messageBatch2 = consumer - .fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null, 10000); + final MessageBatch messageBatch2 = + consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null, + 10000); Assert.assertEquals(messageBatch2.getMessageCount(), 500); for (int i = 0; i < messageBatch2.getMessageCount(); i++) { final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i); @@ -170,8 +210,8 @@ public void testPartitionLevelConsumer() totalMessagesReceived++; } - final MessageBatch messageBatch3 = consumer - .fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)), + final MessageBatch messageBatch3 = + consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)), new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)), 10000); Assert.assertEquals(messageBatch3.getMessageCount(), 25); for (int i = 0; i < messageBatch3.getMessageCount(); i++) { @@ -183,9 +223,66 @@ public void testPartitionLevelConsumer() } } + @Test + public void testPartitionLevelConsumerBatchMessages() + throws Exception { + + final StreamConsumerFactory streamConsumerFactory = + StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC_BATCH)); + int numPartitions = + new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(TEST_TOPIC_BATCH)).fetchPartitionCount(10000); + + for (int partition = 0; partition < numPartitions; partition++) { + PartitionGroupConsumptionStatus partitionGroupConsumptionStatus = + new PartitionGroupConsumptionStatus(partition, 1, new MessageIdStreamOffset(MessageId.earliest), null, + "CONSUMING"); + + int totalMessagesReceived = 0; + + final PartitionGroupConsumer consumer = + streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus); + final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest), + new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), 10000); + Assert.assertEquals(messageBatch1.getMessageCount(), 500); + for (int i = 0; i < messageBatch1.getMessageCount(); i++) { + final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i); + Assert.assertEquals(new String(msg), "sample_msg_" + i); + totalMessagesReceived++; + } + + final MessageBatch messageBatch2 = + consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null, + 10000); + Assert.assertEquals(messageBatch2.getMessageCount(), 500); + for (int i = 0; i < messageBatch2.getMessageCount(); i++) { + final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i); + Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i)); + totalMessagesReceived++; + } + + final MessageBatch messageBatch3 = + consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)), + new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)), 10000); + Assert.assertEquals(messageBatch3.getMessageCount(), 25); + for (int i = 0; i < messageBatch3.getMessageCount(); i++) { + final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i); + Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i)); + } + + Assert.assertEquals(totalMessagesReceived, NUM_RECORDS_PER_PARTITION); + } + } + public MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) { MessageId startMessageIdRaw = _partitionToFirstMessageIdMap.get(partitionNum); MessageIdImpl startMessageId = MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw); return DefaultImplementation.newMessageId(startMessageId.getLedgerId(), index, partitionNum); } + + public MessageId getBatchMessageIdForPartitionAndIndex(int partitionNum, int index) { + MessageId startMessageIdRaw = _partitionToFirstMessageIdMapBatch.get(partitionNum); + BatchMessageIdImpl startMessageId = (BatchMessageIdImpl) MessageIdImpl.convertToMessageIdImpl(startMessageIdRaw); + return new BatchMessageIdImpl(startMessageId.getLedgerId(), index / BATCH_SIZE, partitionNum, index % BATCH_SIZE, + startMessageId.getBatchSize(), startMessageId.getAcker()); + } } From 72aec7a85af7fccf00e1a031255ae67c5a647ea9 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Tue, 5 Apr 2022 01:58:01 +0530 Subject: [PATCH 09/13] Close consumers properly and honour offset criteria in arguments --- .../plugin/stream/pulsar/PulsarConfig.java | 10 +---- .../pulsar/PulsarStreamMetadataProvider.java | 20 +++++++-- .../plugin/stream/pulsar/PulsarUtils.java | 41 +++++++++++++++++++ 3 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index 5032306f78d..8ce884e76c5 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -19,8 +19,6 @@ package org.apache.pinot.plugin.stream.pulsar; import com.google.common.base.Preconditions; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; @@ -57,12 +55,8 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) { _initialMessageId = MessageId.earliest; } else if (offsetCriteria.isLargest()) { _initialMessageId = MessageId.latest; - } else if (offsetCriteria.isCustom()) { - try { - _initialMessageId = MessageId.fromByteArray(offsetCriteria.getOffsetString().getBytes(StandardCharsets.UTF_8)); - } catch (IOException e) { - throw new RuntimeException("Invalid offset string found: " + offsetCriteria.getOffsetString()); - } + } else { + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 47389f464c6..73f1237ba24 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -82,11 +82,12 @@ public int fetchPartitionCount(long timeoutMillis) { @Override public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { Preconditions.checkNotNull(offsetCriteria); + Consumer consumer = null; try { MessageId offset = null; - Consumer consumer = + consumer = _pulsarClient.newConsumer().topic(_topic) - .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) + .subscriptionInitialPosition(PulsarUtils.offsetCriteriaToSubscription(offsetCriteria)) .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe(); if (offsetCriteria.isLargest()) { @@ -101,6 +102,8 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset LOGGER.error("Cannot fetch offsets for partition " + _partition + " and topic " + _topic + " and offsetCriteria " + offsetCriteria, e); return null; + } finally { + closeConsumer(consumer); } } @@ -119,6 +122,7 @@ public List computePartitionGroupMetadata(String clientI partitionGroupConsumptionStatus.getStartOffset())); } + Consumer consumer = null; try { List partitionedTopicNameList = _pulsarClient.getPartitionsForTopic(_topic).get(); @@ -127,7 +131,7 @@ public List computePartitionGroupMetadata(String clientI for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) { - Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p)) + consumer = _pulsarClient.newConsumer().topic(partitionedTopicNameList.get(p)) .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) .subscriptionName(ConsumerName.generateRandomName()).subscribe(); @@ -149,11 +153,21 @@ public List computePartitionGroupMetadata(String clientI } } catch (Exception e) { LOGGER.warn("Error encountered while calculating pulsar partition group metadata: " + e.getMessage(), e); + } finally { + closeConsumer(consumer); } return newPartitionGroupMetadataList; } + private void closeConsumer(Consumer consumer) { + try { + consumer.close(); + } catch (Exception e) { + LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", consumer, e); + } + } + @Override public void close() throws IOException { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java new file mode 100644 index 00000000000..fd2272a4773 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; + + +public class PulsarUtils { + + private PulsarUtils() { + } + + public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) + throws IllegalArgumentException { + if (offsetCriteria.isLargest()) { + return SubscriptionInitialPosition.Latest; + } + if (offsetCriteria.isSmallest()) { + return SubscriptionInitialPosition.Earliest; + } + + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + } +} From 35fbf59962e916842c5e71c78def94080286bea0 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Tue, 5 Apr 2022 02:05:39 +0530 Subject: [PATCH 10/13] Remove default for subscription initial position --- .../apache/pinot/plugin/stream/pulsar/PulsarConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index 8ce884e76c5..ddda097febe 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -39,6 +39,7 @@ public class PulsarConfig { private String _subscriberId; private String _bootstrapServers; private MessageId _initialMessageId; + private SubscriptionInitialPosition _subscriptionInitialPosition; public PulsarConfig(StreamConfig streamConfig, String subscriberId) { Map streamConfigMap = streamConfig.getStreamConfigsMap(); @@ -53,8 +54,10 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) { if (offsetCriteria.isSmallest()) { _initialMessageId = MessageId.earliest; + _subscriptionInitialPosition = SubscriptionInitialPosition.Earliest; } else if (offsetCriteria.isLargest()) { _initialMessageId = MessageId.latest; + _subscriptionInitialPosition = SubscriptionInitialPosition.Latest; } else { throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); } @@ -77,9 +80,6 @@ public MessageId getInitialMessageId() { } public SubscriptionInitialPosition getInitialSubscriberPosition() { - if (_initialMessageId == MessageId.earliest) { - return SubscriptionInitialPosition.Earliest; - } - return SubscriptionInitialPosition.Latest; + return _subscriptionInitialPosition; } } From 4927151580cd92b7688246796740aad5919b1930 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Tue, 5 Apr 2022 02:16:32 +0530 Subject: [PATCH 11/13] Honour config passed in function --- .../plugin/stream/pulsar/PulsarStreamMetadataProvider.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 73f1237ba24..36b42e4549c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -122,6 +122,7 @@ public List computePartitionGroupMetadata(String clientI partitionGroupConsumptionStatus.getStartOffset())); } + PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, clientId); Consumer consumer = null; try { List partitionedTopicNameList = _pulsarClient.getPartitionsForTopic(_topic).get(); @@ -132,7 +133,7 @@ public List computePartitionGroupMetadata(String clientI for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) { consumer = _pulsarClient.newConsumer().topic(partitionedTopicNameList.get(p)) - .subscriptionInitialPosition(_config.getInitialSubscriberPosition()) + .subscriptionInitialPosition(pulsarConfig.getInitialSubscriberPosition()) .subscriptionName(ConsumerName.generateRandomName()).subscribe(); Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS); From da7213203a8c5efafcbc4fe7f699f93dad76754e Mon Sep 17 00:00:00 2001 From: KKcorps Date: Tue, 5 Apr 2022 18:15:43 +0530 Subject: [PATCH 12/13] handle null --- .../plugin/stream/pulsar/PulsarStreamMetadataProvider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java index 36b42e4549c..742b2ca3328 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java @@ -163,7 +163,9 @@ public List computePartitionGroupMetadata(String clientI private void closeConsumer(Consumer consumer) { try { - consumer.close(); + if (consumer != null) { + consumer.close(); + } } catch (Exception e) { LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", consumer, e); } From d0fb9a478604cfa09f948d5beb6f4a490d01a2d9 Mon Sep 17 00:00:00 2001 From: KKcorps Date: Wed, 6 Apr 2022 08:07:14 +0530 Subject: [PATCH 13/13] Move method to pulsar utils --- .../pinot/plugin/stream/pulsar/PulsarConfig.java | 11 ++--------- .../pinot/plugin/stream/pulsar/PulsarUtils.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index ddda097febe..2ce2c7551ab 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -52,15 +52,8 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) { OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria(); - if (offsetCriteria.isSmallest()) { - _initialMessageId = MessageId.earliest; - _subscriptionInitialPosition = SubscriptionInitialPosition.Earliest; - } else if (offsetCriteria.isLargest()) { - _initialMessageId = MessageId.latest; - _subscriptionInitialPosition = SubscriptionInitialPosition.Latest; - } else { - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); - } + _subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria); + _initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria); } public String getPulsarTopicName() { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java index fd2272a4773..763b0fc0d42 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.stream.pulsar; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -38,4 +39,16 @@ public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCri throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); } + + public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria) + throws IllegalArgumentException { + if (offsetCriteria.isLargest()) { + return MessageId.latest; + } + if (offsetCriteria.isSmallest()) { + return MessageId.earliest; + } + + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + } }