ClassNotFoundException when submitting a Jet pipeline #26343

Open
maragud opened this issue May 24, 2024 · 1 comment
maragud commented May 24, 2024

Describe the bug
I tried submitting a Hazelcast Jet job from a client to the cluster using the following configuration.
Hazelcast member configuration

hazelcast:
  cluster-name: dev
  network:
    join:
      tcp-ip:
        enabled: true
        member-list:
          - <node1>:5701
          - <node2>:5701
          - <node3>:5701
...
  user-code-deployment:
    enabled: false
  jet:
    enabled: true
    resource-upload-enabled: true

Hazelcast client configuration

hazelcast-client:
  cluster-name: dev
  network:
    cluster-members:
      - <node1>:5701
      - <node2>:5701
      - <node3>:5701

Hazelcast client code

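package com.example;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.io.Serializable;

public class Main {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        HazelcastInstance hz = HazelcastClient.newHazelcastClient();
        // writeTo returns a SinkStage, so the result is not assigned to a BatchStage.
        p.readFrom(TestSources.items(1L, 2L, 3L, 4L))
                .writeTo(Sinks.mapWithUpdating("test", x -> 1L, (BiFunctionEx & Serializable) (x, y) -> x));

        // addPackage attaches the classes of com.example (including Main) to the job.
        JobConfig jobConfig = new JobConfig()
                .setName("test-pipeline")
                .addPackage("com.example");

        hz.getJet().newJob(p, jobConfig).join();
    }
}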

The example above throws a ClassNotFoundException. If we change the pipeline's sink to the following, the job runs without an exception.

...
    p.readFrom(TestSources.items(1L, 2L, 3L, 4L))
            .writeTo(Sinks.map("test", x -> 1L, x -> x));
...

The Sinks.mapWithMerging sink throws the same exception.
If the Hazelcast cluster contains only 1 member, the exception is not thrown.

I tried to understand what causes the error. From what I understood, com.hazelcast.jet.impl.connector.UpdateMapP$ApplyFnEntryProcessor.readData cannot deserialize the updateFn on the members, because the thread that runs readData is not using a JetClassLoader, which contains the classes loaded from jobConfig.addPackage("com.example"). In the case of Sinks.map(...), the correct class loader was used.
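To illustrate the suspected mechanism, here is a minimal sketch using only the JDK, not Hazelcast's actual code. It assumes the failure comes down to which class loader resolves class names during Java deserialization. UserFn, MemberLoader and LoaderAwareInput are hypothetical names: UserFn stands in for a class shipped with the job, and MemberLoader stands in for a member's application class loader that cannot see it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClassLoaderDemo {
    /** Hypothetical stand-in for a user class that is only available with the job. */
    static class UserFn implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Loader that refuses to load UserFn, like a class loader that lacks the job's classes. */
    static class MemberLoader extends ClassLoader {
        MemberLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(UserFn.class.getName())) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    /** ObjectInputStream that resolves class descriptors through an explicit loader. */
    static class LoaderAwareInput extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInput(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the simple class name of the deserialized object, or the failure type. */
    static String tryRead(byte[] bytes, ClassLoader loader) {
        try (ObjectInputStream in = new LoaderAwareInput(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject().getClass().getSimpleName();
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new UserFn());
        ClassLoader withUserClasses = ClassLoaderDemo.class.getClassLoader();
        System.out.println(tryRead(bytes, withUserClasses));                   // prints UserFn
        System.out.println(tryRead(bytes, new MemberLoader(withUserClasses))); // prints ClassNotFoundException
    }
}
```

The same bytes deserialize or fail depending only on the loader passed in. If the analysis above is right, the thread that calls readData is in the second situation.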

Exception stack trace

hazelcast-jet                 | com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.ClassNotFoundException: com.example.Main
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:99)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:88)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:357)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.InternalSerializationService.readObject(InternalSerializationService.java:81)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:600)
hazelcast-jet                 |         at com.hazelcast.jet.impl.connector.UpdateMapP$ApplyFnEntryProcessor.readData(UpdateMapP.java:180)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:168)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:114)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:53)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:357)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.InternalSerializationService.readObject(InternalSerializationService.java:81)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:600)
hazelcast-jet                 |         at com.hazelcast.internal.namespace.impl.NoOpUserCodeNamespaceService.callWithNamespace(NoOpUserCodeNamespaceService.java:88)
hazelcast-jet                 |         at com.hazelcast.internal.namespace.NamespaceUtil.callWithNamespace(NamespaceUtil.java:128)
hazelcast-jet                 |         at com.hazelcast.map.impl.operation.MapOperation.callWithNamespaceAwareness(MapOperation.java:559)
hazelcast-jet                 |         at com.hazelcast.map.impl.operation.MultipleEntryBackupOperation.readInternal(MultipleEntryBackupOperation.java:56)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:804)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:168)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:114)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:53)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:357)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.InternalSerializationService.readObject(InternalSerializationService.java:81)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:600)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationservice.impl.operations.Backup.readInternal(Backup.java:320)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:804)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:168)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:114)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:53)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:271)
hazelcast-jet                 |         at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:429)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:482)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:185)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:141)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.loop(OperationThread.java:134)
hazelcast-jet                 |         at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:115)
hazelcast-jet                 |         at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:111)
hazelcast-jet                 | Caused by: java.lang.ClassNotFoundException: com.example.Main
hazelcast-jet                 |         at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
hazelcast-jet                 |         at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
hazelcast-jet                 |         at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
hazelcast-jet                 |         at com.hazelcast.internal.namespace.impl.NamespaceAwareClassLoader.loadClass(NamespaceAwareClassLoader.java:49)
hazelcast-jet                 |         at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
hazelcast-jet                 |         at com.hazelcast.internal.nio.ClassLoaderUtil.tryLoadClass(ClassLoaderUtil.java:308)
hazelcast-jet                 |         at com.hazelcast.internal.nio.ClassLoaderUtil.loadClass(ClassLoaderUtil.java:266)
hazelcast-jet                 |         at com.hazelcast.internal.nio.IOUtil$ClassLoaderAwareObjectInputStream.resolveClass(IOUtil.java:952)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2045)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1909)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2235)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
hazelcast-jet                 |         at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
hazelcast-jet                 |         at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:95)
hazelcast-jet                 |         ... 39 more
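
The trace reports com.example.Main as the missing class, not a lambda class. A serializable Java lambda is written to the stream as a java.lang.invoke.SerializedLambda, which records the class that defines the lambda (its capturing class), so that class has to be resolvable wherever the lambda is deserialized. The sketch below is plain JDK code, unrelated to Hazelcast internals, that reads this record through the lambda's synthetic writeReplace method. SerFn, sampleLambda and capturingClassOf are hypothetical names introduced for illustration.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class LambdaCapturingClassDemo {
    /** Hypothetical serializable function type, similar in spirit to a FunctionEx. */
    interface SerFn<T, R> extends Function<T, R>, Serializable {
    }

    /** A lambda defined inside this class, so this class is its capturing class. */
    static SerFn<Long, Long> sampleLambda() {
        return x -> x;
    }

    /** Returns the name of the class that defines the given serializable lambda. */
    static String capturingClassOf(Serializable lambda) {
        try {
            // Serializable lambdas get a synthetic writeReplace that returns a SerializedLambda.
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            SerializedLambda serialized = (SerializedLambda) writeReplace.invoke(lambda);
            // getCapturingClass uses '/' as the package separator.
            return serialized.getCapturingClass().replace('/', '.');
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Not a serializable lambda", e);
        }
    }

    public static void main(String[] args) {
        // Prints the name of the class that defines the lambda: LambdaCapturingClassDemo
        System.out.println(capturingClassOf(sampleLambda()));
    }
}
```

For the lambdas in the reproducer, the capturing class would be com.example.Main, which matches the class named in the exception. The trace also passes through MultipleEntryBackupOperation and Backup.readInternal. That may be why a single-member cluster, which has no other member to hold a backup replica, does not show the error, but this is an inference from the trace and not confirmed.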

After this test, I decided that it is probably better to run the pipeline on the client side and use the remoteMapWithUpdating or remoteMapWithMerging sink to write the data to the other Hazelcast cluster, but the same error occurs in this case too.
Example with remoteMapWithUpdating

p.readFrom(TestSources.items(1L, 2L, 3L, 4L))
                .writeTo(Sinks.remoteMapWithUpdating("test",
                        clientConfig,
                        x -> x,
                        (x, y) -> x));

Expected behavior
The example above should complete without a ClassNotFoundException when run with 2 or more Hazelcast members and a client that submits the job.

To Reproduce
As described above

Additional context
Hazelcast 5.4.0
Java 17

@Fly-Style
Member

Hi, @maragud! Thank you for the report, I'll take a look.
