From 44870e4c7448f2c976ddd35b95d6aec0d25f2249 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Sun, 12 Mar 2017 22:46:27 +0800 Subject: [PATCH] [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization This commit adds a reimplemented JavaSerializer to be registered with Kryo. This is due to a know issue with Kryo's JavaSerializer that may use the wrong classloader for deserialzation. Instead of registering Kryo's JavaSerializer for Throwables, it is now changed to register the reimplemented JavaSerializer. Users who bump into ClassNotFoundExceptions if they are using Kryo's JavaSerializer for their own types are also recommended to change to Flink's JavaSerializer. --- docs/apis/best_practices.md | 13 +++ .../runtime/kryo/JavaSerializer.java | 82 +++++++++++++++++++ .../runtime/kryo/KryoSerializer.java | 5 +- .../apache/flink/util/InstantiationUtil.java | 4 +- 4 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md index 7ae1b64541bfc..7e0c3e4ff4187 100644 --- a/docs/apis/best_practices.md +++ b/docs/apis/best_practices.md @@ -270,6 +270,19 @@ For Google Protobuf you need the following Maven dependency: Please adjust the versions of both libraries as needed. +### Issue with using Kryo's `JavaSerializer` + +If you register Kryo's `JavaSerializer` for your custom type, you may +encounter `ClassNotFoundException`s even though your custom type class is +included in the submitted user code jar. This is due to a know issue with +Kryo's `JavaSerializer`, which may incorrectly use the wrong classloader. + +In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer` +instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink +that makes sure the user code classloader is used. + +Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025) +for more details. ## Using Logback instead of Log4j diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java new file mode 100644 index 0000000000000..a51647c11c738 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.ObjectMap; +import org.apache.flink.util.InstantiationUtil; + +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** + * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer}, + * that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's + * registered classloader. + * + * Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong + * classloader may be used for deserialization, leading to {@link ClassNotFoundException}s. + * + * @see FLINK-6025 + * @see Known issue with Kryo's JavaSerializer + * + * @param The type to be serialized. + */ +public class JavaSerializer extends Serializer { + + public JavaSerializer() {} + + @SuppressWarnings("unchecked") + @Override + public void write(Kryo kryo, Output output, T o) { + try { + ObjectMap graphContext = kryo.getGraphContext(); + ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this); + if (objectStream == null) { + objectStream = new ObjectOutputStream(output); + graphContext.put(this, objectStream); + } + objectStream.writeObject(o); + objectStream.flush(); + } catch (Exception ex) { + throw new KryoException("Error during Java serialization.", ex); + } + } + + @SuppressWarnings("unchecked") + @Override + public T read(Kryo kryo, Input input, Class aClass) { + try { + ObjectMap graphContext = kryo.getGraphContext(); + ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this); + if (objectStream == null) { + // make sure we use Kryo's classloader + objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader()); + graphContext.put(this, objectStream); + } + return (T) objectStream.readObject(); + } catch (Exception ex) { + throw new KryoException("Error during Java deserialization.", ex); + } + } + +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index e74e2519fcf68..44c952abbd924 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -24,7 +24,6 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.apache.avro.generic.GenericData; @@ -130,7 +129,7 @@ public boolean isImmutableType() { @Override public KryoSerializer duplicate() { - return new KryoSerializer(this); + return new KryoSerializer<>(this); } @Override @@ -331,6 +330,8 @@ private void checkKryoInitialized() { kryo.setReferences(true); // Throwable and all subclasses should be serialized via java serialization + // Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's. + // This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details. kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); // Add default serializers first, so that they type registrations without a serializer diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index bcf71f0aa2567..70464e72c28e3 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -44,9 +44,7 @@ public final class InstantiationUtil { /** - * A custom ObjectInputStream that can also load user-code using a - * user-code ClassLoader. - * + * A custom ObjectInputStream that can load classes using a specific ClassLoader. */ public static class ClassLoaderObjectInputStream extends ObjectInputStream {