diff --git a/docs/dev/custom_serializers.md b/docs/dev/custom_serializers.md index 2b72ca0f7ad15..ddfc2ee0e5f1e 100644 --- a/docs/dev/custom_serializers.md +++ b/docs/dev/custom_serializers.md @@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency: Please adjust the versions of both libraries as needed. +### Issue with using Kryo's `JavaSerializer` +If you register Kryo's `JavaSerializer` for your custom type, you may +encounter `ClassNotFoundException`s even though your custom type class is +included in the submitted user code jar. This is due to a know issue with +Kryo's `JavaSerializer`, which may incorrectly use the wrong classloader. + +In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer` +instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink +that makes sure the user code classloader is used. + +Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025) +for more details. diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java new file mode 100644 index 0000000000000..a51647c11c738 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.ObjectMap; +import org.apache.flink.util.InstantiationUtil; + +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** + * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer}, + * that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's + * registered classloader. + * + * Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong + * classloader may be used for deserialization, leading to {@link ClassNotFoundException}s. + * + * @see FLINK-6025 + * @see Known issue with Kryo's JavaSerializer + * + * @param The type to be serialized. + */ +public class JavaSerializer extends Serializer { + + public JavaSerializer() {} + + @SuppressWarnings("unchecked") + @Override + public void write(Kryo kryo, Output output, T o) { + try { + ObjectMap graphContext = kryo.getGraphContext(); + ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this); + if (objectStream == null) { + objectStream = new ObjectOutputStream(output); + graphContext.put(this, objectStream); + } + objectStream.writeObject(o); + objectStream.flush(); + } catch (Exception ex) { + throw new KryoException("Error during Java serialization.", ex); + } + } + + @SuppressWarnings("unchecked") + @Override + public T read(Kryo kryo, Input input, Class aClass) { + try { + ObjectMap graphContext = kryo.getGraphContext(); + ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this); + if (objectStream == null) { + // make sure we use Kryo's classloader + objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader()); + graphContext.put(this, objectStream); + } + return (T) objectStream.readObject(); + } catch (Exception ex) { + throw new KryoException("Error during Java deserialization.", ex); + } + } + +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index e74e2519fcf68..44c952abbd924 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -24,7 +24,6 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.apache.avro.generic.GenericData; @@ -130,7 +129,7 @@ public boolean isImmutableType() { @Override public KryoSerializer duplicate() { - return new KryoSerializer(this); + return new KryoSerializer<>(this); } @Override @@ -331,6 +330,8 @@ private void checkKryoInitialized() { kryo.setReferences(true); // Throwable and all subclasses should be serialized via java serialization + // Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's. + // This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details. kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); // Add default serializers first, so that they type registrations without a serializer diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index d4a031c12303a..6441c86924b3f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -45,9 +45,7 @@ public final class InstantiationUtil { /** - * A custom ObjectInputStream that can also load user-code using a - * user-code ClassLoader. - * + * A custom ObjectInputStream that can load classes using a specific ClassLoader. */ public static class ClassLoaderObjectInputStream extends ObjectInputStream {