diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index b35c32a79bcff..7943d5a0a5418 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -437,8 +437,5 @@ public void reset() { statTotalRecordsReceived1min.clear(); _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); - - latestUserExceptions.clear(); - latestSystemExceptions.clear(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 1b2d8b0954341..3ed2aa15c99d7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -191,9 +191,6 @@ public void reset() { statTotalWritten1min.clear(); _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); - - latestSystemExceptions.clear(); - latestSinkExceptions.clear(); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index aac672054c3fc..f496709804e98 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -190,9 +190,6 @@ public void reset() { statTotalWritten1min.clear(); _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); - - latestSystemExceptions.clear(); - latestSourceExceptions.clear(); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 83e7566a1437b..fd8126077d925 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -122,6 +122,7 @@ public void received(Consumer consumer, Message message) { if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { throw new RuntimeException("Failed to process message: " + message.getMessageId()); } + consumer.negativeAcknowledge(message); }) .build(); diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index 34793a52eb1b8..63089b642fca4 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -179,9 +179,9 @@ def set_last_invocation(self, time): def add_user_exception(self, exception): error = traceback.format_exc() ts = int(time.time() * 1000) if sys.version_info.major >= 3 else long(time.time() * 1000) - self.latest_sys_exception.append((error, ts)) - if len(self.latest_sys_exception) > 10: - self.latest_sys_exception.pop(0) + self.latest_user_exception.append((error, ts)) + if len(self.latest_user_exception) > 10: + self.latest_user_exception.pop(0) # report exception via prometheus try: @@ -213,8 +213,6 @@ def report_system_exception_prometheus(self, exception, ts): self.system_exceptions.labels(*exception_metric_labels).set(1.0) def reset(self): - self.latest_user_exception = [] - self.latest_sys_exception = [] self._stat_total_processed_successfully_1min._value.set(0.0) self._stat_total_user_exceptions_1min._value.set(0.0) self._stat_total_sys_exceptions_1min._value.set(0.0) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 8f740f197c3de..86f8644bff25c 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -249,6 +249,8 @@ def actual_execution(self): except Exception as e: Log.exception("Exception while executing user method") self.stats.incr_total_user_exceptions(e) + # If function throws exception then send neg ack for input message back to broker + msg.consumer.negative_acknowledge(msg.message) if self.log_topic_handler is not None: log.remove_all_handlers() @@ -260,6 +262,8 @@ def actual_execution(self): except Exception as e: Log.error("Uncaught exception in Python instance: %s" % e); self.stats.incr_total_sys_exceptions(e) + if msg: + msg.consumer.negative_acknowledge(msg.message) def done_producing(self, consumer, orig_message, topic, result, sent_message): if result == pulsar.Result.Ok: @@ -269,6 +273,8 @@ def done_producing(self, consumer, orig_message, topic, result, sent_message): error_msg = "Failed to publish to topic [%s] with error [%s] with src message id [%s]" % (topic, result, orig_message.message_id()) Log.error(error_msg) self.stats.incr_total_sys_exceptions(Exception(error_msg)) + # If producer fails send output then send neg ack for input message back to broker + consumer.negative_acknowledge(orig_message) def process_result(self, output, msg): diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml new file mode 100644 index 0000000000000..68f1160eae866 --- /dev/null +++ b/tests/docker-images/java-test-functions/pom.xml @@ -0,0 +1,81 @@ + + + + org.apache.pulsar.tests + docker-images + 2.4.0-SNAPSHOT + + 4.0.0 + org.apache.pulsar.tests + java-test-functions + Apache Pulsar :: Tests :: Docker Images :: Java Test Functions + jar + + + + docker + + + integrationTests + + + + + + org.apache.pulsar + pulsar-functions-api-examples + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + false> + + + + + + + + + + + + + + + diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/ExceptionFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/ExceptionFunction.java new file mode 100644 index 0000000000000..c2a13e271642d --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/ExceptionFunction.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.functions; + +import java.util.function.Function; + +public class ExceptionFunction implements Function { + + int i = 0; + @Override + public String apply(String s) { + i++; + if (i % 10 == 0) { + throw new RuntimeException("test"); + } + + return s + "!"; + } +} diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index ab7b7f8193ad7..be212c0287831 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/Dockerfile @@ -48,3 +48,5 @@ COPY python-examples/exclamation_with_extra_deps.py /pulsar/examples/python-exam COPY python-examples/exclamation.zip /pulsar/examples/python-examples/ COPY python-examples/producer_schema.py /pulsar/examples/python-examples/ COPY python-examples/consumer_schema.py /pulsar/examples/python-examples/ +COPY python-examples/exception_function.py /pulsar/examples/python-examples/ +COPY target/java-test-functions.jar /pulsar/examples/ diff --git a/tests/docker-images/latest-version-image/pom.xml b/tests/docker-images/latest-version-image/pom.xml index 2f3db0443285d..5076110a78979 100644 --- a/tests/docker-images/latest-version-image/pom.xml +++ b/tests/docker-images/latest-version-image/pom.xml @@ -40,6 +40,11 @@ + + org.apache.pulsar.tests + java-test-functions + ${project.parent.version} + org.apache.pulsar pulsar-all-docker-image @@ -49,6 +54,31 @@ + + maven-dependency-plugin + + + copy-installed + package + + copy + + + + + org.apache.pulsar.tests + java-test-functions + ${project.parent.version} + jar + true + ${project.build.directory} + java-test-functions.jar + + + + + + com.spotify dockerfile-maven-plugin diff --git a/tests/docker-images/latest-version-image/python-examples/exception_function.py b/tests/docker-images/latest-version-image/python-examples/exception_function.py new file mode 100644 index 0000000000000..b14f38975006b --- /dev/null +++ b/tests/docker-images/latest-version-image/python-examples/exception_function.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +i = 0 +def process(input): + global i + i += 1 + if i % 10 == 0: + raise Exception("test"); + + return input + "!" \ No newline at end of file diff --git a/tests/docker-images/pom.xml b/tests/docker-images/pom.xml index 3084e0777f2e8..f65001b6bfbd8 100644 --- a/tests/docker-images/pom.xml +++ b/tests/docker-images/pom.xml @@ -32,6 +32,7 @@ docker-images Apache Pulsar :: Tests :: Docker Images + java-test-functions latest-version-image diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index bb4c6b4cfe454..c31329fbf069c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -725,6 +725,182 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou // Test CRUD functions on different runtimes. // + @Test + public void testPythonFunctionNegAck() throws Exception { + testFunctionNegAck(Runtime.PYTHON); + } + + @Test + public void testJavaFunctionNegAck() throws Exception { + testFunctionNegAck(Runtime.JAVA); + } + + private void testFunctionNegAck(Runtime runtime) throws Exception { + if (functionRuntimeType == FunctionRuntimeType.THREAD) { + return; + } + + Schema schema; + if (Runtime.JAVA == runtime) { + schema = Schema.STRING; + } else { + schema = Schema.BYTES; + } + + String inputTopicName = "persistent://public/default/test-neg-ack-" + runtime + "-input-" + randomName(8); + String outputTopicName = "test-neg-ack-" + runtime + "-output-" + randomName(8); + + String functionName = "test-neg-ack-fn-" + randomName(8); + final int numMessages = 20; + + // submit the exclamation function + + if (runtime == Runtime.PYTHON) { + submitFunction( + runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_FUNCTION_PYTHON_FILE, EXCEPTION_PYTHON_CLASS, schema); + } else { + submitFunction( + runtime, inputTopicName, outputTopicName, functionName, null, EXCEPTION_JAVA_CLASS, schema); + } + + // get function info + getFunctionInfoSuccess(functionName); + + // get function stats + getFunctionStatsEmpty(functionName); + + // publish and consume result + if (Runtime.JAVA == runtime) { + // java supports schema + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer consumer = client.newConsumer(Schema.STRING) + .topic(outputTopicName) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + @Cleanup Producer producer = client.newProducer(Schema.STRING) + .topic(inputTopicName) + .create(); + + for (int i = 0; i < numMessages; i++) { + producer.send("message-" + i); + } + + Set expectedMessages = new HashSet<>(); + for (int i = 0; i < numMessages; i++) { + expectedMessages.add("message-" + i + "!"); + } + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(60 * 2, TimeUnit.SECONDS); + log.info("Received: {}", msg.getValue()); + assertTrue(expectedMessages.contains(msg.getValue())); + expectedMessages.remove(msg.getValue()); + } + assertEquals(expectedMessages.size(), 0); + + } else { + // python doesn't support schema + + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer consumer = client.newConsumer(Schema.BYTES) + .topic(outputTopicName) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + + @Cleanup Producer producer = client.newProducer(Schema.BYTES) + .topic(inputTopicName) + .create(); + + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("message-" + i).getBytes(UTF_8)).send(); + } + + Set expectedMessages = new HashSet<>(); + for (int i = 0; i < numMessages; i++) { + expectedMessages.add("message-" + i + "!"); + } + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(60 * 2, TimeUnit.SECONDS); + String msgValue = new String(msg.getValue(), UTF_8); + log.info("Received: {}", msgValue); + assertTrue(expectedMessages.contains(msgValue)); + expectedMessages.remove(msgValue); + } + assertEquals(expectedMessages.size(), 0); + } + + // get function status + ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( + PulsarCluster.ADMIN_SCRIPT, + "functions", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", functionName + ); + + FunctionStatus functionStatus = FunctionStatus.decode(result.getStdout()); + + assertEquals(functionStatus.getNumInstances(), 1); + assertEquals(functionStatus.getNumRunning(), 1); + assertEquals(functionStatus.getInstances().size(), 1); + assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0); + assertTrue(functionStatus.getInstances().get(0).getStatus().getAverageLatency() > 0.0); + assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true); + assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); + // going to receive two more tuples because of delivery + assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages + 2); + // only going to successfully process 20 + assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), numMessages); + assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); + assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 2); + assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); + + // get function stats + result = pulsarCluster.getAnyWorker().execCmd( + PulsarCluster.ADMIN_SCRIPT, + "functions", + "stats", + "--tenant", "public", + "--namespace", "default", + "--name", functionName + ); + + log.info("FUNCTION STATS: {}", result.getStdout()); + + FunctionStats functionStats = FunctionStats.decode(result.getStdout()); + assertEquals(functionStats.getReceivedTotal(), numMessages + 2); + assertEquals(functionStats.getProcessedSuccessfullyTotal(), numMessages); + assertEquals(functionStats.getSystemExceptionsTotal(), 0); + assertEquals(functionStats.getUserExceptionsTotal(), 2); + assertTrue(functionStats.avgProcessLatency > 0); + assertTrue(functionStats.getLastInvocation() > 0); + + assertEquals(functionStats.instances.size(), 1); + assertEquals(functionStats.instances.get(0).getInstanceId(), 0); + assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), numMessages + 2); + assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), numMessages); + assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0); + assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 2); + assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0); + + // delete function + deleteFunction(functionName); + + // get function info + getFunctionInfoNotFound(functionName); + + // make sure subscriptions are cleanup + checkSubscriptionsCleanup(inputTopicName); + } + @Test public void testPythonPublishFunction() throws Exception { testPublishFunction(Runtime.PYTHON); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index e7173896868e9..823b7d1bce298 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -76,6 +76,9 @@ public void teardownFunctionWorkers() { public static final String PUBLISH_JAVA_CLASS = "org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf"; + public static final String EXCEPTION_JAVA_CLASS = + "org.apache.pulsar.tests.integration.functions.ExceptionFunction"; + public static final String EXCLAMATION_PYTHON_CLASS = "exclamation_function.ExclamationFunction"; @@ -86,13 +89,15 @@ public void teardownFunctionWorkers() { public static final String EXCLAMATION_PYTHONZIP_CLASS = "exclamation"; + public static final String EXCEPTION_PYTHON_CLASS = "exception_function"; + public static final String PUBLISH_PYTHON_CLASS = "publish_function_with_message_conf.PublishFunctionWithMessageConf"; public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py"; public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py"; public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip"; public static final String PUBLISH_FUNCTION_PYTHON_FILE = "publish_function_with_message_conf.py"; - + public static final String EXCEPTION_FUNCTION_PYTHON_FILE = "exception_function.py"; protected static String getExclamationClass(Runtime runtime, boolean pyZip, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 64b70a63f91fe..2b16b29345f39 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -54,7 +54,7 @@ public enum Runtime { private Long slidingIntervalDurationMs; private Map userConfig = new HashMap<>(); - private static final String JAVAJAR = "/pulsar/examples/api-examples.jar"; + private static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar"; private static final String PYTHONBASE = "/pulsar/examples/python-examples/"; public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName) {