Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization #3517

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/dev/custom_serializers.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency:

Please adjust the versions of both libraries as needed.

### Issue with using Kryo's `JavaSerializer`

If you register Kryo's `JavaSerializer` for your custom type, you may
encounter `ClassNotFoundException`s even though your custom type class is
included in the submitted user code jar. This is due to a know issue with
Kryo's `JavaSerializer`, which may incorrectly use the wrong classloader.

In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
that makes sure the user code classloader is used.

Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
for more details.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.flink.util.InstantiationUtil;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
* that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's
* registered classloader.
*
* Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong
* classloader may be used for deserialization, leading to {@link ClassNotFoundException}s.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6025">FLINK-6025</a>
* @see <a href="https://github.com/EsotericSoftware/kryo/pull/483">Known issue with Kryo's JavaSerializer</a>
*
* @param <T> The type to be serialized.
*/
public class JavaSerializer<T> extends Serializer<T> {

public JavaSerializer() {}

@SuppressWarnings("unchecked")
@Override
public void write(Kryo kryo, Output output, T o) {
try {
ObjectMap graphContext = kryo.getGraphContext();
ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this);
if (objectStream == null) {
objectStream = new ObjectOutputStream(output);
graphContext.put(this, objectStream);
}
objectStream.writeObject(o);
objectStream.flush();
} catch (Exception ex) {
throw new KryoException("Error during Java serialization.", ex);
}
}

@SuppressWarnings("unchecked")
@Override
public T read(Kryo kryo, Input input, Class aClass) {
try {
ObjectMap graphContext = kryo.getGraphContext();
ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this);
if (objectStream == null) {
// make sure we use Kryo's classloader
objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader());
graphContext.put(this, objectStream);
}
return (T) objectStream.readObject();
} catch (Exception ex) {
throw new KryoException("Error during Java deserialization.", ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import org.apache.avro.generic.GenericData;

Expand Down Expand Up @@ -130,7 +129,7 @@ public boolean isImmutableType() {

@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
return new KryoSerializer<>(this);
}

@Override
Expand Down Expand Up @@ -331,6 +330,8 @@ private void checkKryoInitialized() {
kryo.setReferences(true);

// Throwable and all subclasses should be serialized via java serialization
// Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
// This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details.
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

// Add default serializers first, so that they type registrations without a serializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
public final class InstantiationUtil {

/**
* A custom ObjectInputStream that can also load user-code using a
* user-code ClassLoader.
*
* A custom ObjectInputStream that can load classes using a specific ClassLoader.
*/
public static class ClassLoaderObjectInputStream extends ObjectInputStream {

Expand Down