Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar Functions and IO] Cannot upgrade Function built for Pulsar 2.7 to Pulsar 2.8 #11338

Closed
eolivelli opened this issue Jul 16, 2021 · 3 comments · Fixed by #11392
Closed
Labels
area/connector area/function type/bug The PR fixed a bug or issue reported a bug

Comments

@eolivelli
Copy link
Contributor

eolivelli commented Jul 16, 2021

Describe the bug
Pulsar Functions and Pulsar IO connectors that use the Pulsar Client API, in particular the "Schema" API (like Schema.JSON(Pojo.class)) do not work anymore if you upgrade your Pulsar cluster to Pulsar 2.8.x.

This is because of an IncompatibleClassChangeError around the class SchemaInfo that was a "class" in Pulsar 2.7 and now it is a pure interface in Pulsar 2.8 (see commit 89ac98e).

The error is:

11:09:19.623 [public/default/fun1-0] WARN  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered exception when processing message PulsarRecord(topicName=Optional[persistent://public/default/test], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@718e1e37], schema=org.apache.pulsar.client.impl.schema.StringSchema@7127ce9a, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$213/0x000000080056f040@6e18bd1d, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$212/0x000000080056fc40@71fc78c9)
java.lang.RuntimeException: java.lang.IncompatibleClassChangeError: Method 'org.apache.pulsar.common.schema.SchemaInfo$SchemaInfoBuilder org.apache.pulsar.common.schema.SchemaInfo.builder()' must be InterfaceMethodref constant
	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:42) ~[java-instance.jar:?]
	at org.apache.pulsar.client.internal.DefaultImplementation.newJSONSchema(DefaultImplementation.java:274) ~[java-instance.jar:?]
	at org.apache.pulsar.client.api.Schema.JSON(Schema.java:335) ~[java-instance.jar:?]
	at myfun.Fun1.process(Fun1.java:28) ~[?:?]
	at myfun.Fun1.process(Fun1.java:14) ~[?:?]
	at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95) ~[pulsar-functions-instance.jar:2.8.0]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271) [pulsar-functions-instance.jar:2.8.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IncompatibleClassChangeError: Method 'org.apache.pulsar.common.schema.SchemaInfo$SchemaInfoBuilder org.apache.pulsar.common.schema.SchemaInfo.builder()' must be InterfaceMethodref constant
	at org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:50) ~[pulsar-client-original.jar:2.8.0]
	at org.apache.pulsar.client.impl.schema.JSONSchema.of(JSONSchema.java:93) ~[pulsar-client-original.jar:2.8.0]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newJSONSchema$31(DefaultImplementation.java:277) ~[java-instance.jar:?]
	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34) ~[java-instance.jar:?]
	... 7 more

To Reproduce
In order to reproduce the error follow these steps:

  • start a simple Pulsar 2.7.1 instance (pulsar standalone is enough)
  • write a simple Function that contains only "Schema.JSON(MyPojo.class)" (see below) and is it compiled using Pulsar 2.7.1 API
  • deploy the function with pulsar-admin
  • produce one message with pulsar-client produce
  • see the logs of the function (in logs/function/public/default/functionname/*.log)
  • update the cluster to Pulsar 2.8.0 (stop pulsar standalone, upgrade to 2.8.0 and start it again, just keep the 'data' directory contents)
  • produce another message
  • see the error in the logs

The workaround is to rebuild the Function against the 2.8.0 API (and switch from pulsar-client to pulsar-client-original! ) and to redeploy it.

Function body (taken from Pulsar Doc, with the addition of Schema.JSON):

@Slf4j
public class Fun1 implements Function<String, Void>
    {
        public Void process(String input, Context context) {
            Logger LOG = context.getLogger();
            String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
            String functionName = context.getFunctionName();

            String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n",
                    input,
                    inputTopics);

            LOG.info(logMessage);

            Schema<MyPojo> json = Schema.JSON(MyPojo.class);
            LOG.info("schema {}", json);

            return null;
        }

    @Data
    public static final class MyPojo {
        String id;
        BigDecimal data;
    }

}

pom.xml relevant deps:


  <dependencies>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>2.7.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>2.7.1</version>
      <scope>provided</scope>
    </dependency>

Please note that in order to see the Function working you have to add the dependency to "pulsar-client", and "pulsar-client-api" is not enough (otherwise you will see a ClassNotFoundException: org.apache.pulsar.client.impl.schema.JSONSchema error.

Expected behaviour
Upgrading Pulsar must be seamless, and the functions must continue working without changes.

Additional considerations

This is a bad problem for people who try to upgrade Pulsar from 2.7 to 2.8 because you may have many functions and you will need to rebuild them from source.
Therefore it is not possible to make the upgrade without interrupting the service, because you have to upgrade Pulsar and then upgrade the Function. It is not possible to build the function against 2.8.0 and then see it working on Pulsar 2.7

@sijie
Copy link
Member

sijie commented Jul 16, 2021

@eolivelli will take a look.

@nlu90 @freeznet Can you also check this?

@nlu90
Copy link
Member

nlu90 commented Jul 19, 2021

I did some check.
If function compiled with 2.7.1 is submitted to a Pulsar 2.8.0 cluster directly, the following error is reported:

14:50:51.313 [public/default/Fun1-0] INFO  function-Fun1 - A message with a value of "5. hello" has arrived on one of the following topics: public/default/fun-test

14:50:51.313 [public/default/Fun1-0] WARN  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered exception when processing message                       +++PulsarRecord(topicName=Optional[persistent://public/default/fun-test], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@634a263c], schema=org.   +++apache.pulsar.client.impl.schema.StringSchema@6961b9d7, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$134/129491009@21b63931, ackFunction=org.   +++apache.pulsar.functions.source.PulsarSource$$Lambda$133/1005835213@3d56e9eb)
java.lang.RuntimeException: java.lang.NoSuchMethodError: org.apache.pulsar.common.schema.SchemaInfo.builder()Lorg/apache/pulsar/common/schema/SchemaInfo$SchemaInfoBuilder;    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:42) ~[java-instance.jar:?]
    at org.apache.pulsar.client.internal.DefaultImplementation.newJSONSchema(DefaultImplementation.java:274) ~[java-instance.jar:?]
    at org.apache.pulsar.client.api.Schema.JSON(Schema.java:335) ~[java-instance.jar:?]
    at com.mycompany.app.Fun1.process(Fun1.java:27) ~[?:?]
    at com.mycompany.app.Fun1.process(Fun1.java:13) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:95) ~[org.apache.pulsar-pulsar-functions-instance-2.8.0.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:271) [org.apache.pulsar-pulsar-functions-instance-2.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]
Caused by: java.lang.NoSuchMethodError: org.apache.pulsar.common.schema.SchemaInfo.builder()Lorg/apache/pulsar/common/schema/SchemaInfo$SchemaInfoBuilder;
    at org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:50) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
    at org.apache.pulsar.client.impl.schema.JSONSchema.of(JSONSchema.java:93) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_241]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_241]
    at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newJSONSchema$31(DefaultImplementation.java:277) ~[java-instance.jar:?]
    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34) ~[java-instance.jar:?]
    ... 7 more

User need to compile their function code with pulsar 2.8.0 dependency in order to avoid this problem. And after recomplie, the problem is resolved:

15:08:48.284 [public/default/Fun1-0] INFO  function-Fun1 - schema org.apache.pulsar.client.impl.schema.JSONSchema@3d102034

And if a function compiled with 2.8.0 dependency is submitted to Pulsar Cluster 2.7.2, the following error reported:

15:11:55.423 [public/default/Fun1-0] WARN  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Encountered exception when processing message                       +++PulsarRecord(topicName=Optional[persistent://public/default/fun-test], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@2421d146], schema=org.   +++apache.pulsar.client.impl.schema.StringSchema@2376fd6a, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$140/1102914687@79223ac3, ackFunction=org.  +++apache.pulsar.functions.source.PulsarSource$$Lambda$139/1983986396@5ea1a88a)
java.lang.RuntimeException: java.lang.IncompatibleClassChangeError: Implementing class
    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:46) ~[java-instance.jar:?]
    at org.apache.pulsar.client.internal.DefaultImplementation.newJSONSchema(DefaultImplementation.java:275) ~[java-instance.jar:?]
    at org.apache.pulsar.client.api.Schema.JSON(Schema.java:306) ~[java-instance.jar:?]
    at com.mycompany.app.Fun1.process(Fun1.java:27) ~[?:?]
    at com.mycompany.app.Fun1.process(Fun1.java:13) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:81) ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:260) [org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]
Caused by: java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_241]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756) ~[?:1.8.0_241]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_241]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) ~[?:1.8.0_241]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_241]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_241]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_241]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_241]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_241]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_241]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_241]
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:1.8.0_241]
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[?:1.8.0_241]
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048) ~[?:1.8.0_241]
    at java.lang.Class.getMethod0(Class.java:3018) ~[?:1.8.0_241]
    at java.lang.Class.getMethod(Class.java:1784) ~[?:1.8.0_241]
    at org.apache.pulsar.client.internal.ReflectionUtils.getStaticMethod(ReflectionUtils.java:79) ~[java-instance.jar:?]
    at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newJSONSchema$31(DefaultImplementation.java:276) ~[java-instance.jar:?]
    at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:35) ~[java-instance.jar:?]
    ... 7 more

@eolivelli
Copy link
Contributor Author

@nlu90 that's exactly the same problem that I found.
Thanks for your time in reproducing the issue.

As discussed in the mailing list we have to add a note in the release notes about the upgrade path.

I will send a PR against the website @Anonymitaet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connector area/function type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants