Skip to content

Commit

Permalink
Fix: function with multi-topic not acking on effectively-once (#2347)
Browse files Browse the repository at this point in the history
### Motivation

`MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync` and therefore, function with multi-topic and `EFFECTIVELY_ONCE` processing is not acking message and failing `EFFECTIVELY_ONCE` behavior.

### Modifications

Function should ack message for a specific topic consumer if `inputTopicConsumer` is multi-topic consumer.

### Result

Function should able to ack messages for multi-topic consumer when processing-guarantee is `EFFECTIVELY_ONCE`
  • Loading branch information
rdhabalia authored and sijie committed Aug 13, 2018
1 parent 777223d commit 0d2154e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 26 deletions.
Expand Up @@ -290,19 +290,14 @@ public void testSillyUser() throws Exception {
} }


try { try {
producer = pulsarClient.newProducer().topic(topicName.toString()) producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
.enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe(); consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
producer.send("message1".getBytes()); producer.send("message1".getBytes());
producer.send("message2".getBytes()); producer.send("message2".getBytes());
/* Message<byte[]> msg1 = */ consumer.receive(); /* Message<byte[]> msg1 = */ consumer.receive();
Message<byte[]> msg2 = consumer.receive(); Message<byte[]> msg2 = consumer.receive();
consumer.acknowledgeCumulative(msg2); consumer.acknowledgeCumulative(msg2);
Assert.fail("should fail since ack cumulative is not supported for partitioned topic");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.NotSupportedException);
} finally { } finally {
producer.close(); producer.close();
consumer.unsubscribe(); consumer.unsubscribe();
Expand Down
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
Expand Down Expand Up @@ -356,11 +357,11 @@ public void testPulsarSinkStats() throws Exception {
retryStrategically((test) -> { retryStrategically((test) -> {
try { try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0; return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
return false; return false;
} }
}, 5, 500); }, 5, 200);


FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager(); FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates(); functionRuntimeManager.updateRates();
Expand Down Expand Up @@ -399,11 +400,12 @@ protected static FunctionDetails createSinkConfig(String jarFile, String tenant,
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1); functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);


// set source spec // set source spec
// source spec classname should be empty so that the default pulsar source will be used // source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.setTypeClassName(typeArg.getName()); sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName); sourceSpecBuilder.setSubscriptionName(subscriptionName);
Expand Down Expand Up @@ -484,7 +486,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
// set source spec // set source spec
// source spec classname should be empty so that the default pulsar source will be used // source spec classname should be empty so that the default pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName()); sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder); functionDetailsBuilder.setSource(sourceSpecBuilder);
Expand Down
Expand Up @@ -362,22 +362,27 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType, protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) { Map<String,Long> properties) {
checkArgument(messageId instanceof TopicMessageIdImpl); checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId; TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;


if (getState() != State.Ready) { if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
} }


if (ackType == AckType.Cumulative) { if (ackType == AckType.Cumulative) {
return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException( Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
"Cumulative acknowledge not supported for topics consumer")); if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else { } else {
ConsumerImpl<T> consumer = consumers.get(messageId1.getTopicName()); ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());


MessageId innerId = messageId1.getInnerMessageId(); MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties) return consumer.doAcknowledge(innerId, ackType, properties)
.thenRun(() -> .thenRun(() ->
unAckedMessageTracker.remove(messageId1)); unAckedMessageTracker.remove(topicMessageId));
} }
} }


Expand Down
Expand Up @@ -20,25 +20,17 @@


import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;


import com.google.common.annotations.VisibleForTesting;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.SerDe;
Expand All @@ -49,6 +41,10 @@
import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.SourceContext;


import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver; import net.jodah.typetools.TypeResolver;


@Slf4j @Slf4j
Expand Down Expand Up @@ -142,7 +138,8 @@ public Record<T> read() throws Exception {
.message(message) .message(message)
.topicName(topicName) .topicName(topicName)
.ackFunction(() -> { .ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { if (pulsarSourceConfig
.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message); inputConsumer.acknowledgeCumulativeAsync(message);
} else { } else {
inputConsumer.acknowledgeAsync(message); inputConsumer.acknowledgeAsync(message);
Expand Down

0 comments on commit 0d2154e

Please sign in to comment.