From 4825f2fce000744ead50368473eeb1022855edc7 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 28 Oct 2014 16:47:43 +0100 Subject: [PATCH 1/2] Added canCreateInstance method to type serializers. Conflicts: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java --- .../streamrecord/StreamRecordSerializer.java | 5 ++++ .../api/common/typeutils/TypeSerializer.java | 10 +++++-- .../typeutils/base/BooleanSerializer.java | 7 ++++- .../base/BooleanValueSerializer.java | 7 ++++- .../common/typeutils/base/ByteSerializer.java | 7 ++++- .../typeutils/base/ByteValueSerializer.java | 7 ++++- .../common/typeutils/base/CharSerializer.java | 7 ++++- .../typeutils/base/CharValueSerializer.java | 7 ++++- .../typeutils/base/DoubleSerializer.java | 5 ++++ .../typeutils/base/DoubleValueSerializer.java | 7 ++++- .../typeutils/base/FloatSerializer.java | 5 ++++ .../typeutils/base/FloatValueSerializer.java | 7 ++++- .../base/GenericArraySerializer.java | 7 ++++- .../common/typeutils/base/IntSerializer.java | 7 ++++- .../typeutils/base/IntValueSerializer.java | 7 ++++- .../common/typeutils/base/LongSerializer.java | 7 ++++- .../typeutils/base/LongValueSerializer.java | 7 ++++- .../typeutils/base/ShortSerializer.java | 5 ++++ .../typeutils/base/ShortValueSerializer.java | 7 ++++- .../typeutils/base/StringSerializer.java | 9 ++++-- .../typeutils/base/StringValueSerializer.java | 7 ++++- .../BooleanPrimitiveArraySerializer.java | 12 +++++--- .../array/BytePrimitiveArraySerializer.java | 11 +++++-- .../array/CharPrimitiveArraySerializer.java | 12 +++++--- .../array/DoublePrimitiveArraySerializer.java | 12 +++++--- .../array/FloatPrimitiveArraySerializer.java | 12 +++++--- .../array/IntPrimitiveArraySerializer.java | 12 +++++--- .../array/LongPrimitiveArraySerializer.java | 12 +++++--- .../array/ShortPrimitiveArraySerializer.java | 12 +++++--- .../base/array/StringArraySerializer.java | 11 ++++--- .../typeutils/record/RecordSerializer.java | 7 ++++- .../typeutils/runtime/AvroSerializer.java | 7 ++++- .../runtime/CopyableValueSerializer.java | 5 ++++ .../typeutils/runtime/KryoSerializer.java | 29 +++++++++---------- .../typeutils/runtime/PojoSerializer.java | 15 ++++++++-- .../runtime/TupleSerializerBase.java | 12 +++++++- .../typeutils/runtime/ValueSerializer.java | 7 ++++- .../typeutils/runtime/WritableSerializer.java | 7 ++++- .../testutils/types/IntListSerializer.java | 7 ++++- .../testutils/types/IntPairSerializer.java | 5 ++++ .../testutils/types/StringPairSerializer.java | 7 ++++- .../VertexWithAdjacencyListSerializer.java | 7 ++++- .../VertexWithRankAndDanglingSerializer.java | 7 ++++- .../types/VertexWithRankSerializer.java | 7 ++++- .../scala/io/CollectionInputFormatTest.scala | 2 +- 45 files changed, 296 insertions(+), 85 deletions(-) diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 85faa9e2f3cc2..37ad6f75aeb21 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -51,6 +51,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return typeSerializer.canCreateInstance(); + } + @Override public StreamRecord createInstance() { try { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index 5e32c86cf403e..24e4fbbf27a12 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -59,8 +59,14 @@ public abstract class TypeSerializer implements Serializable { * @return True, if the serializer is stateful, false if it is stateless; */ public abstract boolean isStateful(); - - + + + /** + * Gets whether an instance of the underlying type can be created by this serializer + * @return + */ + public abstract boolean canCreateInstance(); + // -------------------------------------------------------------------------------------------- // Instantiation & Cloning // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java index ecfb3c2ef1a3a..c9976a02052e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java @@ -40,7 +40,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Boolean createInstance() { return FALSE; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java index 47950551c08b5..f6c14baddd897 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public BooleanValue createInstance() { return new BooleanValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java index 32f3edd805914..7cd972d7f4023 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java @@ -42,7 +42,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Byte createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java index 24cc98b497695..ed6997ef9ecfb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public ByteValue createInstance() { return new ByteValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java index c46d3a02318a5..4051d9435d46c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java @@ -42,7 +42,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Character createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java index 71a8ef4aafb9a..1c5cf411e255b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java @@ -40,7 +40,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public CharValue createInstance() { return new CharValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java index 8e09f7cf3ead5..6c98ec5fcb024 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java @@ -42,6 +42,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Double createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java index f4c7f3782dedc..f533f946d2d25 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public DoubleValue createInstance() { return new DoubleValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java index b1a46b0a3fafc..90dac31fb5774 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java @@ -42,6 +42,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Float createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java index 6ebb268735a72..d552b8a0ce550 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public FloatValue createInstance() { return new FloatValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index 9d616e2318ea7..ae0417fc7bfc9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -62,7 +62,12 @@ public boolean isStateful() { return this.componentSerializer.isStateful(); } - + @Override + public boolean canCreateInstance() { + return true; + } + + @Override public C[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java index 2937b2ad8d915..dc88a99e1a6a7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java @@ -42,7 +42,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Integer createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java index ec1f345361194..d9b67d565d5ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public IntValue createInstance() { return new IntValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java index 6b25596ced541..2a63072ca50ac 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java @@ -42,7 +42,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Long createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java index 95caf04f76b3c..4cfe017f761a7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public LongValue createInstance() { return new LongValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java index c6e7870cb81c4..533bff12226be 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java @@ -43,6 +43,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Short createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java index ab58987021427..5241a50bbe48c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java @@ -41,7 +41,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public ShortValue createInstance() { return new ShortValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java index 71221a20f8cf8..aa29fa07acb81 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java @@ -31,8 +31,6 @@ public final class StringSerializer extends TypeSerializerSingleton { public static final StringSerializer INSTANCE = new StringSerializer(); - private static final String EMPTY = ""; - @Override public boolean isImmutableType() { return true; @@ -43,9 +41,14 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return false; + } + @Override public String createInstance() { - return EMPTY; + throw new UnsupportedOperationException("StringSerializer cannot create an instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java index c5d5b55a0b11b..9bb8304157b74 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java @@ -43,7 +43,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public StringValue createInstance() { return new StringValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java index e9941a88c753c..568bea3f1ac89 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet private static final long serialVersionUID = 1L; - private static final boolean[] EMPTY = new boolean[0]; - public static final BooleanPrimitiveArraySerializer INSTANCE = new BooleanPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public boolean[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("BooleanPrimitiveArraySerializer cannot create an" + + " instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java index aaf867f2d08c7..249ac78016968 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java @@ -31,7 +31,6 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton< private static final long serialVersionUID = 1L; - private static final byte[] EMPTY = new byte[0]; public static final BytePrimitiveArraySerializer INSTANCE = new BytePrimitiveArraySerializer(); @@ -44,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public byte[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("BytePrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java index 64632bd3d7673..c0e6042311703 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton< private static final long serialVersionUID = 1L; - private static final char[] EMPTY = new char[0]; - public static final CharPrimitiveArraySerializer INSTANCE = new CharPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public char[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("CharPrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java index 846ae748b24e5..a02116e7c8a25 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto private static final long serialVersionUID = 1L; - private static final double[] EMPTY = new double[0]; - public static final DoublePrimitiveArraySerializer INSTANCE = new DoublePrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public double[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("DoublePrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java index 8f42ac8b6f92a..da83607bad6f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton private static final long serialVersionUID = 1L; - private static final float[] EMPTY = new float[0]; - public static final FloatPrimitiveArraySerializer INSTANCE = new FloatPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public float[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("FloatPrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java index 2ab056cb321c9..c94c49018f8e4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton{ private static final long serialVersionUID = 1L; - private static final int[] EMPTY = new int[0]; - public static final IntPrimitiveArraySerializer INSTANCE = new IntPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public int[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("IntPrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java index 5d34dfe37d483..c6435c3893318 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton< private static final long serialVersionUID = 1L; - private static final long[] EMPTY = new long[0]; - public static final LongPrimitiveArraySerializer INSTANCE = new LongPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public long[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("LongPrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java index 2f3703338b0f3..4aa4aa94b9424 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java @@ -31,8 +31,6 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton private static final long serialVersionUID = 1L; - private static final short[] EMPTY = new short[0]; - public static final ShortPrimitiveArraySerializer INSTANCE = new ShortPrimitiveArraySerializer(); @@ -45,10 +43,16 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return false; + } + @Override public short[] createInstance() { - return EMPTY; + throw new UnsupportedOperationException("ShortPrimitiveArraySerializer cannot create an " + + "instance."); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java index d5ab030e12c67..bc2c5340aa827 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java @@ -33,8 +33,6 @@ public final class StringArraySerializer extends TypeSerializerSingleton extends TypeSerializer { private static final long serialVersionUID = 1L; private final Class type; - private final Class typeToInstantiate; private transient Kryo kryo; - private transient T copyInstance; - + private transient DataOutputView previousOut; private transient DataInputView previousIn; @@ -44,19 +42,18 @@ public class KryoSerializer extends TypeSerializer { private transient Output output; public KryoSerializer(Class type){ - this(type,type); - } - - public KryoSerializer(Class type, Class typeToInstantiate){ - if(type == null || typeToInstantiate == null){ + if(type == null){ throw new NullPointerException("Type class cannot be null."); } this.type = type; +<<<<<<< HEAD this.typeToInstantiate = typeToInstantiate; kryo = new Kryo(); kryo.setAsmEnabled(true); kryo.register(type); +======= +>>>>>>> 77468f0... Added canCreateInstance method to type serializers. } @Override @@ -69,10 +66,13 @@ public boolean isStateful() { return true; } + @Override + public boolean canCreateInstance() { return false; } + @Override public T createInstance() { - checkKryoInitialized(); - return kryo.newInstance(typeToInstantiate); + throw new UnsupportedOperationException("The KryoSerializer cannot create instances " + + "because its type parameter might be an interface or abstract."); } @Override @@ -124,11 +124,8 @@ public T deserialize(T reuse, DataInputView source) throws IOException { @Override public void copy(DataInputView source, DataOutputView target) throws IOException { checkKryoInitialized(); - if(this.copyInstance == null){ - this.copyInstance = createInstance(); - } - T tmp = deserialize(copyInstance, source); + T tmp = deserialize(source); serialize(tmp, target); } @@ -136,14 +133,14 @@ public void copy(DataInputView source, DataOutputView target) throws IOException @Override public int hashCode() { - return type.hashCode() + 31 * typeToInstantiate.hashCode(); + return type.hashCode(); } @Override public boolean equals(Object obj) { if (obj != null && obj instanceof KryoSerializer) { KryoSerializer other = (KryoSerializer) obj; - return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate; + return other.type == this.type; } else { return false; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 99b9f6551271a..9229f7d2a67a3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -44,6 +44,8 @@ public final class PojoSerializer extends TypeSerializer { private final boolean stateful; + private final boolean canCreateInstance; + @SuppressWarnings("unchecked") public PojoSerializer(Class clazz, TypeSerializer[] fieldSerializers, Field[] fields) { @@ -57,13 +59,16 @@ public PojoSerializer(Class clazz, TypeSerializer[] fieldSerializers, Fiel } boolean stateful = false; + boolean canCreateInstance = true; for (TypeSerializer ser : fieldSerializers) { if (ser.isStateful()) { stateful = true; - break; } + + canCreateInstance &= ser.canCreateInstance(); } this.stateful = stateful; + this.canCreateInstance = canCreateInstance; } private void writeObject(ObjectOutputStream out) @@ -112,8 +117,12 @@ public boolean isImmutableType() { public boolean isStateful() { return this.stateful; } - - + + @Override + public boolean canCreateInstance() { + return canCreateInstance; + } + @Override public T createInstance() { try { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index 08df7d3c0a755..5062313562a4d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -38,6 +38,8 @@ public abstract class TupleSerializerBase extends TypeSerializer { protected final boolean stateful; + protected final boolean canCreateInstance; + @SuppressWarnings("unchecked") public TupleSerializerBase(Class tupleClass, TypeSerializer[] fieldSerializers) { @@ -46,13 +48,16 @@ public TupleSerializerBase(Class tupleClass, TypeSerializer[] fieldSeriali this.arity = fieldSerializers.length; boolean stateful = false; + boolean canCreateInstance = true; for (TypeSerializer ser : fieldSerializers) { if (ser.isStateful()) { stateful = true; - break; } + + canCreateInstance &= ser.canCreateInstance(); } this.stateful = stateful; + this.canCreateInstance = canCreateInstance; } @@ -66,6 +71,11 @@ public boolean isStateful() { return this.stateful; } + @Override + public boolean canCreateInstance(){ + return canCreateInstance; + } + @Override public int getLength() { return -1; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java index d6c35cbffa8cc..a4a5aefe5c730 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java @@ -65,7 +65,12 @@ public boolean isImmutableType() { public boolean isStateful() { return true; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public T createInstance() { return InstantiationUtil.instantiate(this.type); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index 0fe8fdf3d9f76..031f456ad6cc3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -96,7 +96,12 @@ public boolean isImmutableType() { public boolean isStateful() { return true; } - + + @Override + public boolean canCreateInstance() { + return true; + } + // -------------------------------------------------------------------------------------------- private void ensureInstanceInstantiated() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index 2134bcd134825..634d6ce31f99d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -38,7 +38,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public IntList createInstance() { return new IntList(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index 361585d2fd8ff..dae218364da2f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -42,6 +42,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return true; + } + @Override public IntPair createInstance() { return new IntPair(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index a38633cc40c4d..6b7ef2e369f18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -38,7 +38,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public StringPair createInstance() { return new StringPair(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java index 822b4f23a60ec..0d24f7bd0c2aa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java @@ -38,7 +38,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public VertexWithAdjacencyList createInstance() { return new VertexWithAdjacencyList(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java index e972cd1b82901..acb935cbaaff6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java @@ -38,7 +38,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public VertexWithRankAndDangling createInstance() { return new VertexWithRankAndDangling(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java index 928d4f41f9783..f6538bd12baf9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java @@ -38,7 +38,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public VertexWithRank createInstance() { return new VertexWithRank(); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala index a81dc45615f39..dffbfec88213d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala @@ -31,7 +31,7 @@ import java.io.ObjectOutputStream import org.apache.flink.api.scala._ import scala.collection.JavaConverters._ -class ElementType(val id: Int) { +class ElementType(var id: Int) { def this() { this(-1) } From cb075ffac35fd558685354c740f210bde020d478 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 26 Nov 2014 21:08:01 +0100 Subject: [PATCH 2/2] Update remaining serializers --- .../common/typeutils/base/DateSerializer.java | 7 ++++- .../common/typeutils/base/EnumSerializer.java | 7 ++++- .../typeutils/runtime/KryoSerializer.java | 29 ++++++++++++------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java index 4bd2ea8e7799c..b5314b54e3e28 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java @@ -40,7 +40,12 @@ public boolean isImmutableType() { public boolean isStateful() { return false; } - + + @Override + public boolean canCreateInstance() { + return true; + } + @Override public Date createInstance() { return new Date(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java index a99fbf54725a8..ce29948097785 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java @@ -50,6 +50,11 @@ public boolean isStateful() { return false; } + @Override + public boolean canCreateInstance() { + return true; + } + @Override public T createInstance() { return values[0]; @@ -93,7 +98,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException @Override public boolean equals(Object obj) { if(obj instanceof EnumSerializer) { - EnumSerializer other = (EnumSerializer) obj; + EnumSerializer other = (EnumSerializer) obj; return other.enumClass == this.enumClass; } else { return false; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index c405f81b3c45e..9da082b6f9cfc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -32,9 +32,11 @@ public class KryoSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; private final Class type; + private final Class typeToInstantiate; private transient Kryo kryo; - + private transient T copyInstance; + private transient DataOutputView previousOut; private transient DataInputView previousIn; @@ -42,18 +44,19 @@ public class KryoSerializer extends TypeSerializer { private transient Output output; public KryoSerializer(Class type){ - if(type == null){ + this(type,type); + } + + public KryoSerializer(Class type, Class typeToInstantiate){ + if(type == null || typeToInstantiate == null){ throw new NullPointerException("Type class cannot be null."); } this.type = type; -<<<<<<< HEAD this.typeToInstantiate = typeToInstantiate; kryo = new Kryo(); kryo.setAsmEnabled(true); kryo.register(type); -======= ->>>>>>> 77468f0... Added canCreateInstance method to type serializers. } @Override @@ -67,12 +70,13 @@ public boolean isStateful() { } @Override - public boolean canCreateInstance() { return false; } + public boolean canCreateInstance() { + return false; + } @Override public T createInstance() { - throw new UnsupportedOperationException("The KryoSerializer cannot create instances " + - "because its type parameter might be an interface or abstract."); + throw new UnsupportedOperationException("KryoSerializer cannot create an instance."); } @Override @@ -124,8 +128,11 @@ public T deserialize(T reuse, DataInputView source) throws IOException { @Override public void copy(DataInputView source, DataOutputView target) throws IOException { checkKryoInitialized(); + if(this.copyInstance == null){ + this.copyInstance = createInstance(); + } - T tmp = deserialize(source); + T tmp = deserialize(copyInstance, source); serialize(tmp, target); } @@ -133,14 +140,14 @@ public void copy(DataInputView source, DataOutputView target) throws IOException @Override public int hashCode() { - return type.hashCode(); + return type.hashCode() + 31 * typeToInstantiate.hashCode(); } @Override public boolean equals(Object obj) { if (obj != null && obj instanceof KryoSerializer) { KryoSerializer other = (KryoSerializer) obj; - return other.type == this.type; + return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate; } else { return false; }