From 9ff6366599127ddd1f54bb50e73ba8584e509564 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 16 Jun 2016 15:26:12 +0200 Subject: [PATCH 1/2] [FLINK-4083] [java,DataSet] Introduce closure cleaning in Join.where() and Join.equaltTo() --- .../api/java/operators/JoinOperator.java | 4 +- .../test/javaApiOperators/JoinITCase.java | 83 ++++++++++++++++++- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 88c479b83a4a2..11f6c82e9b8b6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -891,7 +891,7 @@ public JoinOperatorSetsPredicate where(String... fields) { @Override public JoinOperatorSetsPredicate where(KeySelector keySelector) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); - return new JoinOperatorSetsPredicate(new SelectorFunctionKeys<>(keySelector, input1.getType(), keyType)); + return new JoinOperatorSetsPredicate(new SelectorFunctionKeys<>(input1.clean(keySelector), input1.getType(), keyType)); } @@ -956,7 +956,7 @@ public DefaultJoin equalTo(String... fields) { @Override public DefaultJoin equalTo(KeySelector keySelector) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); - return createDefaultJoin(new SelectorFunctionKeys<>(keySelector, input2.getType(), keyType)); + return createDefaultJoin(new SelectorFunctionKeys<>(input2.clean(keySelector), input2.getType(), keyType)); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index 0d8c80b5c6ded..56c7131e0ea4d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; @@ -398,8 +399,86 @@ public void testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors() throws Excep DataSet> joinDs = ds1.join(ds2) - .where(new KeySelector5()) - .equalTo(new KeySelector6()); + .where(new KeySelector5()) + .equalTo(new KeySelector6()); + + List> result = joinDs.collect(); + + String expected = "1,0,Hi,1,0,Hi\n" + + "2,1,Hello,2,1,Hello\n" + + "2,1,Hello,2,2,Hello world\n" + + "2,2,Hello world,2,1,Hello\n" + + "2,2,Hello world,2,2,Hello world\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsClosureCleaner() throws Exception { + /* + * (Default) Join on two custom type inputs with key extractors, implemented as inner classes to test closure + * cleaning + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet ds1 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + + DataSet> joinDs = + ds1.join(ds2) + .where(new KeySelector() { + @Override + public Integer getKey(CustomType value) { + return value.myInt; + } + }) + .equalTo(new KeySelector(){ + + @Override + public Integer getKey(CustomType value) throws Exception { + return value.myInt; + } + }); + + List> result = joinDs.collect(); + + String expected = "1,0,Hi,1,0,Hi\n" + + "2,1,Hello,2,1,Hello\n" + + "2,1,Hello,2,2,Hello world\n" + + "2,2,Hello world,2,1,Hello\n" + + "2,2,Hello world,2,2,Hello world\n"; + + compareResultAsTuples(result, expected); + } + + @Test(expected = InvalidProgramException.class) + public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsDisabledClosureCleaner() throws Exception { + /* + * (Default) Join on two custom type inputs with key extractors, check if disableing closure cleaning works + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableClosureCleaner(); + + DataSet ds1 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + + DataSet> joinDs = + ds1.join(ds2) + .where(new KeySelector() { + @Override + public Integer getKey(CustomType value) { + return value.myInt; + } + }) + .equalTo(new KeySelector(){ + + @Override + public Integer getKey(CustomType value) throws Exception { + return value.myInt; + } + }); List> result = joinDs.collect(); From 48dd3e6c17503f9da7feb33d4cc7a9c5f9d1037a Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 17 Jun 2016 16:51:00 +0200 Subject: [PATCH 2/2] [FLINK-4083] [java,DataSet] Review Feedback included. --- .../test/javaApiOperators/JoinITCase.java | 195 +++++++++--------- 1 file changed, 96 insertions(+), 99 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index 56c7131e0ea4d..f47d3e3b880d0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -44,6 +44,7 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,7 +55,7 @@ @RunWith(Parameterized.class) public class JoinITCase extends MultipleProgramsTestBase { - public JoinITCase(TestExecutionMode mode){ + public JoinITCase(TestExecutionMode mode) { super(mode); } @@ -70,9 +71,9 @@ public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(1) - .with(new T3T5FlatJoin()); + .where(1) + .equalTo(1) + .with(new T3T5FlatJoin()); List> result = joinDs.collect(); @@ -95,9 +96,9 @@ public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(0, 1) - .equalTo(0, 4) - .with(new T3T5FlatJoin()); + .where(0, 1) + .equalTo(0, 4) + .with(new T3T5FlatJoin()); List> result = joinDs.collect(); @@ -121,10 +122,10 @@ public void testDefaultJoinOnTuples() throws Exception { DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - DataSet,Tuple5>> joinDs = + DataSet, Tuple5>> joinDs = ds1.join(ds2) - .where(0) - .equalTo(2); + .where(0) + .equalTo(2); List, Tuple5>> result = joinDs.collect(); @@ -172,9 +173,9 @@ public void testJoinWithTiny() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.joinWithTiny(ds2) - .where(1) - .equalTo(1) - .with(new T3T5FlatJoin()); + .where(1) + .equalTo(1) + .with(new T3T5FlatJoin()); List> result = joinDs.collect(); @@ -197,9 +198,9 @@ public void testJoinThatReturnsTheLeftInputObject() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(1) - .with(new LeftReturningJoin()); + .where(1) + .equalTo(1) + .with(new LeftReturningJoin()); List> result = joinDs.collect(); @@ -222,9 +223,9 @@ public void testJoinThatReturnsTheRightInputObject() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(1) - .with(new RightReturningJoin()); + .where(1) + .equalTo(1) + .with(new RightReturningJoin()); List> result = joinDs.collect(); @@ -249,10 +250,10 @@ public void testJoinWithBroadcastSet() throws Exception { DataSet> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(4) - .with(new T3T5BCJoin()) - .withBroadcastSet(intDs, "ints"); + .where(1) + .equalTo(4) + .with(new T3T5BCJoin()) + .withBroadcastSet(intDs, "ints"); List> result = joinDs.collect(); @@ -266,7 +267,7 @@ public void testJoinWithBroadcastSet() throws Exception { @Test public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector() - throws Exception{ + throws Exception { /* * Join on a tuple input with key field selector and a custom type input with key extractor */ @@ -277,9 +278,9 @@ public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyField DataSet> ds2 = CollectionDataSets.get3TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(new KeySelector1()) - .equalTo(0) - .with(new CustT3Join()); + .where(new KeySelector1()) + .equalTo(0) + .with(new CustT3Join()); List> result = joinDs.collect(); @@ -310,12 +311,12 @@ public void testProjectOnATuple1Input() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(1) - .projectFirst(2,1) - .projectSecond(3) - .projectFirst(0) - .projectSecond(4,1); + .where(1) + .equalTo(1) + .projectFirst(2, 1) + .projectSecond(3) + .projectFirst(0) + .projectSecond(4, 1); List> result = joinDs.collect(); @@ -338,12 +339,12 @@ public void testProjectJoinOnATuple2Input() throws Exception { DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1) - .equalTo(1) - .projectSecond(3) - .projectFirst(2,1) - .projectSecond(4,1) - .projectFirst(0); + .where(1) + .equalTo(1) + .projectSecond(3) + .projectFirst(2, 1) + .projectSecond(4, 1) + .projectFirst(0); List> result = joinDs.collect(); @@ -367,8 +368,8 @@ public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyE DataSet ds2 = CollectionDataSets.getCustomTypeDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(1).equalTo(new KeySelector2()) - .with(new T3CustJoin()); + .where(1).equalTo(new KeySelector2()) + .with(new T3CustJoin()); List> result = joinDs.collect(); @@ -433,7 +434,7 @@ public Integer getKey(CustomType value) { return value.myInt; } }) - .equalTo(new KeySelector(){ + .equalTo(new KeySelector() { @Override public Integer getKey(CustomType value) throws Exception { @@ -452,7 +453,7 @@ public Integer getKey(CustomType value) throws Exception { compareResultAsTuples(result, expected); } - @Test(expected = InvalidProgramException.class) + @Test public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsDisabledClosureCleaner() throws Exception { /* * (Default) Join on two custom type inputs with key extractors, check if disableing closure cleaning works @@ -463,32 +464,27 @@ public void testDefaultJoinOnTwoCustomTypeInputsWithInnerClassKeyExtractorsDisab DataSet ds1 = CollectionDataSets.getCustomTypeDataSet(env); DataSet ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); - - DataSet> joinDs = - ds1.join(ds2) - .where(new KeySelector() { - @Override - public Integer getKey(CustomType value) { - return value.myInt; - } - }) - .equalTo(new KeySelector(){ - - @Override - public Integer getKey(CustomType value) throws Exception { - return value.myInt; - } - }); - - List> result = joinDs.collect(); - - String expected = "1,0,Hi,1,0,Hi\n" + - "2,1,Hello,2,1,Hello\n" + - "2,1,Hello,2,2,Hello world\n" + - "2,2,Hello world,2,1,Hello\n" + - "2,2,Hello world,2,2,Hello world\n"; - - compareResultAsTuples(result, expected); + boolean correctExceptionTriggered = false; + try { + DataSet> joinDs = + ds1.join(ds2) + .where(new KeySelector() { + @Override + public Integer getKey(CustomType value) { + return value.myInt; + } + }) + .equalTo(new KeySelector() { + + @Override + public Integer getKey(CustomType value) throws Exception { + return value.myInt; + } + }); + } catch (InvalidProgramException ex) { + correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException); + } + Assert.assertTrue(correctExceptionTriggered); } public static class KeySelector5 implements KeySelector { @@ -517,9 +513,9 @@ public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); DataSet> joinDs = ds1.join(ds2) - .where(new KeySelector3()) - .equalTo(new KeySelector4()) - .with(new T3T5FlatJoin()); + .where(new KeySelector3()) + .equalTo(new KeySelector4()) + .with(new T3T5FlatJoin()); List> result = joinDs.collect(); @@ -533,20 +529,20 @@ public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception compareResultAsTuples(result, expected); } - public static class KeySelector3 implements KeySelector, Tuple2> { + public static class KeySelector3 implements KeySelector, Tuple2> { private static final long serialVersionUID = 1L; @Override - public Tuple2 getKey(Tuple3 t) { + public Tuple2 getKey(Tuple3 t) { return new Tuple2(t.f0, t.f1); } } - public static class KeySelector4 implements KeySelector, Tuple2> { + public static class KeySelector4 implements KeySelector, Tuple2> { private static final long serialVersionUID = 1L; @Override - public Tuple2 getKey(Tuple5 t) { + public Tuple2 getKey(Tuple5 t) { return new Tuple2(t.f0, t.f4); } } @@ -560,7 +556,7 @@ public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet >> joinDs = + DataSet>> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6"); List>> result = joinDs.collect(); @@ -581,7 +577,7 @@ public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exceptio DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet >> joinDs = + DataSet>> joinDs = ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference! List>> result = joinDs.collect(); @@ -602,8 +598,8 @@ public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exceptio DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet >> joinDs = - ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1"); + DataSet>> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6", "f0", "f1"); env.setParallelism(1); List>> result = joinDs.collect(); @@ -624,8 +620,8 @@ public void testNestedIntoTuple() throws Exception { DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet >> joinDs = - ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2"); + DataSet>> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0").equalTo("f6", "f0", "f2"); env.setParallelism(1); List>> result = joinDs.collect(); @@ -646,8 +642,8 @@ public void testNestedIntoTupleIntoPojo() throws Exception { DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet >> joinDs = - ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4"); + DataSet>> joinDs = + ds1.join(ds2).where("nestedTupleWithCustom.f0", "nestedTupleWithCustom.f1.myInt", "nestedTupleWithCustom.f1.myLong").equalTo("f2", "f3", "f4"); env.setParallelism(1); List>> result = joinDs.collect(); @@ -668,7 +664,7 @@ public void testNonPojoToVerifyFullTupleKeys() throws Exception { DataSet, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); DataSet, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet, String>, Tuple2, String> >> joinDs = + DataSet, String>, Tuple2, String>>> joinDs = ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2 env.setParallelism(1); @@ -691,7 +687,7 @@ public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception { DataSet, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); DataSet, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet, String>, Tuple2, String> >> joinDs = + DataSet, String>, Tuple2, String>>> joinDs = ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2 env.setParallelism(1); @@ -713,14 +709,14 @@ public void testFullPojoWithFullTuple() throws Exception { DataSet ds1 = CollectionDataSets.getSmallPojoDataSet(env); DataSet> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env); - DataSet >> joinDs = + DataSet>> joinDs = ds1.join(ds2).where("*").equalTo("*"); env.setParallelism(1); List>> result = joinDs.collect(); String expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" + - "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+ + "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" + "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"; compareResultAsTuples(result, expected); @@ -819,10 +815,10 @@ public static class T3T5FlatJoin implements FlatJoinFunction first, - Tuple5 second, - Collector> out) { + Tuple5 second, + Collector> out) { - out.collect (new Tuple2 (first.f2, second.f3)); + out.collect(new Tuple2(first.f2, second.f3)); } } @@ -831,7 +827,7 @@ public static class LeftReturningJoin implements JoinFunction join(Tuple3 first, - Tuple5 second) { + Tuple5 second) { return first; } @@ -841,7 +837,7 @@ public static class RightReturningJoin implements JoinFunction join(Tuple3 first, - Tuple5 second) { + Tuple5 second) { return second; } @@ -856,7 +852,7 @@ public void open(Configuration config) { Collection ints = this.getRuntimeContext().getBroadcastVariable("ints"); int sum = 0; - for(Integer i : ints) { + for (Integer i : ints) { sum += i; } broadcast = sum; @@ -875,7 +871,7 @@ public Tuple3 join( @Override public void join(Tuple3 first, Tuple5 second, Collector> out) throws Exception { - out.collect(new Tuple3 (first.f2, second.f3, broadcast)); + out.collect(new Tuple3(first.f2, second.f3, broadcast)); } } @@ -883,7 +879,7 @@ public static class T3CustJoin implements JoinFunction join(Tuple3 first, - CustomType second) { + CustomType second) { return new Tuple2(first.f2, second.myString); } @@ -897,7 +893,7 @@ public Tuple2 join(CustomType first, Tuple3(first.myString, second.f2); } } - + public static class TestDistribution implements DataDistribution { public Object boundaries[][] = new Object[][]{ new Object[]{2, 2L}, @@ -906,7 +902,8 @@ public static class TestDistribution implements DataDistribution { new Object[]{21, 6L} }; - public TestDistribution() {} + public TestDistribution() { + } @Override public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { @@ -932,7 +929,7 @@ public void write(DataOutputView out) throws IOException { public void read(DataInputView in) throws IOException { } - + @Override public boolean equals(Object obj) { return obj instanceof TestDistribution;