From 5ef83e310c90286b85a5c4f6715c193a56899012 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 12 Feb 2015 12:32:27 +0100 Subject: [PATCH 1/2] [FLINK-1391] Register common Avro types at Kryo --- .../flink/api/java/typeutils/runtime/KryoSerializer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index b73f0b1b4cd4a..87ad0cf249e92 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -33,6 +33,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -232,8 +233,13 @@ private void checkKryoInitialized() { if(SpecificRecordBase.class.isAssignableFrom(type)) { ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(type); this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); - } + // register Avro types. + this.kryo.register(Utf8.class); + this.kryo.register(GenericData.EnumSymbol.class); + this.kryo.register(GenericData.Fixed.class); + this.kryo.register(GenericData.StringType.class); + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class)); From e3d4d35fe1e7c29a8e7319d3c4ac9f7aef55437d Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 12 Feb 2015 14:18:17 +0100 Subject: [PATCH 2/2] pr --- .../api/java/typeutils/runtime/KryoSerializer.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 87ad0cf249e92..8d8a9225b728a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -233,12 +233,14 @@ private void checkKryoInitialized() { if(SpecificRecordBase.class.isAssignableFrom(type)) { ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(type); this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); + + // register Avro types. + this.kryo.register(Utf8.class); + this.kryo.register(GenericData.EnumSymbol.class); + this.kryo.register(GenericData.Fixed.class); + this.kryo.register(GenericData.StringType.class); } - // register Avro types. - this.kryo.register(Utf8.class); - this.kryo.register(GenericData.EnumSymbol.class); - this.kryo.register(GenericData.Fixed.class); - this.kryo.register(GenericData.StringType.class); + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them