From b27c26500ca5da79000423a88ca0f3b98df28971 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Sun, 12 Oct 2014 10:32:48 +0200 Subject: [PATCH] Enable forgotten JoinITCase (for POJOs) for Scala and Java API --- .../typeutils/runtime/PojoComparator.java | 3 +-- .../typeutils/runtime/TupleComparator.java | 1 - .../api/scala/operators/JoinITCase.scala | 10 ++++----- .../api/scala/util/CollectionDataSets.scala | 9 ++++++++ .../test/javaApiOperators/CoGroupITCase.java | 6 ++--- .../test/javaApiOperators/JoinITCase.java | 22 +++++++++---------- .../util/CollectionDataSets.java | 22 +++++++++++++++---- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index 2cccfcf65b8f2..7c15ecd899eba 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -199,7 +199,7 @@ private final Object accessField(Field field, Object object) { throw new NullKeyFieldException("Unable to access field "+field+" on object "+object); } catch (IllegalAccessException iaex) { throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo." - + " fiels: " + field + " obj: " + object); + + " fields: " + field + " obj: " + object); } return object; } @@ -211,7 +211,6 @@ public int hash(T value) { for (; i < this.keyFields.length; i++) { code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; code += this.comparators[i].hash(accessField(keyFields[i], value)); - } return code; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index 89b77945d962e..875ecc2760cd1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -49,7 +49,6 @@ public int hash(T value) { int i = 0; try { int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0])); - for (i = 1; i < this.keyPositions.length; i++) { code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i])); diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala index 4a2355c871ab0..26058304f4cb2 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala @@ -34,7 +34,7 @@ import org.apache.flink.api.scala._ object JoinProgs { - var NUM_PROGRAMS: Int = 19 + var NUM_PROGRAMS: Int = 20 def runProgram(progId: Int, resultPath: String): String = { progId match { @@ -324,14 +324,14 @@ object JoinProgs { */ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmallPojoDataSet(env) - val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) + val ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env) val joinDs = ds1.join(ds2).where("*").equalTo("*") joinDs.writeAsCsv(resultPath) env.setDegreeOfParallelism(1) env.execute() - "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + "2 Second (20,200," + - "2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + "3 Third (30,300,3000," + - "Three) 30000,(3,Third,30,300,3000,Three,30000)\n" + "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" + + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" + + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n" case _ => throw new IllegalArgumentException("Invalid program id: " + progId) diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala index ace195a460a5b..09e049b885f25 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala @@ -271,6 +271,15 @@ object CollectionDataSets { env.fromCollection(data) } + def getSmallTuplebasedDataSetMatchingPojo(env: ExecutionEnvironment): + DataSet[(Long, Integer, Integer, Long, String, Integer, String)] = { + val data = new mutable.MutableList[(Long, Integer, Integer, Long, String, Integer, String)] + data.+=((10000L, 10, 100, 1000L, "One", 1, "First")) + data.+=((20000L, 20, 200, 2000L, "Two", 2, "Second")) + data.+=((30000L, 30, 300, 3000L, "Three", 3, "Third")) + env.fromCollection(data) + } + def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[CollectionDataSets .PojoWithMultiplePojos] = { val data = new mutable.MutableList[CollectionDataSets diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index f0e89df204ae0..ffc208c3fe43b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -390,7 +390,7 @@ public Tuple2 getKey(Tuple3 t) { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction, CustomType>() { private static final long serialVersionUID = 1L; @@ -425,7 +425,7 @@ public void coGroup( final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where(new KeySelector>() { private static final long serialVersionUID = 1L; @@ -468,7 +468,7 @@ public void coGroup( final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet coGroupDs = ds.coGroup(ds2) .where(new KeySelector() { private static final long serialVersionUID = 1L; diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index e8d8be951d107..bfae922b690d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -48,7 +48,7 @@ @RunWith(Parameterized.class) public class JoinITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 21; + private static int NUM_PROGRAMS = 22; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -505,7 +505,7 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6"); @@ -525,7 +525,7 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference! @@ -544,7 +544,7 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1"); @@ -565,7 +565,7 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2"); @@ -586,7 +586,7 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); DataSet >> joinDs = ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4"); @@ -649,8 +649,8 @@ public Tuple2 getKey(Tuple5 t) final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); - DataSet >> joinDs = + DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env); + DataSet >> joinDs = ds1.join(ds2).where("*").equalTo("*"); joinDs.writeAsCsv(resultPath); @@ -658,9 +658,9 @@ public Tuple2 getKey(Tuple5 t) env.execute(); // return expected result - return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + return "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+ + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+ + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"; } default: throw new IllegalArgumentException("Invalid program id: "+progId); diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 6757f66ab577b..0f8097abf2672 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -286,17 +287,32 @@ public String toString() { } } - public static DataSet> getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) { + public static DataSet> getSmallTuplebasedDataSet(ExecutionEnvironment env) { List> data = new ArrayList>(); data.add(new Tuple7(1, "First", 10, 100, 1000L, "One", 10000L)); data.add(new Tuple7(2, "Second", 20, 200, 2000L, "Two", 20000L)); data.add(new Tuple7(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); } + + public static DataSet> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { + List> data = + new ArrayList>(); + data.add(new Tuple7 + (10000L, 10, 100, 1000L, "One", 1, "First")); + + data.add(new Tuple7 + (20000L, 20, 200, 2000L, "Two", 2, "Second")); + + data.add(new Tuple7 + (30000L, 30, 300, 3000L, "Three", 3, "Third")); + + return env.fromCollection(data); + } public static DataSet getSmallPojoDataSet(ExecutionEnvironment env) { List data = new ArrayList(); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); + data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/)); data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); @@ -320,7 +336,6 @@ public static class POJO { public String str; public Tuple2 nestedTupleWithCustom; public NestedPojo nestedPojo; - public Date date; public transient Long ignoreMe; public POJO(int i0, String s0, @@ -330,7 +345,6 @@ public POJO(int i0, String s0, this.str = s0; this.nestedTupleWithCustom = new Tuple2(i1, new CustomType(i2, l0, s1)); this.nestedPojo = new NestedPojo(); - this.date = new Date(); this.nestedPojo.longNumber = l1; }