From 924b02afd7b7ae519dbadfbff88354689ef728ce Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Thu, 9 Oct 2014 15:04:20 +0200 Subject: [PATCH] [FLINK-1151] Adds statistics for collection data sources based on collection size and serializer info. Adds getMinimumLength() to TypeSerializer to get a lower bound for size estimations. --- .../streamrecord/StreamRecordSerializer.java | 5 +++ .../api/common/typeutils/TypeSerializer.java | 8 +++++ .../typeutils/base/BooleanSerializer.java | 5 +++ .../base/BooleanValueSerializer.java | 5 +++ .../common/typeutils/base/ByteSerializer.java | 5 +++ .../typeutils/base/ByteValueSerializer.java | 5 +++ .../common/typeutils/base/CharSerializer.java | 5 +++ .../typeutils/base/CharValueSerializer.java | 5 +++ .../typeutils/base/DoubleSerializer.java | 5 +++ .../typeutils/base/DoubleValueSerializer.java | 5 +++ .../typeutils/base/FloatSerializer.java | 5 +++ .../typeutils/base/FloatValueSerializer.java | 5 +++ .../base/GenericArraySerializer.java | 5 +++ .../common/typeutils/base/IntSerializer.java | 5 +++ .../typeutils/base/IntValueSerializer.java | 5 +++ .../common/typeutils/base/LongSerializer.java | 5 +++ .../typeutils/base/LongValueSerializer.java | 5 +++ .../typeutils/base/ShortSerializer.java | 5 +++ .../typeutils/base/ShortValueSerializer.java | 5 +++ .../typeutils/base/StringSerializer.java | 5 +++ .../typeutils/base/StringValueSerializer.java | 5 +++ .../BooleanPrimitiveArraySerializer.java | 4 +++ .../array/BytePrimitiveArraySerializer.java | 4 +++ .../array/CharPrimitiveArraySerializer.java | 4 +++ .../array/DoublePrimitiveArraySerializer.java | 4 +++ .../array/FloatPrimitiveArraySerializer.java | 4 +++ .../array/IntPrimitiveArraySerializer.java | 4 +++ .../array/LongPrimitiveArraySerializer.java | 4 +++ .../array/ShortPrimitiveArraySerializer.java | 4 +++ .../base/array/StringArraySerializer.java | 5 +++ .../typeutils/record/RecordSerializer.java | 5 +++ .../api/java/io/CollectionInputFormat.java | 34 +++++++++++++++++++ .../typeutils/runtime/AvroSerializer.java | 5 +++ .../runtime/CopyableValueSerializer.java | 5 +++ .../typeutils/runtime/KryoSerializer.java | 5 +++ .../typeutils/runtime/PojoSerializer.java | 8 +++++ .../typeutils/runtime/TupleSerializer.java | 9 +++++ .../runtime/TupleSerializerBase.java | 2 +- .../typeutils/runtime/ValueSerializer.java | 5 +++ .../typeutils/runtime/WritableSerializer.java | 5 +++ .../testutils/types/IntListSerializer.java | 5 +++ .../testutils/types/IntPairSerializer.java | 5 +++ .../testutils/types/StringPairSerializer.java | 5 +++ .../scala/typeutils/CaseClassSerializer.scala | 8 +++++ .../VertexWithAdjacencyListSerializer.java | 5 +++ .../VertexWithRankAndDanglingSerializer.java | 5 +++ .../types/VertexWithRankSerializer.java | 5 +++ 47 files changed, 265 insertions(+), 1 deletion(-) diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 85faa9e2f3cc2..81e635ac0868c 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -84,6 +84,11 @@ public StreamRecord copy(StreamRecord from, StreamRecord reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 20 + typeSerializer.getMinimumLength(); + } @Override public void serialize(StreamRecord value, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index 87d7e208aa8cf..4d19afb9f5371 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -100,6 +100,14 @@ public abstract class TypeSerializer implements Serializable { */ public abstract int getLength(); + /** + * Gets the minimum length of the data type. + * The minimum length might be used as a lower bound for size estimations. + * + * @return The minimum length of the data type. + */ + public abstract int getMinimumLength(); + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java index ecfb3c2ef1a3a..e611f7fcfbe1f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java @@ -60,6 +60,11 @@ public Boolean copy(Boolean from, Boolean reuse) { public int getLength() { return 1; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Boolean record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java index 47950551c08b5..ac5b55e57da70 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java @@ -64,6 +64,11 @@ public BooleanValue copy(BooleanValue from, BooleanValue reuse) { public int getLength() { return 1; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(BooleanValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java index 32f3edd805914..6f35299aea6ea 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java @@ -62,6 +62,11 @@ public Byte copy(Byte from, Byte reuse) { public int getLength() { return 1; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Byte record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java index 24cc98b497695..e7a1ab6904987 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java @@ -62,6 +62,11 @@ public ByteValue copy(ByteValue from, ByteValue reuse) { public int getLength() { return 1; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(ByteValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java index c46d3a02318a5..1127ae7af9057 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java @@ -62,6 +62,11 @@ public Character copy(Character from, Character reuse) { public int getLength() { return 2; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Character record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java index 71a8ef4aafb9a..b721adae889f8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java @@ -61,6 +61,11 @@ public CharValue copy(CharValue from, CharValue reuse) { public int getLength() { return 2; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(CharValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java index 8e09f7cf3ead5..747a04db34c6a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java @@ -61,6 +61,11 @@ public Double copy(Double from, Double reuse) { public int getLength() { return 8; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Double record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java index f4c7f3782dedc..39aa967e35185 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java @@ -63,6 +63,11 @@ public int getLength() { return 8; } + @Override + public int getMinimumLength() { + return getLength(); + } + @Override public void serialize(DoubleValue record, DataOutputView target) throws IOException { record.write(target); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java index b1a46b0a3fafc..ba4e957634eca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java @@ -62,6 +62,11 @@ public int getLength() { return 4; } + @Override + public int getMinimumLength() { + return getLength(); + } + @Override public void serialize(Float record, DataOutputView target) throws IOException { target.writeFloat(record.floatValue()); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java index 6ebb268735a72..538f96a93d5f0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java @@ -62,6 +62,11 @@ public FloatValue copy(FloatValue from, FloatValue reuse) { public int getLength() { return 4; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(FloatValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index 504b41b1bf7c8..dcb322b53493f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -88,6 +88,11 @@ public C[] copy(C[] from, C[] reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(C[] value, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java index 2937b2ad8d915..393d226778b16 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java @@ -62,6 +62,11 @@ public Integer copy(Integer from, Integer reuse) { public int getLength() { return 4; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Integer record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java index ec1f345361194..fdcd450e2c0f0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java @@ -62,6 +62,11 @@ public IntValue copy(IntValue from, IntValue reuse) { public int getLength() { return 4; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(IntValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java index 6b25596ced541..b95eadd86519c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java @@ -62,6 +62,11 @@ public Long copy(Long from, Long reuse) { public int getLength() { return 8; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Long record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java index 95caf04f76b3c..9fa406372e8ca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java @@ -62,6 +62,11 @@ public LongValue copy(LongValue from, LongValue reuse) { public int getLength() { return 8; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(LongValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java index c6e7870cb81c4..fc4098d3d15d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java @@ -62,6 +62,11 @@ public Short copy(Short from, Short reuse) { public int getLength() { return 2; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(Short record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java index ab58987021427..496716e57234b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java @@ -62,6 +62,11 @@ public ShortValue copy(ShortValue from, ShortValue reuse) { public int getLength() { return 2; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(ShortValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java index 71221a20f8cf8..9f71e66b95412 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java @@ -62,6 +62,11 @@ public String copy(String from, String reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 1; + } @Override public void serialize(String record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java index c5d5b55a0b11b..281deb41be5fb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java @@ -64,6 +64,11 @@ public StringValue copy(StringValue from, StringValue reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 1; + } @Override public void serialize(StringValue record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java index e9941a88c753c..148f83dee637c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(boolean[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java index aaf867f2d08c7..38039f15e84f7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java @@ -67,6 +67,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(byte[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java index 64632bd3d7673..1d52f0a2cb95a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(char[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java index 846ae748b24e5..f3c56c16496c4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(double[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java index 8f42ac8b6f92a..35e4f28a993f1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(float[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java index 2ab056cb321c9..44ac3bbebd244 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(int[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java index 5d34dfe37d483..fc466b6172739 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(long[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java index 2f3703338b0f3..27ce30b2583a3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java @@ -68,6 +68,10 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } @Override public void serialize(short[] record, DataOutputView target) throws IOException { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java index d5ab030e12c67..02561a2e09248 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java @@ -70,6 +70,11 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 4; + } + @Override public void serialize(String[] record, DataOutputView target) throws IOException { if (record == null) { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java index 7b72e894b9d9f..40b92e0acfb60 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java @@ -82,6 +82,11 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 1; + } + // -------------------------------------------------------------------------------------------- @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java index 77ba66613dfe3..0ab8192877b6b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.memory.InputViewObjectInputStreamWrapper; @@ -49,6 +50,7 @@ public class CollectionInputFormat extends GenericInputFormat implements N public CollectionInputFormat(Collection dataSet, TypeSerializer serializer) { + if (dataSet == null) { throw new NullPointerException(); } @@ -75,6 +77,11 @@ public void open(GenericInputSplit split) throws IOException { public T nextRecord(T record) throws IOException { return this.iterator.next(); } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return new CollectionStatistics(this.dataSet.size(), this.serializer.getMinimumLength()); + } // -------------------------------------------------------------------------------------------- @@ -128,4 +135,31 @@ public static void checkCollection(Collection elements, Class viewedAs } } } + + public static class CollectionStatistics implements BaseStatistics { + + private final long numRecords; + private final float recordWidth; + + public CollectionStatistics(long numRecords, float recordWidth) { + this.numRecords = numRecords; + this.recordWidth = recordWidth; + } + + @Override + public long getTotalInputSize() { + return (long)this.recordWidth * this.numRecords; + } + + @Override + public long getNumberOfRecords() { + return this.numRecords; + } + + @Override + public float getAverageRecordWidth() { + return recordWidth; + } + + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index 31a04c9ebf873..30ca27991dbfc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -104,6 +104,11 @@ public T copy(T from, T reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return Math.max(reader.getSchema().getFixedSize(), 1); + } @Override public void serialize(T value, DataOutputView target) throws IOException { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java index 9d12d7e6e7968..52ddad28ef23e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java @@ -72,6 +72,11 @@ public int getLength() { ensureInstanceInstantiated(); return instance.getBinaryLength(); } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(T value, DataOutputView target) throws IOException { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index a3acb2073bbcd..c314041407d4b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -91,6 +91,11 @@ public T copy(T from, T reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 1; + } @Override public void serialize(T record, DataOutputView target) throws IOException { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 99b9f6551271a..c311a7d2a31c7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -171,6 +171,14 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + int minLen = 0; + for(TypeSerializer s : this.fieldSerializers) { + minLen += s.getMinimumLength(); + } + return minLen; + } @Override public void serialize(T value, DataOutputView target) throws IOException { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index ae429a773fb8b..69bec5f1a544a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -118,6 +118,15 @@ public T deserialize(T reuse, DataInputView source) throws IOException { return reuse; } + @Override + public int getMinimumLength() { + int minLen = 0; + for(TypeSerializer s : this.fieldSerializers) { + minLen += s.getMinimumLength(); + } + return minLen; + } + private T instantiateRaw() { try { return tupleClass.newInstance(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index 08df7d3c0a755..c8abd49d7a3c6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -70,7 +70,7 @@ public boolean isStateful() { public int getLength() { return -1; } - + public int getArity() { return arity; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java index 69a5ff62ab1e3..1ce4de0be8ceb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java @@ -87,6 +87,11 @@ public T copy(T from, T reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 1; + } @Override public void serialize(T value, DataOutputView target) throws IOException { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index d5a0470368d04..6ab3a189ab6c1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -64,6 +64,11 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 1; + } + @Override public void serialize(T record, DataOutputView target) throws IOException { record.write(target); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index 2134bcd134825..3e3128894dc96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -70,6 +70,11 @@ public int getLength() { return -1; } + @Override + public int getMinimumLength() { + return 8; + } + @Override public void serialize(IntList record, DataOutputView target) throws IOException { target.writeInt(record.getKey()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index 361585d2fd8ff..5b1267a38e530 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -64,6 +64,11 @@ public IntPair copy(IntPair from, IntPair reuse) { public int getLength() { return 8; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(IntPair record, DataOutputView target) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index a38633cc40c4d..10155a717585e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -59,6 +59,11 @@ public StringPair copy(StringPair from, StringPair reuse) { public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 2; + } @Override public void serialize(StringPair record, DataOutputView target) throws IOException { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index f9cd10c9037e7..ae01ac8b23a36 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -83,4 +83,12 @@ abstract class CaseClassSerializer[T <: Product]( fields = new Array[AnyRef](arity) } } + + def getMinimumLength(): Int = { + var minLen = 0; + for(s <- fieldSerializers) { + minLen += s.getMinimumLength() + } + minLen + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java index 822b4f23a60ec..2df8cfee7f613 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java @@ -68,6 +68,11 @@ public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdja public int getLength() { return -1; } + + @Override + public int getMinimumLength() { + return 12; + } @Override public void serialize(VertexWithAdjacencyList record, DataOutputView target) throws IOException { diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java index e972cd1b82901..10d27818f0276 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java @@ -61,6 +61,11 @@ public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWith public int getLength() { return 17; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(VertexWithRankAndDangling record, DataOutputView target) throws IOException { diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java index 928d4f41f9783..2e7b3333cb80c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java @@ -60,6 +60,11 @@ public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) { public int getLength() { return 16; } + + @Override + public int getMinimumLength() { + return getLength(); + } @Override public void serialize(VertexWithRank record, DataOutputView target) throws IOException {