Function localrun quits when using partitioned topic. #2330

Closed
codelipenghui opened this issue Aug 7, 2018 · 6 comments
Labels: area/function, type/bug (The PR fixed a bug or issue reported a bug)

@codelipenghui
Contributor

First, I wrote a demo package built with Gradle:

plugins {
    id 'java'
    id "com.github.johnrengelman.shadow" version "2.0.4"
}

package com.codelipenghui.pulsar.function;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {

    @Override
    public Void process(String input, Context context) throws Exception {
        context.incrCounter("test-case-total", 1);
        return null;
    }
}

I got the following output:

[root@qa-5-171 apache-pulsar-2.2.0-incubating-SNAPSHOT]# bin/pulsar-admin functions localrun --jar target/pulsar-functions-0.1.0-SNAPSHOT-jar-with-dependencies.jar --className pulsarfunctions.starter.sdk.WordCountFunction --tenant public --namespace default --name word-count-test --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/word-count-test-output
19:58:34.946 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar
19:58:34.954 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/python-instance/python_instance_main.py
19:58:34.974 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - RuntimeSpawner starting function word-count-test - 0
19:58:34.987 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dpulsar.functions.java.instance.jar=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.log.dir=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/logs/functions -Dpulsar.log.file=word-count-test org.apache.pulsar.functions.runtime.JavaInstanceMain --jar target/pulsar-functions-0.1.0-SNAPSHOT-jar-with-dependencies.jar --instance_id 0 --function_id ac8f5862-34cd-4f0e-869e-39f5717a99a2 --function_version 1498f7f5-5483-42c2-b2ac-c07c5c1626ce --tenant public --namespace default --name word-count-test --function_classname pulsarfunctions.starter.sdk.WordCountFunction --auto_ack true --processing_guarantees ATLEAST_ONCE --pulsar_serviceurl http://localhost:8080/ --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 16821 --source_type_classname "java.lang.String" --source_subscription_type SHARED --source_topics_serde_classname {"persistent://public/default/plat.correctness.verification":""} --sink_type_classname "java.lang.Void" --sink_topic persistent://public/default/word-count-test-output
19:58:34.995 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
19:58:41.297 [main] INFO  org.apache.pulsar.admin.cli.CmdFunctions - RuntimeSpawner quit because of
19:58:41.299 [Thread-3] INFO  org.apache.pulsar.admin.cli.CmdFunctions - Shutting down the localrun runtimeSpawner ...

Then I tried https://github.com/streamlio/pulsar-functions-java-starter.git and got the same result.

@sijie sijie added deprecated/question Questions should happened in GitHub Discussions area/function labels Aug 7, 2018
@srkukarni
Contributor

Hi,
I just tried it from Pulsar master (your logs indicate that you were on 2.2.0-incubating) and was able to start the function. On which exact Pulsar branch/head did you see this error?

@codelipenghui
Contributor Author

The master branch as of Monday.
OK, I will retry on the master branch today.

@sijie
Member

sijie commented Aug 8, 2018

I think the problem is that state storage is not enabled in cluster mode by default. I am updating the documentation in #2335. I will verify local run and update here.

@codelipenghui
Contributor Author

codelipenghui commented Aug 8, 2018

The topic plat.correctness.verification is a partitioned topic with six partitions.
When I set the input to plat.correctness.verification, local run quits.
When I set the input to plat.correctness.verification-partition-0, local run processes messages successfully.

  • Local run succeeds with the command below:
bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification-partition-0 --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount
  • Local run quits with the command below:
bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount
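
For context, each partition of a partitioned topic is addressable as its own topic named `<topic>-partition-<n>`, which is why `plat.correctness.verification-partition-0` can be passed as an input by itself. The sketch below only illustrates that naming for the six-partition topic in this report; the class and method names are hypothetical and are not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Pulsar class: lists the per-partition topic names
// that back a partitioned topic, using the "<topic>-partition-<n>" convention.
class PartitionNames {

    static List<String> partitionNames(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // Topic name and partition count taken from the report above.
        String topic = "persistent://public/default/plat.correctness.verification";
        for (String name : partitionNames(topic, 6)) {
            System.out.println(name);
        }
    }
}
```

The first name printed is the one used in the command that succeeds.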

Log for the run that quits:

[root@qa-5-170 /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT]#  bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount
14:20:24.335 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar
14:20:24.345 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/python-instance/python_instance_main.py
14:20:24.367 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - RuntimeSpawner starting function plat-correctness-verification-wordcount - 0
14:20:24.383 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dpulsar.functions.java.instance.jar=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.log.dir=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/logs/functions -Dpulsar.log.file=plat-correctness-verification-wordcount org.apache.pulsar.functions.runtime.JavaInstanceMain --jar target/function-starter-1.0.0-all.jar --instance_id 0 --function_id 0b81a970-2cf7-43ad-9b88-17ee169cab5b --function_version 5795d1da-3a02-43e5-a004-c4689e678688 --tenant public --namespace default --name plat-correctness-verification-wordcount --function_classname com.codelipenghui.pulsar.function.WordCountFunction --auto_ack true --processing_guarantees ATLEAST_ONCE --pulsar_serviceurl http://localhost:8080/ --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 38107 --source_type_classname "java.lang.String" --source_subscription_type SHARED --source_topics_serde_classname {"persistent://public/default/plat.correctness.verification":""} --sink_type_classname "java.lang.String" --sink_topic persistent://public/default/plat.correctness.verification.wordcount.output
14:20:24.397 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
14:20:34.250 [main] INFO  org.apache.pulsar.admin.cli.CmdFunctions - RuntimeSpawner quit because of
14:20:34.252 [Thread-3] INFO  org.apache.pulsar.admin.cli.CmdFunctions - Shutting down the localrun runtimeSpawner ...

@sijie sijie changed the title Functions localrun quit by can't get the reason Function localrun quits when using partitioned topic. Aug 8, 2018
@sijie sijie self-assigned this Aug 8, 2018
@sijie sijie added type/bug The PR fixed a bug or issue reported a bug and removed deprecated/question Questions should happened in GitHub Discussions labels Aug 8, 2018
@sijie sijie added this to the 2.2.0-incubating milestone Aug 8, 2018
@sijie
Member

sijie commented Aug 8, 2018

I had a discussion with @codelipenghui.

The original question was about stateful functions. Stateful functions shipped as a preview feature in 2.1, but they are not fully integrated in cluster mode, so I will be working on that piece in #2335.

However, there is a bigger problem with partitioned topics. There appears to be a regression in 2.1 that prevents Pulsar Functions from running with a partitioned topic. Because we switched to a multi-topic subscription in 2.1, the consumer returns a partition name instead of the topic name. The function runtime then cannot find a suitable serde to deserialize the message, throws a RuntimeException, and quits.

I marked this issue as a bug and updated the issue title to be more specific. The fix should also be cherry-picked to the 2.1.1 release.
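
The failure mode described above can be sketched in plain Java. This is a simplified illustration, not the function runtime's code and not the change made in the fix (the referenced commits describe changing getTopicName in MultiTopicsConsumer instead). It assumes a serde map keyed by the configured input topic; the class, the method names, and the `DefaultSerDe` placeholder value are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a serde lookup keyed by the configured topic name fails
// when the consumer reports "<topic>-partition-<n>" instead of "<topic>".
class PartitionSerdeLookup {

    static final String PARTITION_SUFFIX = "-partition-";

    // Strips a trailing "-partition-<n>"; returns the name unchanged otherwise.
    static String baseTopic(String topicName) {
        int idx = topicName.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return topicName;
        }
        String tail = topicName.substring(idx + PARTITION_SUFFIX.length());
        if (tail.isEmpty()) {
            return topicName;
        }
        for (char c : tail.toCharArray()) {
            if (!Character.isDigit(c)) {
                return topicName;
            }
        }
        return topicName.substring(0, idx);
    }

    // Lookup by the exact reported name: throws for partition names.
    static String lookupExact(Map<String, String> serdes, String reportedTopic) {
        String serde = serdes.get(reportedTopic);
        if (serde == null) {
            throw new RuntimeException("No serde registered for topic " + reportedTopic);
        }
        return serde;
    }

    // Lookup after normalizing the reported name back to the configured topic.
    static String lookupByBaseTopic(Map<String, String> serdes, String reportedTopic) {
        return lookupExact(serdes, baseTopic(reportedTopic));
    }

    public static void main(String[] args) {
        Map<String, String> serdes = new HashMap<>();
        // Placeholder serde name, keyed by the topic passed to --inputs.
        serdes.put("persistent://public/default/plat.correctness.verification", "DefaultSerDe");

        String reported = "persistent://public/default/plat.correctness.verification-partition-0";
        try {
            lookupExact(serdes, reported);
        } catch (RuntimeException e) {
            System.out.println("exact lookup failed: " + e.getMessage());
        }
        System.out.println("normalized lookup found: " + lookupByBaseTopic(serdes, reported));
    }
}
```

Either reporting the configured topic name from the consumer or normalizing the name before the lookup removes the mismatch; the sketch shows only the second option because it fits in a self-contained example.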

sijie pushed a commit that referenced this issue Aug 27, 2018
* change getTopicName in MultiTopicsConsumer

* change following sijie's comments

* keep both topicName and topicPartitonName in consumer to avoid new string
sijie pushed a commit that referenced this issue Aug 27, 2018
* change getTopicName in MultiTopicsConsumer

* change following sijie's comments

* keep both topicName and topicPartitonName in consumer to avoid new string
@sijie
Member

sijie commented Sep 12, 2018

The underlying bug has been fixed in #2346. The fix is included in 2.1.1 and will be included in the 2.2.0 release.

@sijie sijie closed this as completed Sep 12, 2018
@sijie sijie assigned jiazhai and unassigned sijie Sep 12, 2018