From f0342685a8c34bc23e60cb67296863d3cac88518 Mon Sep 17 00:00:00 2001 From: mjsax Date: Sun, 2 Aug 2015 21:38:31 +0200 Subject: [PATCH] [FLINK-2457] Integrate Tuple0 - extended TupleType helper classes to handle Tuple0 - extended TupleSerializer to handle Tuple0 - included Tuple0 into JUnit tests - simplified Receiver.createTuple(int) --- .../apache/flink/api/java/tuple/Tuple.java | 6 +- .../apache/flink/api/java/tuple/Tuple0.java | 60 +++++++++++- .../flink/api/java/tuple/TupleGenerator.java | 11 +-- .../api/java/tuple/builder/Tuple0Builder.java | 46 ++++++++++ .../java/tuple/builder/Tuple10Builder.java | 4 +- .../java/tuple/builder/Tuple11Builder.java | 4 +- .../java/tuple/builder/Tuple12Builder.java | 4 +- .../java/tuple/builder/Tuple13Builder.java | 4 +- .../java/tuple/builder/Tuple14Builder.java | 4 +- .../java/tuple/builder/Tuple15Builder.java | 4 +- .../java/tuple/builder/Tuple16Builder.java | 4 +- .../java/tuple/builder/Tuple17Builder.java | 4 +- .../java/tuple/builder/Tuple18Builder.java | 4 +- .../java/tuple/builder/Tuple19Builder.java | 4 +- .../api/java/tuple/builder/Tuple1Builder.java | 4 +- .../java/tuple/builder/Tuple20Builder.java | 4 +- .../java/tuple/builder/Tuple21Builder.java | 4 +- .../java/tuple/builder/Tuple22Builder.java | 4 +- .../java/tuple/builder/Tuple23Builder.java | 4 +- .../java/tuple/builder/Tuple24Builder.java | 4 +- .../java/tuple/builder/Tuple25Builder.java | 4 +- .../api/java/tuple/builder/Tuple2Builder.java | 4 +- .../api/java/tuple/builder/Tuple3Builder.java | 4 +- .../api/java/tuple/builder/Tuple4Builder.java | 4 +- .../api/java/tuple/builder/Tuple5Builder.java | 4 +- .../api/java/tuple/builder/Tuple6Builder.java | 4 +- .../api/java/tuple/builder/Tuple7Builder.java | 4 +- .../api/java/tuple/builder/Tuple8Builder.java | 4 +- .../api/java/tuple/builder/Tuple9Builder.java | 4 +- .../api/java/typeutils/TupleTypeInfo.java | 8 +- .../api/java/typeutils/TupleTypeInfoBase.java | 18 ++-- .../api/java/typeutils/TypeExtractor.java | 22 ++++- .../api/java/typeutils/TypeInfoParser.java | 35 ++++--- .../typeutils/runtime/Tuple0Serializer.java | 91 +++++++++++++++++++ .../typeutils/runtime/TupleSerializer.java | 2 +- .../type/extractor/TypeExtractorTest.java | 22 +++++ .../java/typeutils/TypeInfoParserTest.java | 8 ++ .../runtime/TupleSerializerTest.java | 16 +++- .../api/java/common/streaming/Receiver.java | 70 +++----------- 39 files changed, 363 insertions(+), 152 deletions(-) create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple0Builder.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java index 145d21511e433..3b07aed9312ae 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java @@ -93,10 +93,10 @@ public T getFieldNotNull(int pos){ */ @SuppressWarnings("unchecked") public static Class getTupleClass(int arity) { - if (arity < 1 || arity > MAX_ARITY) { + if (arity < 0 || arity > MAX_ARITY) { throw new IllegalArgumentException("The tuple arity must be in [0, " + MAX_ARITY + "]."); } - return (Class) CLASSES[arity - 1]; + return (Class) CLASSES[arity]; } // -------------------------------------------------------------------------------------------- @@ -106,7 +106,7 @@ public static Class getTupleClass(int arity) { // BEGIN_OF_TUPLE_DEPENDENT_CODE // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. private static final Class[] CLASSES = new Class[] { - Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class + Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class }; // END_OF_TUPLE_DEPENDENT_CODE } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java index 82209bf3db822..c2db0bcbbcb6e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java @@ -17,23 +17,75 @@ */ package org.apache.flink.api.java.tuple; -public class Tuple0 extends Tuple { +import java.io.ObjectStreamException; +/** + * A tuple with 0 fields. + *

+ * {@code Tuple0} is a singleton. + * + * @see Tuple + */ +public class Tuple0 extends Tuple { private static final long serialVersionUID = 1L; - public Tuple0() {} + public static final Tuple0 instance = new Tuple0(); + + @Override + public int getArity() { + return 0; + } @Override public T getField(int pos) { - return null; + throw new IndexOutOfBoundsException(String.valueOf(pos)); } @Override public void setField(T value, int pos) { + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + + // ------------------------------------------------------------------------------------------------- + // standard utilities + // ------------------------------------------------------------------------------------------------- + + /** + * Creates a string representation of the tuple in the form "()" + * + * @return The string representation of the tuple. + */ + @Override + public String toString() { + return "()"; } + /** + * Deep equality for tuples by calling equals() on the tuple members + * + * @param o + * the object checked for equality + * @return true if this is equal to o. + */ @Override - public int getArity() { + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Tuple0)) { + return false; + } + return true; + } + + @Override + public int hashCode() { return 0; } + + // singleton deserialization + private Object readResolve() throws ObjectStreamException { + return instance; + } + } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index a87fa0a7997ba..6ab02e4535fc9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -402,12 +402,9 @@ private static void modifyJoinProjectOperator(File root) throws IOException { private static void modifyTupleType(File root) throws IOException { // generate code StringBuilder sb = new StringBuilder(); - sb.append("\tprivate static final Class[] CLASSES = new Class[] {\n\t\t"); + sb.append("\tprivate static final Class[] CLASSES = new Class[] {\n\t\tTuple0.class"); for (int i = FIRST; i <= LAST; i++) { - if (i > FIRST) { - sb.append(", "); - } - sb.append("Tuple" + i + ".class"); + sb.append(", Tuple" + i + ".class"); } sb.append("\n\t};"); @@ -802,7 +799,7 @@ private static void writeTupleBuilderClass(PrintWriter w, int numFields) { // package and imports w.println("package " + PACKAGE + "." + BUILDER_SUFFIX + ';'); w.println(); - w.println("import java.util.LinkedList;"); + w.println("import java.util.ArrayList;"); w.println("import java.util.List;"); w.println(); w.println("import " + PACKAGE + ".Tuple" + numFields + ";"); @@ -817,7 +814,7 @@ private static void writeTupleBuilderClass(PrintWriter w, int numFields) { // Class-Attributes - a list of tuples w.print("\tprivate List tuples = new LinkedList tuples = new ArrayList();"); w.println(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple0Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple0Builder.java new file mode 100644 index 0000000000000..2a1546e0e95f7 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple0Builder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// -------------------------------------------------------------- +// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT! +// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. +// -------------------------------------------------------------- + + +package org.apache.flink.api.java.tuple.builder; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple0; + +public class Tuple0Builder { + + private List tuples = new ArrayList(); + + public Tuple0Builder add() { + tuples.add(Tuple0.instance); + return this; + } + + public Tuple0[] build() { + return tuples.toArray(new Tuple0[tuples.size()]); + } + +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple10Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple10Builder.java index 79e30a54fab36..d09ba11bbb281 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple10Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple10Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple10; public class Tuple10Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple10Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9){ tuples.add(new Tuple10(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple11Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple11Builder.java index d1a733ac1620e..727fab0a884fd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple11Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple11Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple11; public class Tuple11Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple11Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10){ tuples.add(new Tuple11(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple12Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple12Builder.java index bbc8967948a89..7f77a8d6d24ab 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple12Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple12Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple12; public class Tuple12Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple12Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11){ tuples.add(new Tuple12(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple13Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple13Builder.java index 7f22f58736f87..e223e7a89e1bc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple13Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple13Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple13; public class Tuple13Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple13Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12){ tuples.add(new Tuple13(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple14Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple14Builder.java index a587f4569a21d..a3847c258de94 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple14Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple14Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple14; public class Tuple14Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple14Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13){ tuples.add(new Tuple14(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java index 8068389890de3..b423a3b8ad318 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple15Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple15; public class Tuple15Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple15Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14){ tuples.add(new Tuple15(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java index 478c36dc13298..c6987308863b3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple16Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple16; public class Tuple16Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple16Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15){ tuples.add(new Tuple16(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple17Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple17Builder.java index 764b3eb2fdb9a..bad64f237ccd9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple17Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple17Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple17; public class Tuple17Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple17Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16){ tuples.add(new Tuple17(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple18Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple18Builder.java index 6a830e233f5ca..14a79f81e160f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple18Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple18Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple18; public class Tuple18Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple18Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17){ tuples.add(new Tuple18(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple19Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple19Builder.java index 08d14ccb767f4..9acfc2f3fdc32 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple19Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple19Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple19; public class Tuple19Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple19Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18){ tuples.add(new Tuple19(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple1Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple1Builder.java index a8c2ce2c2c3cd..a395cd1815dfc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple1Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple1Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple1; public class Tuple1Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple1Builder add(T0 value0){ tuples.add(new Tuple1(value0)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple20Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple20Builder.java index 139292ed6cbda..8c7dd1d8213ee 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple20Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple20Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple20; public class Tuple20Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple20Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19){ tuples.add(new Tuple20(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple21Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple21Builder.java index e6481f8201ec0..ba18345840329 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple21Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple21Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple21; public class Tuple21Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple21Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20){ tuples.add(new Tuple21(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple22Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple22Builder.java index 0f4f939fd996d..a4903c8616968 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple22Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple22Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple22; public class Tuple22Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple22Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21){ tuples.add(new Tuple22(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple23Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple23Builder.java index 66e0313b49b8b..a7a8102b1dd6f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple23Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple23Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple23; public class Tuple23Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple23Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22){ tuples.add(new Tuple23(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple24Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple24Builder.java index 9aac48e21c611..650951872d4d8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple24Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple24Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple24; public class Tuple24Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple24Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22, T23 value23){ tuples.add(new Tuple24(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple25Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple25Builder.java index 087b8cc6b697c..3632eec51c36f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple25Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple25Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple25; public class Tuple25Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple25Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22, T23 value23, T24 value24){ tuples.add(new Tuple25(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23, value24)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple2Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple2Builder.java index b53327d4fe0b7..adf697fdd4aa5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple2Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple2Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple2; public class Tuple2Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple2Builder add(T0 value0, T1 value1){ tuples.add(new Tuple2(value0, value1)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple3Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple3Builder.java index e6948e93fce66..7a0dee00fbc87 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple3Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple3Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple3; public class Tuple3Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple3Builder add(T0 value0, T1 value1, T2 value2){ tuples.add(new Tuple3(value0, value1, value2)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple4Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple4Builder.java index cdb79c6d3927a..5caad15adf28c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple4Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple4Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple4; public class Tuple4Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple4Builder add(T0 value0, T1 value1, T2 value2, T3 value3){ tuples.add(new Tuple4(value0, value1, value2, value3)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple5Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple5Builder.java index c1b21a2aa71bc..1895acacd1cf0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple5Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple5Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple5; public class Tuple5Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple5Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4){ tuples.add(new Tuple5(value0, value1, value2, value3, value4)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple6Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple6Builder.java index 9334be14ee53f..625ec8f44e8a2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple6Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple6Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple6; public class Tuple6Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple6Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5){ tuples.add(new Tuple6(value0, value1, value2, value3, value4, value5)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple7Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple7Builder.java index ebc2166825900..2cdde39b2b736 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple7Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple7Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple7; public class Tuple7Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple7Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6){ tuples.add(new Tuple7(value0, value1, value2, value3, value4, value5, value6)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple8Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple8Builder.java index dd0860f9a62e0..c5feb436c3b51 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple8Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple8Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple8; public class Tuple8Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple8Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7){ tuples.add(new Tuple8(value0, value1, value2, value3, value4, value5, value6, value7)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple9Builder.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple9Builder.java index c6639852a74a0..2665d7d0a3748 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple9Builder.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/builder/Tuple9Builder.java @@ -25,14 +25,14 @@ package org.apache.flink.api.java.tuple.builder; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.tuple.Tuple9; public class Tuple9Builder { - private List> tuples = new LinkedList>(); + private List> tuples = new ArrayList>(); public Tuple9Builder add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8){ tuples.add(new Tuple9(value0, value1, value2, value3, value4, value5, value6, value7, value8)); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 0e3d61d45cc81..0e59b8e96a965 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer; //CHECKSTYLE.ON: AvoidStarImport import org.apache.flink.api.java.typeutils.runtime.TupleComparator; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; @@ -49,7 +50,7 @@ public TupleTypeInfo(TypeInformation... types) { public TupleTypeInfo(Class tupleType, TypeInformation... types) { super(tupleType, types); - if (types == null || types.length == 0 || types.length > Tuple.MAX_ARITY) { + if (types == null || types.length > Tuple.MAX_ARITY) { throw new IllegalArgumentException(); } this.fieldNames = new String[types.length]; @@ -72,8 +73,13 @@ public int getFieldIndex(String fieldName) { return fieldIndex; } + @SuppressWarnings("unchecked") @Override public TupleSerializer createSerializer(ExecutionConfig executionConfig) { + if (this.tupleType == Tuple0.class) { + return (TupleSerializer) Tuple0Serializer.getInstance(); + } + TypeSerializer[] fieldSerializers = new TypeSerializer[getArity()]; for (int i = 0; i < types.length; i++) { fieldSerializers[i] = types[i].createSerializer(executionConfig); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 881e690d5ccd7..a2d937f2e8859 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -160,6 +160,7 @@ public void getFlatFields(String fieldExpression, int offset, List TypeInformation getTypeAt(String fieldExpression) { Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); @@ -196,6 +197,7 @@ public TypeInformation getTypeAt(String fieldExpression) { } } + @Override public TypeInformation getTypeAt(int pos) { if (pos < 0 || pos >= this.types.length) { throw new IndexOutOfBoundsException(); @@ -227,14 +229,16 @@ public int hashCode() { @Override public String toString() { StringBuilder bld = new StringBuilder("Tuple"); - bld.append(types.length).append('<'); - bld.append(types[0]); - - for (int i = 1; i < types.length; i++) { - bld.append(", ").append(types[i]); + bld.append(types.length); + if (types.length > 0) { + bld.append('<').append(types[0]); + + for (int i = 1; i < types.length; i++) { + bld.append(", ").append(types[i]); + } + + bld.append('>'); } - - bld.append('>'); return bld.toString(); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 1ae8d3d62fa1f..f2792820b737e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -51,6 +51,7 @@ import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; import org.apache.hadoop.io.Writable; @@ -407,6 +408,10 @@ private TypeInformation createTypeInfoWithTypeHierarchy(Arr curT = typeToClass(curT).getGenericSuperclass(); } + if(curT == Tuple0.class) { + return new TupleTypeInfo(Tuple0.class, new TypeInformation[0]); + } + // check if immediate child of Tuple has generics if (curT instanceof Class) { throw new InvalidTypesException("Tuple needs to be parameterized by using generics."); @@ -683,7 +688,7 @@ private static Type getParameterType(Class baseClass, ArrayList typeHie private static Type getParameterTypeFromGenericType(Class baseClass, ArrayList typeHierarchy, Type t, int pos) { // base class - if (t instanceof ParameterizedType && baseClass.equals((Class) ((ParameterizedType) t).getRawType())) { + if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) { if (typeHierarchy != null) { typeHierarchy.add(t); } @@ -783,6 +788,10 @@ else if (typeInfo.isTupleType()) { type = typeToClass(type).getGenericSuperclass(); } + if(type == Tuple0.class) { + return; + } + // check if immediate child of Tuple has generics if (type instanceof Class) { throw new InvalidTypesException("Parameterized Tuple type expected."); @@ -1201,17 +1210,20 @@ private TypeInformation privateGetForClass(Class clazz, // check for subclasses of Tuple if (Tuple.class.isAssignableFrom(clazz)) { - throw new InvalidTypesException("Type information extraction for tuples cannot be done based on the class."); + if(clazz == Tuple0.class) { + return new TupleTypeInfo(Tuple0.class, new TypeInformation[0]); + } + throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class."); } // check for Enums if(Enum.class.isAssignableFrom(clazz)) { - return (TypeInformation) new EnumTypeInfo(clazz); + return new EnumTypeInfo(clazz); } // special case for POJOs generated by Avro. if(SpecificRecordBase.class.isAssignableFrom(clazz)) { - return (TypeInformation) new AvroTypeInfo(clazz); + return new AvroTypeInfo(clazz); } if (countTypeInHierarchy(typeHierarchy, clazz) > 1) { @@ -1500,7 +1512,7 @@ private TypeInformation privateGetForObject(X value) { infos[i] = privateGetForObject(field); } - return (TypeInformation) new TupleTypeInfo(value.getClass(), infos); + return new TupleTypeInfo(value.getClass(), infos); } else { return privateGetForClass((Class) value.getClass(), new ArrayList()); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index 2e04acab03a54..b89a830589a61 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -36,7 +36,7 @@ public class TypeInfoParser { private static final String VALUE_PACKAGE = "org.apache.flink.types"; private static final String WRITABLE_PACKAGE = "org.apache.hadoop.io"; - private static final Pattern tuplePattern = Pattern.compile("^((" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Tuple[0-9]+)<"); + private static final Pattern tuplePattern = Pattern.compile("^(" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?((Tuple[1-9][0-9]?)<|(Tuple0))"); private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Writable)<([^\\s,>]*)(,|>|$|\\[)"); private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$|\\[)"); private static final Pattern basicTypePattern = Pattern @@ -124,18 +124,23 @@ private static TypeInformation parse(StringBuilder sb) throws ClassNotFoundEx // tuples if (tupleMatcher.find()) { - String className = tupleMatcher.group(1); - sb.delete(0, className.length() + 1); - int arity = Integer.parseInt(className.replaceAll("\\D", "")); - - Class clazz; - // check if fully qualified - if (className.startsWith(TUPLE_PACKAGE)) { - clazz = loadClass(className); + boolean isGenericTuple = true; + String className = tupleMatcher.group(3); + if(className == null) { // matched Tuple0 + isGenericTuple = false; + className = tupleMatcher.group(2); + sb.delete(0, className.length()); } else { - clazz = loadClass(TUPLE_PACKAGE + "." + className); + sb.delete(0, className.length() + 1); // +1 for "<" + } + + if (infoString.startsWith(TUPLE_PACKAGE)) { + sb.delete(0, TUPLE_PACKAGE.length() + 1); // +1 for trailing "." } + int arity = Integer.parseInt(className.replaceAll("\\D", "")); + Class clazz = loadClass(TUPLE_PACKAGE + "." + className); + TypeInformation[] types = new TypeInformation[arity]; for (int i = 0; i < arity; i++) { types[i] = parse(sb); @@ -143,11 +148,13 @@ private static TypeInformation parse(StringBuilder sb) throws ClassNotFoundEx throw new IllegalArgumentException("Tuple arity does not match given parameters."); } } - if (sb.charAt(0) != '>') { - throw new IllegalArgumentException("Tuple arity does not match given parameters."); + if (isGenericTuple) { + if(sb.charAt(0) != '>') { + throw new IllegalArgumentException("Tuple arity does not match given parameters."); + } + // remove '>' + sb.deleteCharAt(0); } - // remove '>' - sb.deleteCharAt(0); returnType = new TupleTypeInfo(clazz, types); } // writable types diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java new file mode 100644 index 0000000000000..e2d43e1e3e39c --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class Tuple0Serializer extends TupleSerializer { + private static final long serialVersionUID = 1278813169022975971L; + + private static final Tuple0Serializer singleton = new Tuple0Serializer(); + + private Tuple0Serializer() { + super(Tuple0.class, new TypeSerializer[0]); + } + + public static Tuple0Serializer getInstance() { + return singleton; + } + + @Override + public Tuple0Serializer duplicate() { + return this; + } + + @Override + public Tuple0 createInstance() { + return Tuple0.instance; + } + + @Override + public Tuple0 createInstance(Object[] fields) { + if (fields == null || fields.length == 0) { + return Tuple0.instance; + } + + throw new UnsupportedOperationException( + "Tuple0 cannot take any data, as it has zero fields."); + } + + @Override + public Tuple0 copy(Tuple0 from) { + return Tuple0.instance; + } + + @Override + public Tuple0 copy(Tuple0 from, Tuple0 reuse) { + return reuse; + } + + @Override + public int getLength() { + return 1; + } + + @Override + public void serialize(Tuple0 record, DataOutputView target) throws IOException { + target.writeByte(42); + } + + @Override + public Tuple0 deserialize(DataInputView source) throws IOException { + source.readByte(); + return Tuple0.instance; + } + + @Override + public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws IOException { + source.readByte(); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.writeByte(source.readByte()); + } + +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index 2b330c25b479e..46e39901eec84 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -27,7 +27,7 @@ import org.apache.flink.types.NullFieldException; -public final class TupleSerializer extends TupleSerializerBase { +public class TupleSerializer extends TupleSerializerBase { private static final long serialVersionUID = 1L; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index dade55c009c63..17fa8d8386aa1 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -40,6 +40,7 @@ import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -265,6 +266,27 @@ public void flatMap(Tuple3, Tuple1, Tuple2> Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo) tti2.getTypeAt(2)).getTypeAt(1)); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testTuple0() { + // use getFlatMapReturnTypes() + RichFlatMapFunction function = new RichFlatMapFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple0 value, Collector out) throws Exception { + // nothing to do + } + }; + + TypeInformation ti = TypeExtractor.getFlatMapReturnTypes(function, + (TypeInformation) TypeInfoParser.parse("Tuple0")); + + Assert.assertTrue(ti.isTupleType()); + Assert.assertEquals(0, ti.getArity()); + Assert.assertTrue(ti instanceof TupleTypeInfo); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testSubclassOfTuple() { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java index eadf96dc82ef4..9fe8174950407 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -144,6 +144,14 @@ public void testTuples() { Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo)ti).getTypeAt(0)); Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo)ti).getTypeAt(1)); + ti = TypeInfoParser.parse("Tuple0"); + Assert.assertEquals(0, ti.getArity()); + Assert.assertEquals("Java Tuple0", ti.toString()); + + ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple0"); + Assert.assertEquals(0, ti.getArity()); + Assert.assertEquals("Java Tuple0", ti.toString()); + ti = TypeInfoParser.parse("Tuple3, Tuple1, Tuple2>"); Assert.assertEquals("Java Tuple3, Java Tuple1, Java Tuple2>", ti.toString()); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java index 96f8306ca8fc5..beda8e93cf5ad 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; @@ -40,7 +41,14 @@ import org.junit.Test; public class TupleSerializerTest { - + + @Test + public void testTuple0() { + Tuple0[] testTuples = new Tuple0[] { Tuple0.instance, Tuple0.instance, Tuple0.instance }; + + runTests(testTuples); + } + @Test public void testTuple1Int() { @SuppressWarnings("unchecked") @@ -214,7 +222,11 @@ private void runTests(T... instances) { Class tupleClass = tupleTypeInfo.getTypeClass(); - TupleSerializerTestInstance test = new TupleSerializerTestInstance(serializer, tupleClass, -1, instances); + int length = -1; + if(tupleClass == Tuple0.class) { + length = 1; + } + TupleSerializerTestInstance test = new TupleSerializerTestInstance(serializer, tupleClass, length, instances); test.testAll(); } catch (Exception e) { diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java index 23720d71a89a1..59ed20cca6dbe 100644 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java @@ -32,6 +32,8 @@ * General-purpose class to read data from memory-mapped files. */ public class Receiver implements Serializable { + private static final long serialVersionUID = -2474088929850009968L; + private final AbstractRichFunction function; private File inputFile; @@ -39,7 +41,7 @@ public class Receiver implements Serializable { private FileChannel inputChannel; private MappedByteBuffer fileBuffer; - private Deserializer deserializer = null; + private Deserializer deserializer = null; public Receiver(AbstractRichFunction function) { this.function = function; @@ -196,6 +198,7 @@ private Object receiveField(boolean normalized) throws IOException { * @param bufferSize size of the buffer * @throws IOException */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public void collectBuffer(Collector c, int bufferSize) throws IOException { fileBuffer.position(0); @@ -209,7 +212,7 @@ public void collectBuffer(Collector c, int bufferSize) throws IOException { } //=====Deserializer================================================================================================= - private Deserializer getDeserializer(byte type) { + private Deserializer getDeserializer(byte type) { switch (type) { case TYPE_TUPLE: return new TupleDeserializer(); @@ -324,7 +327,7 @@ public byte[] deserialize() { } private class TupleDeserializer implements Deserializer { - Deserializer[] deserializer = null; + Deserializer[] deserializer = null; Tuple reuse; public TupleDeserializer() { @@ -346,61 +349,12 @@ public Tuple deserialize() { } public static Tuple createTuple(int size) { - switch (size) { - case 0: - return new Tuple0(); - case 1: - return new Tuple1(); - case 2: - return new Tuple2(); - case 3: - return new Tuple3(); - case 4: - return new Tuple4(); - case 5: - return new Tuple5(); - case 6: - return new Tuple6(); - case 7: - return new Tuple7(); - case 8: - return new Tuple8(); - case 9: - return new Tuple9(); - case 10: - return new Tuple10(); - case 11: - return new Tuple11(); - case 12: - return new Tuple12(); - case 13: - return new Tuple13(); - case 14: - return new Tuple14(); - case 15: - return new Tuple15(); - case 16: - return new Tuple16(); - case 17: - return new Tuple17(); - case 18: - return new Tuple18(); - case 19: - return new Tuple19(); - case 20: - return new Tuple20(); - case 21: - return new Tuple21(); - case 22: - return new Tuple22(); - case 23: - return new Tuple23(); - case 24: - return new Tuple24(); - case 25: - return new Tuple25(); - default: - throw new IllegalArgumentException("Tuple size not supported: " + size); + try { + return Tuple.getTupleClass(size).newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } } }