Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface #10878

Merged

Conversation

jerrypeng
Contributor

@jerrypeng jerrypeng commented Jun 10, 2021

Motivation

The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain the interfaces that the Pulsar Functions framework uses to interact with user code. The module should only have the following dependencies:
1. pulsar-io-core
2. pulsar-functions-api
3. pulsar-client-api
4. slf4j-api
5. log4j-slf4j-impl
6. log4j-api
7. log4j-core

Modifications

Change the dependency on pulsar-client-original to pulsar-client-api.

Adjust what the top-level pom includes in all sub-modules so that additional dependencies don't land in java-instance.jar.

There is also a fix for an issue introduced by #9673. The thread context class loader was set incorrectly in ThreadRuntime.
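
The ThreadRuntime change itself is not shown in this conversation. As a minimal sketch of the general pattern only (the class and method names below are hypothetical, not Pulsar code), the thread context class loader is typically set for the duration of a call into user code and restored afterwards:

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of Pulsar: runs code with a given thread context class loader. */
final class ContextClassLoaderScope {

    private ContextClassLoaderScope() {}

    /** Sets the context class loader for the duration of the action, then restores the previous one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore the previous loader, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a function class loader; an assumption for illustration.
        ClassLoader functionLoader = new ClassLoader(ClassLoader.getSystemClassLoader()) {};
        ClassLoader seen = callWith(functionLoader,
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println("loader active inside call: " + (seen == functionLoader));
    }
}
```

Restoring in a `finally` block keeps a failure in user code from leaving the wrong loader on a reused thread.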

Future improvements

  1. We should also add a test in the future to make sure external libraries don't accidentally get added to this module and java-instance.jar.

  2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better. The current name can be confusing.
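
The test proposed in item 1 might look roughly like the sketch below. It is not part of this PR: `InstanceJarCheck` is a hypothetical name, and the allow-list of package prefixes is an illustrative assumption loosely derived from the dependency list above, not a verified description of what the jar contains.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical check, not part of Pulsar: flags classes outside an allow-list of packages. */
final class InstanceJarCheck {

    // Illustrative allow-list only; the real set of packages is an assumption.
    static final List<String> ALLOWED_PREFIXES = List.of(
            "org/apache/pulsar/io/core/",
            "org/apache/pulsar/functions/api/",
            "org/apache/pulsar/client/api/",
            "org/slf4j/",
            "org/apache/logging/");

    private InstanceJarCheck() {}

    /** Returns the class entries that do not start with any allowed prefix. */
    static List<String> unexpectedClasses(List<String> entryNames) {
        List<String> unexpected = new ArrayList<>();
        for (String name : entryNames) {
            if (!name.endsWith(".class")) {
                continue; // ignore directories, manifests, and resources
            }
            boolean allowed = ALLOWED_PREFIXES.stream().anyMatch(name::startsWith);
            if (!allowed) {
                unexpected.add(name);
            }
        }
        return unexpected;
    }

    /** Lists every entry name in the jar at the given path. */
    static List<String> entryNames(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            List<String> names = new ArrayList<>();
            for (JarEntry entry : Collections.list(jar.entries())) {
                names.add(entry.getName());
            }
            return names;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java InstanceJarCheck <path to java-instance.jar>
        List<String> unexpected = unexpectedClasses(entryNames(args[0]));
        if (!unexpected.isEmpty()) {
            throw new IllegalStateException("Unexpected classes: " + unexpected);
        }
        System.out.println("No unexpected classes found");
    }
}
```

Keeping the prefix check separate from the jar reading lets the rule be unit-tested with plain lists of entry names.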

@jerrypeng jerrypeng requested a review from merlimat June 10, 2021 00:32
@jerrypeng jerrypeng self-assigned this Jun 10, 2021
@jerrypeng jerrypeng added the type/bug and area/function labels Jun 10, 2021
@jerrypeng jerrypeng added this to the 2.8.0 milestone Jun 10, 2021
@jiazhai
Member

jiazhai commented Jun 10, 2021

@jerrypeng FYI, the license check in CI failed with the following output:

Run src/check-binary-license ./distribution/server/target/apache-pulsar-*-bin.tar.gz
  src/check-binary-license ./distribution/server/target/apache-pulsar-*-bin.tar.gz
  shell: /usr/bin/bash -e {0}
  env:
    MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
    JAVA_HOME: /opt/hostedtoolcache/Java_Adopt_jdk/11.0.11-9/x64
zookeeper-prometheus-metrics-3.6.3.jar mentioned in lib/presto/LICENSE, but not bundled

It looks like there are issues with the LICENSE/NOTICE.
Error: Process completed with exit code 2.

@aahmed-se aahmed-se self-requested a review June 10, 2021 05:11
Contributor

@eolivelli eolivelli left a comment

This is a breaking change for the new feature.
Please do not change this behaviour now that we are close to the 2.8.0 release.

I also believe that the NAR files should not contain the Pulsar client classes, because this would lead to unpredictable behaviour.

As in a Java EE container, the container owns the implementation of the Java EE runtime, and web applications cannot override system libraries.

pulsar-functions/runtime-all/pom.xml (resolved)
log.info("value schema type {}", kvSchema.getValueSchema());
log.info("key encoding {}", kvSchema.getKeyValueEncodingType());
// TODO need to expose KeyValueSchema was an interface in pulsar-client-api
// KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema();
Contributor

@eolivelli eolivelli Jun 10, 2021

This is a breaking change for the new feature.

Please do not change this behaviour now

Contributor

It's not a breaking change because KeyValueSchema is not part of the public API. Only symbols that are in pulsar-client-api are. I think the test here should use Schema<KeyValue> instead.

@merlimat
Contributor

merlimat commented Jun 10, 2021

> I also believe that the NAR files should not contain the Pulsar client classes, because this would lead to unpredictable behaviour.

The NAR does not contain the client classes. The function runtime does. The "instance" jar is not supposed to have the client implementation because it resides on the same class path as the function.

Naming aside, pulsar-functions/runtime-all is used to generate java-instance.jar.

@eolivelli
Contributor

@merlimat the fact that the integration test was broken is the real problem to me.
It means that inside a Sink you cannot use the KeyValueSchema class API, and I bet you won't be able to use other classes.

Are you suggesting that we bundle the pulsar-client with all of the dependencies in the NAR?

@merlimat
Contributor

> @merlimat the fact that the integration test was broken is the real problem to me.
> It means that inside a Sink you cannot use the KeyValueSchema class API, and I bet you won't be able to use other classes.

In the Sink you won't have access to the pulsar-client implementation, only the API. The implementation is loaded in a different class loader.

> Are you suggesting that we bundle the pulsar-client with all of the dependencies in the NAR?

I'm not sure where I would have suggested that. :) The pulsar-client is included in the worker-runtime. The NAR (or function jar) will not have it (or they can add it on their own if they want, and it wouldn't conflict).

The problem here is that the pulsar-client was being included in the java-instance.jar which is what gets added to the function/source/sink class loader.
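
The class loader split described here can be illustrated with a small sketch. It is not how Pulsar actually wires its class loaders; `ApiOnlyClassLoader` and the `org.example.api.` prefix are hypothetical names. The idea is that user code resolves classes through a loader that exposes only API packages, while implementation classes remain reachable only through a separate loader:

```java
import java.util.List;

/** Hypothetical sketch, not Pulsar code: a loader that exposes only allow-listed packages. */
final class ApiOnlyClassLoader extends ClassLoader {

    private final ClassLoader delegate;
    private final List<String> visiblePrefixes;

    ApiOnlyClassLoader(ClassLoader delegate, List<String> visiblePrefixes) {
        // No parent: only the bootstrap loader is consulted before findClass.
        super((ClassLoader) null);
        this.delegate = delegate;
        this.visiblePrefixes = visiblePrefixes;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        for (String prefix : visiblePrefixes) {
            if (name.startsWith(prefix)) {
                // Visible API class: resolve it through the delegate loader.
                return delegate.loadClass(name);
            }
        }
        throw new ClassNotFoundException(name + " is not visible to user code");
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = ApiOnlyClassLoader.class.getClassLoader();
        ClassLoader userView = new ApiOnlyClassLoader(app, List.of("org.example.api."));

        // Classes from the bootstrap loader stay visible.
        System.out.println(userView.loadClass("java.lang.String"));

        // A class outside the allow-list is hidden, even though the delegate could load it.
        try {
            userView.loadClass(ApiOnlyClassLoader.class.getName());
        } catch (ClassNotFoundException e) {
            System.out.println("hidden: " + e.getMessage());
        }
    }
}
```

A function class loader built with such a loader as its parent could bundle its own copy of any hidden library without clashing with the runtime's copy.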

@merlimat
Contributor

> It means that inside a Sink you cannot use the KeyValueSchema class API,

To reiterate: that class is not part of the API. Schema<KeyValue> is and it's accessible.

@eolivelli
Contributor

@merlimat the KeyValueSchema class is an important case.
If we go this way, I would like to add a new public interface that represents KeyValueSchema and is usable by the Sink.

Something like
org.apache.pulsar.client.api.KeyValueSchema (next to Schema)

It was something that I wanted to bring up before, but I did not chime in about the lack of such an API.
In 2.8.0 we did a lot of work to support KeyValue in Pulsar Sinks and we should not break this now.

I can volunteer to add such a class and retrofit the integration tests.

@merlimat
Contributor

> @merlimat the KeyValueSchema class is an important case.
> If we go this way, I would like to add a new public interface that represents KeyValueSchema and is usable by the Sink.

That's fine (although I don't understand exactly why Schema<KeyValue> doesn't work there).

Having said that, I think it's a completely different discussion from this PR.

The expectation for functions and connectors is to have access to pulsar-client-api and not (directly) to pulsar-client. That has always been the case, even if through different implementation mechanisms (first shading and now class loaders).

@eolivelli
Contributor

> That's fine (although I don't understand exactly why Schema<KeyValue> doesn't work there).

The problem is that you do not have access to "getKeySchema" and "getValueSchema", so in a Sink you cannot know the schemas for the key and the value.

@eolivelli
Contributor

@jerrypeng
I discussed offline with @merlimat in order to find a good solution to the KeyValueSchema problem.

This is the patch:
#10888

Once we make KeyValueSchema a public API interface, I am totally +1 on this change.

@codelipenghui
Contributor

Would it be better to add methods like these directly to the Schema interface:

boolean isKeyValue();

Schema getKeySchema();

Schema getValueSchema();

That way we don't need to cast to the KeyValueSchema interface first to get the key schema and the value schema.

@eolivelli
Contributor

@codelipenghui we can follow up on #10888

You can already use getSchemaType() to check whether this is a KeyValueSchema.

Adding those methods to the Schema API may not be a good idea for the API (thinking of the future), because we would add new methods to the top-level API that are useful only to KeyValueSchema users.

It may also encourage developers to think that KeyValueSchema can be nested, but KeyValueSchema is not simply a key/value pair; it is strictly tied to message encoding.
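
As a sketch of the pattern discussed here, the following uses small stand-in types rather than the Pulsar API: `SimpleSchema`, `SimpleKeyValueSchema`, `SchemaKind`, and `SchemaInspector` are all hypothetical. The top-level interface exposes only a type tag, and the key/value accessors live on a sub-interface that callers cast to after checking the tag:

```java
/** Hypothetical stand-ins for illustration; these are not the Pulsar API types. */
enum SchemaKind { STRING, INT32, KEY_VALUE }

/** Top-level interface: stays small and knows nothing about key/value specifics. */
interface SimpleSchema {
    SchemaKind kind();
}

/** Key/value accessors live on a sub-interface used only by key/value consumers. */
interface SimpleKeyValueSchema extends SimpleSchema {
    SimpleSchema keySchema();

    SimpleSchema valueSchema();

    @Override
    default SchemaKind kind() {
        return SchemaKind.KEY_VALUE;
    }
}

final class SchemaInspector {

    private SchemaInspector() {}

    /** Describes a schema, looking at key and value parts only for key/value schemas. */
    static String describe(SimpleSchema schema) {
        if (schema.kind() == SchemaKind.KEY_VALUE && schema instanceof SimpleKeyValueSchema) {
            SimpleKeyValueSchema kv = (SimpleKeyValueSchema) schema;
            return "KeyValue(" + kv.keySchema().kind() + ", " + kv.valueSchema().kind() + ")";
        }
        return schema.kind().name();
    }

    public static void main(String[] args) {
        SimpleSchema key = () -> SchemaKind.INT32;
        SimpleSchema value = () -> SchemaKind.STRING;
        SimpleKeyValueSchema kv = new SimpleKeyValueSchema() {
            @Override
            public SimpleSchema keySchema() {
                return key;
            }

            @Override
            public SimpleSchema valueSchema() {
                return value;
            }
        };
        System.out.println(describe(kv));
        System.out.println(describe(key));
    }
}
```

The trade-off is one extra cast at the call site in exchange for a top-level interface that does not grow methods only one schema type can answer.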

@codelipenghui
Contributor

@eolivelli makes sense to me.

@eolivelli
Contributor

@jerrypeng we merged #10888.
Can you please rebase this patch?

Thank you for taking care of this topic!
If we do not change it now, before the release, we will have to carry this setup for the whole lifecycle of 2.8.x.

@merlimat
Contributor

> @jerrypeng why don't we have the "functions classloader" be a hierarchical classloader with:
> system classes (pulsar api + pulsar impl) (rootclassloader, parent)

@eolivelli that's because the pulsar-impl will be using its own dependencies, which can conflict with the function code.

@eolivelli
Contributor

@merlimat that is what I was afraid of.

@eolivelli
Contributor

If this worked on 2.7.2 and now it does not work anymore, what did we change in the 2.8 release line?

@eolivelli
Contributor

@jerrypeng @merlimat here is a good way to see the logs of the failing integration tests:
34e8680#diff-0c206b3576c4e4a0b64e63ba83801e54c296cbfedf23c30bf729b556e73af0e8R99
It dumps the logs of the function at the end of the execution.
It is very useful, and you can easily see what happened.

@codelipenghui
Contributor

codelipenghui commented Jun 12, 2021

@jerrypeng @merlimat @eolivelli
Logs of the integration tests:

org.apache.pulsar.tests.integration.docker.ContainerExecException: sh -c /pulsar/bin/pulsar-admin functions create --tenant public --namespace default --name test-autoschema-fn-etqamekt --className org.apache.pulsar.functions.api.examples.AutoSchemaFunction --inputs test-autoschema-input-qjtrdrfk --output test-autoshcema-output-kyprhjjh --jar /pulsar/examples/java-test-functions.jar --ram 134217728 failed on 7251d16697c771ce25b0bd35d906ca163c2628aa69d96ca1218ae44f8ecaa65f with error code 1
	at org.apache.pulsar.tests.integration.utils.DockerUtils$2.onComplete(DockerUtils.java:248)
	at org.testcontainers.shaded.com.github.dockerjava.core.exec.AbstrAsyncDockerCmdExec$1.onComplete(AbstrAsyncDockerCmdExec.java:51)
	at org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder.lambda$executeAndStream$1(DefaultInvocationBuilder.java:276)
	at java.base/java.lang.Thread.run(Thread.java:829)

Logs of the Function worker

01:17:52.552 [function-web-24-2] ERROR org.apache.pulsar.functions.worker.rest.api.FunctionsImpl - Invalid register Function request @ /public/default/test-autoschema-fn-jpsqkekv
java.lang.IllegalArgumentException: Function class org.apache.pulsar.functions.api.examples.AutoSchemaFunction must be in class path
	at org.apache.pulsar.functions.utils.FunctionConfigUtils.doJavaChecks(FunctionConfigUtils.java:517) ~[org.apache.pulsar-pulsar-functions-utils-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.utils.FunctionConfigUtils.validate(FunctionConfigUtils.java:819) ~[org.apache.pulsar-pulsar-functions-utils-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.validateUpdateRequestParams(FunctionsImpl.java:756) ~[org.apache.pulsar-pulsar-functions-worker-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:176) ~[org.apache.pulsar-pulsar-functions-worker-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.worker.rest.api.v3.FunctionsApiV3Resource.registerFunction(FunctionsApiV3Resource.java:75) ~[org.apache.pulsar-pulsar-functions-worker-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:550) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:179) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.functions.api.examples.AutoSchemaFunction
	at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
	at org.apache.pulsar.functions.utils.FunctionConfigUtils.doJavaChecks(FunctionConfigUtils.java:506) ~[org.apache.pulsar-pulsar-functions-utils-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	... 60 more
01:17:52.912 [function-web-24-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [12/Jun/2021:01:17:51 +0000] "POST /admin/v3/functions/public/default/test-autoschema-fn-jpsqkekv HTTP/1.1" 400 109 "-" "Pulsar-Java-v2.9.0-SNAPSHOT" 950

Contributor

@eolivelli eolivelli left a comment

Even if I understand the intent of this work and I am supportive of it, I would like to wear a different hat for a moment, thinking about the imminent 2.8.0 release.

This patch is becoming bigger and bigger.

I am not sure that it is safe to modify so many things now that we are doing the release.

I am sure that many people (myself for sure) have been testing their Functions and Pulsar IO connectors for weeks or months against the current 2.8.0 codebase.
If we change so many things now, it will probably be painful and we could destabilise the system.

I know that if we keep the current state we cannot change it in a 2.8.1 release, so this cleanup would be deferred to 2.9.0.

@jerrypeng @sijie @rdhabalia @merlimat @codelipenghui

@codelipenghui
Contributor

05:08:22.960 [public/default/test-merge-fn-tagvjrnr-0] INFO  org.apache.pulsar.functions.api.examples.MergeTopicFunction - process message with reader schema KeyValueSchema(SEPARATED,org.apache.pulsar.client.impl.schema.IntSchema@27b051fa,VersionedSchema(type=JSON,schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,name=))
05:08:22.961 [public/default/test-merge-fn-tagvjrnr-0] WARN  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered exception when processing message PulsarRecord(topicName=Optional[persistent://public/ns-merge-zerxgrlo/merge-schema-k-int-v-json-userv1-separate], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@57d404f5], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,schemaType=KEY_VALUE}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2c4cacd3,schemaType=KEY_VALUE}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$226/0x0000000840447440@8ee4207, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$225/0x0000000840447040@2d74ed61)
org.apache.pulsar.client.api.SchemaSerializationException: This method cannot be used under this SEPARATED encoding type
	at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:154) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:148) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:40) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.client.api.Schema.validate(Schema.java:64) ~[java-instance.jar:?]
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:81) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:33) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:175) ~[org.apache.pulsar-pulsar-client-original-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.ContextImpl$MessageBuilderImpl.value(ContextImpl.java:612) ~[org.apache.pulsar-pulsar-functions-instance-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.api.examples.MergeTopicFunction.process(MergeTopicFunction.java:49) ~[?:?]
	at org.apache.pulsar.functions.api.examples.MergeTopicFunction.process(MergeTopicFunction.java:32) ~[?:?]
	at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95) ~[org.apache.pulsar-pulsar-functions-instance-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271) [org.apache.pulsar-pulsar-functions-instance-2.9.0-SNAPSHOT.jar:2.9.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:829) [?:?]

@codelipenghui
Contributor

@eolivelli I think we are on the right track to fix the function regression. With my last commit, I think the tests should pass.

@codelipenghui codelipenghui merged commit d81b5f8 into apache:master Jun 12, 2021
codelipenghui pushed a commit that referenced this pull request Jun 12, 2021
(cherry picked from commit d81b5f8)
@eolivelli
Contributor

Sorry for the late reply.

LGTM

@jerrypeng
Contributor Author

Nice finally got this merged! Good work everyone!

@lhotari
Member

lhotari commented Jun 14, 2021

There's one more follow-up in #10918. The module used for integration tests had invalid dependencies. It was changed by this PR, but it needed to be revisited. Please check whether it's needed for branch-2.8.

eolivelli pushed a commit to datastax/pulsar that referenced this pull request Jun 14, 2021
(cherry picked from commit d81b5f8)
(cherry picked from commit 89ac98e)
lhotari added a commit to lhotari/pulsar-contributor-toolbox that referenced this pull request Jun 14, 2021
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
eolivelli added a commit to eolivelli/pulsar that referenced this pull request Jul 22, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
Labels
area/function, type/bug
9 participants