Skip to content

Commit

Permalink
support cumulative ack in multiConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Aug 9, 2018
1 parent f774044 commit f95b6ad
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
Expand Up @@ -357,11 +357,11 @@ public void testPulsarSinkStats() throws Exception {
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0;
return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 500);
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
Expand Down Expand Up @@ -486,7 +486,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
// set source spec
// source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
Expand Down
Expand Up @@ -362,22 +362,27 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) {
checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;

if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
}

if (ackType == AckType.Cumulative) {
return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(
"Cumulative acknowledge not supported for topics consumer"));
Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(messageId1.getTopicName());
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());

MessageId innerId = messageId1.getInnerMessageId();
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
.thenRun(() ->
unAckedMessageTracker.remove(messageId1));
unAckedMessageTracker.remove(topicMessageId));
}
}

Expand Down Expand Up @@ -839,10 +844,6 @@ public List<String> getPartitionedTopics() {
public List<ConsumerImpl<T>> getConsumers() {
return consumers.values().stream().collect(Collectors.toList());
}

public Optional<Consumer> getConsumer(TopicMessageIdImpl messageId) {
return Optional.ofNullable(consumers.get(messageId.getTopicName()));
}

private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
}
Expand Up @@ -25,14 +25,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
Expand Down Expand Up @@ -142,16 +140,7 @@ public Record<T> read() throws Exception {
.ackFunction(() -> {
if (pulsarSourceConfig
.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
// try to find actual consumer of the messageId
if (inputConsumer instanceof MultiTopicsConsumerImpl) {
TopicMessageIdImpl msgId = (TopicMessageIdImpl) message.getMessageId();
Optional<org.apache.pulsar.client.api.Consumer> individualConsumer = ((MultiTopicsConsumerImpl) inputConsumer)
.getConsumer(msgId);
individualConsumer.orElse(inputConsumer)
.acknowledgeCumulativeAsync(msgId.getInnerMessageId());
} else {
inputConsumer.acknowledgeCumulativeAsync(message);
}
inputConsumer.acknowledgeCumulativeAsync(message);
} else {
inputConsumer.acknowledgeAsync(message);
}
Expand Down

0 comments on commit f95b6ad

Please sign in to comment.