Skip to content

[fix][fn] Use functions classloader in TopicSchema#20090

Closed
gmiklos-ltg wants to merge 2 commits intoapache:branch-2.11from
gmiklos-ltg:serde-source-classloader-fix
Closed

[fix][fn] Use functions classloader in TopicSchema#20090
gmiklos-ltg wants to merge 2 commits intoapache:branch-2.11from
gmiklos-ltg:serde-source-classloader-fix

Conversation

@gmiklos-ltg
Copy link
Contributor

Fixes #5350

Motivation

While evaluating Pulsar Functions for our data transformation use-cases we ran into the following startup issue with our function that was using a custom SerDe class:
2023-04-12T11:30:35,368+0200 [public/default/user-transform-new-2-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/user-transform-new-2:0] Uncaught exception in Java Instance java.lang.RuntimeException: User class must be in class path at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:72) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.instance.InstanceUtils.createInstance(InstanceUtils.java:92) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.instance.InstanceUtils.initializeSerDe(InstanceUtils.java:51) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:238) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at java.util.HashMap.computeIfAbsent(HashMap.java:1220) ~[?:?] at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.source.PulsarSource.buildPulsarSourceConsumerConfig(PulsarSource.java:176) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.source.SingleConsumerPulsarSource.open(SingleConsumerPulsarSource.java:69) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:774) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:226) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:259) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0] at java.lang.Thread.run(Thread.java:833) ~[?:?] Caused by: java.lang.ClassNotFoundException: com.bridge.data.UserMessageSerDe at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:587) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?] at java.lang.Class.forName0(Native Method) ~[?:?] at java.lang.Class.forName(Class.java:467) ~[?:?] at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:70) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0] ... 13 more

We made it absolutely sure that the class is included in the fat jar that we have compiled.

config.yaml looks like this:

tenant: public
namespace: default
name: user-transform
className: com.bridge.data.TransformUserMessage
inputs:
  - "persistent://public/default/debezium.public.users"
customSerdeInputs:
  "persistent://public/default/debezium.public.users": com.bridge.data.UserMessageSerDe
output: "persistent://public/default/debezium.public.users-public"
outputSerdeClassName: com.bridge.data.PublicUserSerDe
jar: "transform-function-v1.jar"
parallelism: 1
retainOrdering: true

The mentioned UserMessageSerDe class is just a very simple wrapper around Jackson to deserialize UserMessage data classes.

Modifications

I have modified TopicSchema in such a way that it accepts a functions classloader parameter which will be used by newSchemaInstance() to resolve this class. This way our function can now start up without any issues.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as TopicSchemaTest.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: [(https://github.com/gmiklos-ltg/pull/2)]

…lassNotFoundException when using custom SerDe classes.
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 13, 2023
@tisonkun tisonkun requested review from nlu90 and tisonkun April 13, 2023 13:13
@tisonkun
Copy link
Member

Generally, we firstly review a PR against master branch and pick it to maintained branches. Unless this is a 2.11 specific issue, please close this one and open a new PR against master branch first.

@gmiklos-ltg
Copy link
Contributor Author

Generally, we firstly review a PR against master branch and pick it to maintained branches. Unless this is a 2.11 specific issue, please close this one and open a new PR against master branch first.

Hi @tisonkun! I have prepared a PR for master here:
#20115

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants