Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java Function cannot work with any JSON parser JAR #5136

Closed
borlandor opened this issue Sep 6, 2019 · 2 comments
Closed

Java Function cannot work with any JSON parser JAR #5136

borlandor opened this issue Sep 6, 2019 · 2 comments
Labels
area/function type/bug The PR fixed a bug or issue reported a bug

Comments

@borlandor
Copy link

borlandor commented Sep 6, 2019

Describe the bug
When I use any json.jar ,such as Gson, fastjson, org.json, the Pulsar Java function with JSONObject/JsonObject will crash.

To Reproduce
Steps to reproduce the behavior:

  1. Add JSON parse process based a function example ,such as ContextWindowFunction.java or AddWindowFunction.java:
    cat AddWindowFunction.java

package org.apache.pulsar.functions.api.examples;
import lombok.extern.slf4j.Slf4j;
import java.util.Collection;
import java.util.function.Function;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import org.json.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.pulsar.functions.api.examples.pojo.MoteStatValue;
@slf4j
//public class AddWindowFunction implements Function <Collection, String> {
// @OverRide
// public String apply(Collection LogItems) {
public class AddWindowFunction implements Function <String, String> {
@OverRide
public String apply(String LogItems) {
String result = "";
Map<String, MoteStatValue > mapMoteStat = new HashMap<String, MoteStatValue>();
String record = LogItems;
//for (String record : LogItems)
{
JsonObject jsonObject = new Gson().fromJson(record, JsonObject.class);
String s = jsonObject.get("pktype").getAsString();
if (s.equals("motetx"))
{
String eui = jsonObject.get("eui").getAsString();//jsonObject.getString("eui");
MoteStatValue statValue = mapMoteStat.get(eui);
if (statValue == null)
{
statValue = new MoteStatValue(0,0);
}
statValue.incUpNbs();
statValue.incUpThroughput(jsonObject.get("payloadlen").getAsInt());
mapMoteStat.put(eui,statValue);
result = eui ;
}
}
MoteStatValue statValue = mapMoteStat.get(result);
return result + "=" + statValue.getUpNbs() + "," + statValue.getUpThroughput();
}
}

  1. Make and Unit Test .....................................OK
    [INFO] -------------------------------------------------------
    [INFO] T E S T S
    [INFO] -------------------------------------------------------
    [INFO] Running org.apache.pulsar.functions.api.examples.TestAddWindowFunction
    [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.342 s - in org.apache.pulsar.functions.api.examples.TestAddWindowFunction
    cat TestAddWindowFunction.java

package org.apache.pulsar.functions.api.examples;
import org.junit.;
import static junit.framework.Assert.
;
import org.apache.pulsar.functions.api.examples.AddWindowFunction;
public class TestAddWindowFunction{
@test
public void testAddWindowFunction(){
AddWindowFunction myWindowFunction = new AddWindowFunction();
String output = myWindowFunction.apply("{"pktype":"motetx","eui":"abcdefg","payloadlen":100}");
Assert.assertEquals(output, "abcdefg=1,100");
}
}

  1. Set schema of input and output topic
    Because function-related input and output topic schema is Integer by default, and the schema defined by function can not be installed automatically, It is necessary to set up input and output topic schema first.
    bin/pulsar-admin schemas upload --filename /home/Pulsar/apache-pulsar-2.4.0/examples/strschema.json test_src1
    bin/pulsar-admin schemas upload --filename /home/Pulsar/apache-pulsar-2.4.0/examples/strschema.json test_result1
    cat examples/strschema.json

{"type": "STRING","schema": "","properties": {}}

  1. Deploy the function
    bin/pulsar-admin functions create --function-config-file examples/java-window-function-config.yaml
    cat examples/java-window-function-config.yaml

jar: "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar"
tenant: "public"
namespace: "default"
name: "AddWindowFunction"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["test_src1"]
userConfig:
"PublishTopic": "test_result1"
output: "test_result1"
autoAck: true
parallelism: 1

  1. Produce JSON message
    bin/pulsar-client produce test_src1 --files /home/Pulsar/apache-pulsar-2.4.0/examples/test.json
    cat examples/test.json

{"pktype":"motetx","eui":"abcdefg","payloadlen":100}

  1. See error
    Expected behavior
    The Pulsar function, such as AddWindowFunction, can parse JSON message correctly.

Screenshots
While executing "bin/pulsar-client produce test_src1 --files /home/Pulsar/apache-pulsar-2.4.0/examples/test.json", the Pulsar function is bound to die with following log:

14:55:08.187 [pulsar-web-57-5] INFO org.eclipse.jetty.server.RequestLog - 192.168.0.128 - - [06/Sep/2019:14:55:08 +0800] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 804 "-" "Pulsar-Java-v2.4.0" 38
14:55:10.467 [pulsar-io-50-6] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:56952
14:55:10.990 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:56958
14:55:11.009 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56958][persistent://public/default/test_src1] Creating producer. producerId=0
14:55:11.034 [ForkJoinPool.commonPool-worker-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56958]-0 persistent://public/default/test_src1 configured with schema true
14:55:11.040 [ForkJoinPool.commonPool-worker-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56958] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/test_src1}, client=/127.0.0.1:56958, producerName=standalone-0-3, producerId=0}
14:55:11.390 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_src1}][standalone-0-3] Closing producer on cnx /127.0.0.1:56958
14:55:11.405 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_src1}][standalone-0-3] Closed producer on cnx /127.0.0.1:56958
14:55:11.530 [pulsar-io-50-6] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56952
14:55:11.530 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56958
14:55:11.542 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:45986] Closing consumer: 0
14:55:11.543 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/test_src1, name=public/default/AddWindowFunction}, consumerId=0, consumerName=66bd6, address=/192.168.0.128:45986} with pending 1 acks
14:55:11.546 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:45986] Closed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/test_src1, name=public/default/AddWindowFunction}, consumerId=0, consumerName=66bd6, address=/192.168.0.128:45986}
14:55:11.560 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_result1}][standalone-0-2] Closing producer on cnx /192.168.0.128:45986
14:55:11.561 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_result1}][standalone-0-2] Closed producer on cnx /192.168.0.128:45986
14:55:11.628 [pulsar-io-50-5] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /192.168.0.128:45986
14:55:26.946 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Health check failed for AddWindowFunction-0
java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_222]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_222]
at org.apache.pulsar.functions.runtime.ProcessRuntime.lambda$start$1(ProcessRuntime.java:164) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.Status.asRuntimeException(Status.java:530) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) ~[io.grpc-grpc-stub-1.18.0.jar:1.18.0]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_222]
... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:33568
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222]
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
... 2 more
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222]
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
... 2 more
14:55:26.992 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Extracted Process death exception
java.lang.RuntimeException:
at org.apache.pulsar.functions.runtime.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:380) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
at org.apache.pulsar.functions.runtime.ProcessRuntime.isAlive(ProcessRuntime.java:367) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
14:55:26.997 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/AddWindowFunction-java.lang.RuntimeException: Function Container is dead with exception.. restarting
14:55:26.999 [function-timer-thread-90-1] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Creating function log directory /home/Pulsar/apache-pulsar-2.4.0/logs/functions/public/default/AddWindowFunction
14:55:27.005 [function-timer-thread-90-1] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Created or found function log directory /home/Pulsar/apache-pulsar-2.4.0/logs/functions/public/default/AddWindowFunction
14:55:27.006 [function-timer-thread-90-1] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /home/Pulsar/apache-pulsar-2.4.0/instances/java-instance.jar:/home/Pulsar/apache-pulsar-2.4.0/instances/deps/* -Dpulsar.functions.java.instance.jar=/home/Pulsar/apache-pulsar-2.4.0/instances/java-instance.jar -Dpulsar.functions.extra.dependencies.dir=/home/Pulsar/apache-pulsar-2.4.0/instances/deps -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.function.log.dir=/home/Pulsar/apache-pulsar-2.4.0/logs/functions/public/default/AddWindowFunction -Dpulsar.function.log.file=AddWindowFunction-0 -Xmx1073741824 org.apache.pulsar.functions.runtime.JavaInstanceMain --jar /tmp/pulsar_functions/public/default/AddWindowFunction/0/pulsar-functions-api-examples.jar --instance_id 0 --function_id 68145a82-a243-4b55-8b98-76eda32e60e7 --function_version e585d105-ac58-49bb-9765-4d47aaa03d32 --function_details '{"tenant":"public","namespace":"default","name":"AddWindowFunction","className":"org.apache.pulsar.functions.api.examples.AddWindowFunction","userConfig":"{"PublishTopic":"test_result1"}","autoAck":true,"parallelism":1,"source":{"typeClassName":"java.util.Collection","inputSpecs":{"test_src1":{}},"cleanupSubscription":true},"sink":{"topic":"test_result1","typeClassName":"java.lang.Integer"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}' --pulsar_serviceurl pulsar://msp2:6650 --max_buffered_tuples 1024 --port 33568 --metrics_port 42630 --state_storage_serviceurl bk://127.0.0.1:4181 --expected_healthcheck_interval 30 --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider --cluster_name standalone
14:55:27.021 [function-timer-thread-90-1] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
14:55:35.048 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /192.168.0.128:46350
14:55:35.708 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350][persistent://public/default/test_result1] Creating producer. producerId=0
14:55:35.751 [ForkJoinPool.commonPool-worker-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/test_result1}, client=/192.168.0.128:46350, producerName=standalone-0-4, producerId=0}
14:55:36.101 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350] Subscribing on topic persistent://public/default/test_src1 / public/default/AddWindowFunction
14:55:36.151 [ForkJoinPool.commonPool-worker-1] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/test_src1-public%2Fdefault%2FAddWindowFunction] Rewind from 16:1 to 16:0
14:55:36.154 [ForkJoinPool.commonPool-worker-1] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/test_src1] There are no replicated subscriptions on the topic
14:55:36.155 [ForkJoinPool.commonPool-worker-1] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/test_src1][public/default/AddWindowFunction] Created new subscription for 0
14:55:36.155 [ForkJoinPool.commonPool-worker-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350] Created subscription on topic persistent://public/default/test_src1 / public/default/AddWindowFunction
14:55:36.583 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350] Closing consumer: 0
14:55:36.584 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/test_src1, name=public/default/AddWindowFunction}, consumerId=0, consumerName=3fb47, address=/192.168.0.128:46350} with pending 1 acks
14:55:36.590 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [/192.168.0.128:46350] Closed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/test_src1, name=public/default/AddWindowFunction}, consumerId=0, consumerName=3fb47, address=/192.168.0.128:46350}
14:55:36.603 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_result1}][standalone-0-4] Closing producer on cnx /192.168.0.128:46350
14:55:36.603 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/test_result1}][standalone-0-4] Closed producer on cnx /192.168.0.128:46350
14:55:36.670 [pulsar-io-50-8] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /192.168.0.128:46350
14:55:38.183 [pulsar-web-57-8] INFO org.eclipse.jetty.server.RequestLog - 192.168.0.128 - - [06/Sep/2019:14:55:38 +0800] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 804 "-" "Pulsar-Java-v2.4.0" 24

Additional context

  1. This problem occurs in either Window functon or None-Window function.
  2. This problem occurs in either Context functon or None-Context function.
  3. While deploying the function, its input and output topic schemas are all Integer, but not String in the Pulsar log:
    14:53:56.324 [pulsar-external-listener-77-1] INFO org.apache.pulsar.functions.worker.FunctionAssignmentTailer - Received assignment update: instance {
    functionMetaData {
    functionDetails {
    tenant: "public"
    namespace: "default"
    name: "AddWindowFunction"
    className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
    userConfig: "{"PublishTopic":"test_result1"}"
    autoAck: true
    parallelism: 1
    source {
    typeClassName: "java.util.Collection"
    inputSpecs {
    key: "test_src1"
    value {
    }
    }
    cleanupSubscription: true
    }
    sink {
    topic: "test_result1"
    typeClassName: "java.lang.Integer"
    }
    resources {
    cpu: 1.0
    ram: 1073741824
    disk: 10737418240
    }
    componentType: FUNCTION
    }
    packageLocation {
    packagePath: "public/default/AddWindowFunction/1a83d3a0-a550-484b-b1e4-6ea1436df8f2-pulsar-functions-api-examples.jar"
    originalFileName: "pulsar-functions-api-examples.jar"
    }
    createTime: 1567752835453
    }
    }
    workerId: "c-standalone-fw-msp2-8080"
@borlandor borlandor added the type/bug The PR fixed a bug or issue reported a bug label Sep 6, 2019
@borlandor borlandor changed the title Java Function cannot work with Java Function cannot work with any JSON parser JAR Sep 6, 2019
@borlandor
Copy link
Author

Trying to use Localrun mode, the function works well, but always works abnormally in Create mode.

@sijie
Copy link
Member

sijie commented Sep 18, 2019

the original AddWindowFunction example is Function <Collection<Integer>, Integer>. If you run this function before, the input topic and output topic will be Schema.Integer. You can't change the schema type.

So I would suggest double-check if you are submitting the right function to Pulsar.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/function type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

2 participants