Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch Pulsar offsets from Consumer interface instead of Reader #8017

Merged
merged 13 commits into from
Apr 6, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
package org.apache.pinot.plugin.stream.pulsar;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;


/**
Expand All @@ -40,6 +39,7 @@ public class PulsarConfig {
private String _subscriberId;
private String _bootstrapServers;
private MessageId _initialMessageId;
private SubscriptionInitialPosition _subscriptionInitialPosition;

public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
Expand All @@ -52,17 +52,8 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) {

OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();

if (offsetCriteria.isSmallest()) {
_initialMessageId = MessageId.earliest;
} else if (offsetCriteria.isLargest()) {
_initialMessageId = MessageId.latest;
} else if (offsetCriteria.isCustom()) {
try {
_initialMessageId = MessageId.fromByteArray(offsetCriteria.getOffsetString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException("Invalid offset string found: " + offsetCriteria.getOffsetString());
}
}
_subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
_initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
}

public String getPulsarTopicName() {
Expand All @@ -80,4 +71,8 @@ public String getBootstrapServers() {
/**
 * Returns the Pulsar message id at which reading should start.
 *
 * <p>The value is resolved once, in the constructor, from the stream config's offset criteria.
 *
 * @return the initial {@link MessageId} for this stream
 */
public MessageId getInitialMessageId() {
return _initialMessageId;
}

/**
 * Returns the position a newly created Pulsar subscription should start from.
 *
 * <p>The value is resolved once, in the constructor, from the stream config's offset criteria.
 *
 * @return the {@link SubscriptionInitialPosition} for this stream
 */
public SubscriptionInitialPosition getInitialSubscriberPosition() {
return _subscriptionInitialPosition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;

Expand Down Expand Up @@ -75,9 +76,28 @@ public int getMessageLengthAtIndex(int index) {
/**
 * Computes the offset that immediately follows the message stored at {@code index}.
 *
 * <p>For a plain message id the next offset is the following entry in the same ledger. For a batch message id
 * the next offset is the following index inside the same batch, or index 0 of the following entry once the
 * last index of the batch has been reached.
 *
 * @param index position of the message in this batch
 * @return the offset of the message that follows the one at {@code index}
 */
@Override
public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
  MessageIdImpl currentMessageId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
  MessageId nextMessageId;

  long currentLedgerId = currentMessageId.getLedgerId();
  long currentEntryId = currentMessageId.getEntryId();
  int currentPartitionIndex = currentMessageId.getPartitionIndex();

  if (currentMessageId instanceof BatchMessageIdImpl) {
    // Cast once and reuse instead of repeating the cast for every accessor.
    BatchMessageIdImpl currentBatchMessageId = (BatchMessageIdImpl) currentMessageId;
    int currentBatchIndex = currentBatchMessageId.getBatchIndex();
    int currentBatchSize = currentBatchMessageId.getBatchSize();

    if (currentBatchIndex < currentBatchSize - 1) {
      // Still inside the current batch: same entry, next batch index.
      nextMessageId =
          new BatchMessageIdImpl(currentLedgerId, currentEntryId, currentPartitionIndex, currentBatchIndex + 1,
              currentBatchSize, currentBatchMessageId.getAcker());
    } else {
      // Last message of the batch: move to the first index of the next entry.
      // NOTE(review): the next entry's real batch size and acker are not known here, so the current ones are
      // reused - confirm this is acceptable for seek/compare purposes.
      nextMessageId =
          new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1, currentPartitionIndex, 0, currentBatchSize,
              currentBatchMessageId.getAcker());
    }
  } else {
    nextMessageId = DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1, currentPartitionIndex);
  }
  return new MessageIdStreamOffset(nextMessageId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig strea
_pulsarClient = PulsarClient.builder().serviceUrl(_config.getBootstrapServers()).build();

_reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition))
.startMessageId(_config.getInitialMessageId()).create();
.startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create();

LOGGER.info("Created consumer with id {} for topic {}", _reader, _config.getPulsarTopicName());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.util.ConsumerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,18 +82,18 @@ public int fetchPartitionCount(long timeoutMillis) {
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
Consumer consumer = null;
try {
MessageId offset = null;
consumer =
_pulsarClient.newConsumer().topic(_topic)
.subscriptionInitialPosition(PulsarUtils.offsetCriteriaToSubscription(offsetCriteria))
.subscriptionName("Pinot_" + UUID.randomUUID()).subscribe();

if (offsetCriteria.isLargest()) {
_reader.seek(MessageId.latest);
if (_reader.hasMessageAvailable()) {
offset = _reader.readNext().getMessageId();
}
offset = consumer.getLastMessageId();
} else if (offsetCriteria.isSmallest()) {
_reader.seek(MessageId.earliest);
if (_reader.hasMessageAvailable()) {
offset = _reader.readNext().getMessageId();
}
offset = consumer.receive().getMessageId();
KKcorps marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
}
Expand All @@ -98,6 +102,8 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
LOGGER.error("Cannot fetch offsets for partition " + _partition + " and topic " + _topic + " and offsetCriteria "
+ offsetCriteria, e);
return null;
} finally {
closeConsumer(consumer);
}
}

Expand All @@ -116,6 +122,8 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI
partitionGroupConsumptionStatus.getStartOffset()));
}

PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, clientId);
Consumer consumer = null;
try {
List<String> partitionedTopicNameList = _pulsarClient.getPartitionsForTopic(_topic).get();

Expand All @@ -124,24 +132,45 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI

for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {

Reader reader =
_pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
.create();
consumer = _pulsarClient.newConsumer().topic(partitionedTopicNameList.get(p))
.subscriptionInitialPosition(pulsarConfig.getInitialSubscriberPosition())
.subscriptionName(ConsumerName.generateRandomName()).subscribe();

if (reader.hasMessageAvailable()) {
Message message = reader.readNext();
Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
KKcorps marked this conversation as resolved.
Show resolved Hide resolved
if (message != null) {
newPartitionGroupMetadataList.add(
new PartitionGroupMetadata(p, new MessageIdStreamOffset(message.getMessageId())));
} else {
MessageId lastMessageId;
try {
lastMessageId = (MessageId) consumer.getLastMessageIdAsync().get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (TimeoutException t) {
lastMessageId = MessageId.latest;
}
newPartitionGroupMetadataList.add(
new PartitionGroupMetadata(p, new MessageIdStreamOffset(lastMessageId)));
}
}
}
} catch (Exception e) {
// No partition found
LOGGER.warn("Error encountered while calculating pulsar partition group metadata: " + e.getMessage(), e);
} finally {
closeConsumer(consumer);
}

return newPartitionGroupMetadataList;
}

/**
 * Closes the given consumer on a best-effort basis.
 *
 * <p>A {@code null} consumer is ignored. Any exception raised while closing is logged at WARN level and is not
 * propagated to the caller.
 *
 * @param consumer the consumer to close, may be {@code null}
 */
private void closeConsumer(Consumer consumer) {
  if (consumer == null) {
    // Nothing was created, so there is nothing to release.
    return;
  }
  try {
    consumer.close();
  } catch (Exception e) {
    LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", consumer, e);
  }
}

@Override
public void close()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.stream.pulsar;

import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;


/**
 * Static helpers that translate a Pinot {@link OffsetCriteria} into the equivalent Pulsar client start position.
 *
 * <p>Only the "largest" and "smallest" criteria are handled; anything else is rejected with an
 * {@link IllegalArgumentException}.
 */
public class PulsarUtils {

  private PulsarUtils() {
  }

  /**
   * Maps an offset criteria to the initial position of a Pulsar subscription.
   *
   * @param offsetCriteria the criteria to translate
   * @return {@link SubscriptionInitialPosition#Latest} for "largest",
   *         {@link SubscriptionInitialPosition#Earliest} for "smallest"
   * @throws IllegalArgumentException if the criteria is neither "largest" nor "smallest"
   */
  public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria)
      throws IllegalArgumentException {
    final SubscriptionInitialPosition position;
    if (offsetCriteria.isLargest()) {
      position = SubscriptionInitialPosition.Latest;
    } else if (offsetCriteria.isSmallest()) {
      position = SubscriptionInitialPosition.Earliest;
    } else {
      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
    }
    return position;
  }

  /**
   * Maps an offset criteria to the Pulsar message id sentinel to start from.
   *
   * @param offsetCriteria the criteria to translate
   * @return {@link MessageId#latest} for "largest", {@link MessageId#earliest} for "smallest"
   * @throws IllegalArgumentException if the criteria is neither "largest" nor "smallest"
   */
  public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria)
      throws IllegalArgumentException {
    final MessageId messageId;
    if (offsetCriteria.isLargest()) {
      messageId = MessageId.latest;
    } else if (offsetCriteria.isSmallest()) {
      messageId = MessageId.earliest;
    } else {
      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
    }
    return messageId;
  }
}
Loading