From 4bcacd0d4bc69a86b8b4ffb38d9b046db0cabdf1 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Wed, 4 Mar 2015 18:49:22 +0100 Subject: [PATCH] [FLINK-1628] Fix partitioning properties for Joins and CoGroups. --- .../dataproperties/GlobalProperties.java | 8 + .../dataproperties/PartitioningProperty.java | 2 +- .../RequestedGlobalProperties.java | 59 +- .../operators/AbstractJoinDescriptor.java | 49 +- .../compiler/operators/CoGroupDescriptor.java | 72 +- .../operators/OperatorDescriptorDual.java | 44 +- .../operators/SortMergeJoinDescriptor.java | 13 +- .../compiler/FeedbackPropertiesMatchTest.java | 8 +- .../compiler/PartitioningReusageTest.java | 859 ++++++++++++++++++ .../GlobalPropertiesMatchingTest.java | 152 +++- ...equestedGlobalPropertiesFilteringTest.java | 2 +- .../api/common/operators/util/FieldList.java | 15 +- 12 files changed, 1224 insertions(+), 59 deletions(-) create mode 100644 flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java index ca7e64d68dbc2..31e13ae8fa6c0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java @@ -183,6 +183,14 @@ public boolean isPartitionedOnFields(FieldSet fields) { return false; } } + + public boolean isExactlyPartitionedOnFields(FieldList fields) { + if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) { + return true; + } else { + return false; + } + } public boolean matchesOrderedPartitioning(Ordering o) { if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) { diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java index 2b66ea0092744..45e323e03b01a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java @@ -71,7 +71,7 @@ public enum PartitioningProperty { * false otherwise. 
*/ public boolean isPartitioned() { - return this != FULL_REPLICATION && this != FORCED_REBALANCED; + return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION; } /** diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java index daaa7dcc3be2a..f4334fff3edd8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; @@ -46,7 +47,7 @@ public final class RequestedGlobalProperties implements Cloneable { private DataDistribution dataDistribution; // optional data distribution, for a range partitioning private Partitioner customPartitioner; // optional, partitioner for custom partitioning - + // -------------------------------------------------------------------------------------------- /** @@ -60,7 +61,9 @@ public RequestedGlobalProperties() { /** * Sets the partitioning property for the global properties. - * + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * * @param partitionedFields */ public void setHashPartitioned(FieldSet partitionedFields) { @@ -86,7 +89,14 @@ public void setRangePartitioned(Ordering ordering, DataDistribution dataDistribu this.partitioningFields = null; this.dataDistribution = dataDistribution; } - + + /** + * Sets the partitioning property for the global properties. + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * + * @param partitionedFields + */ public void setAnyPartitioning(FieldSet partitionedFields) { if (partitionedFields == null) { throw new NullPointerException(); @@ -119,7 +129,14 @@ public void setForceRebalancing() { this.partitioningFields = null; this.ordering = null; } - + + /** + * Sets the partitioning property for the global properties. + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * + * @param partitionedFields + */ public void setCustomPartitioned(FieldSet partitionedFields, Partitioner partitioner) { if (partitionedFields == null || partitioner == null) { throw new NullPointerException(); @@ -130,7 +147,7 @@ public void setCustomPartitioned(FieldSet partitionedFields, Partitioner part this.ordering = null; this.customPartitioner = partitioner; } - + /** * Gets the partitioning property. * @@ -148,7 +165,7 @@ public PartitioningProperty getPartitioning() { public FieldSet getPartitionedFields() { return this.partitioningFields; } - + /** * Gets the key order. 
* @@ -220,7 +237,13 @@ public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties p return null; case HASH_PARTITIONED: case ANY_PARTITIONING: - FieldSet newFields = new FieldSet(); + FieldSet newFields; + if(this.partitioningFields instanceof FieldList) { + newFields = new FieldList(); + } else { + newFields = new FieldSet(); + } + for (Integer targetField : this.partitioningFields) { int sourceField = props.getForwardingSourceField(input, targetField); if (sourceField >= 0) { @@ -274,11 +297,11 @@ else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) { return true; } else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) { - return props.isPartitionedOnFields(this.partitioningFields); + return checkCompatiblePartitioningFields(props); } else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) { return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && - props.isPartitionedOnFields(this.partitioningFields); + checkCompatiblePartitioningFields(props); } else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) { return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && @@ -289,14 +312,15 @@ else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) { } else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) { return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && - props.isPartitionedOnFields(this.partitioningFields) && + checkCompatiblePartitioningFields(props) && props.getCustomPartitioner().equals(this.customPartitioner); + } else { throw new CompilerException("Properties matching logic leaves open cases."); } } - + /** * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties. * @@ -307,8 +331,8 @@ public void parameterizeChannel(Channel channel, boolean globalDopChange) { // safety check. Fully replicated input must be preserved. if(channel.getSource().getGlobalProperties().isFullyReplicated() && - ( this.partitioning != PartitioningProperty.FULL_REPLICATION || - this.partitioning != PartitioningProperty.ANY_DISTRIBUTION)) { + !(this.partitioning == PartitioningProperty.FULL_REPLICATION || + this.partitioning == PartitioningProperty.ANY_DISTRIBUTION)) { throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property."); } @@ -397,4 +421,13 @@ public RequestedGlobalProperties clone() { throw new RuntimeException(cnse); } } + + private boolean checkCompatiblePartitioningFields(GlobalProperties props) { + if(this.partitioningFields instanceof FieldList) { + // partitioningFields as FieldList requires strict checking! 
+ return props.isExactlyPartitionedOnFields((FieldList)this.partitioningFields); + } else { + return props.isPartitionedOnFields(this.partitioningFields); + } + } } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java index b1c30794d54cf..21dd3f425856d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java @@ -64,15 +64,15 @@ protected List createPossibleGlobalProperties() { if (repartitionAllowed) { // partition both (hash or custom) if (this.customPartitioner == null) { - + // we accept compatible partitionings of any type RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties(); RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties(); partitioned_left_any.setAnyPartitioning(this.keys1); partitioned_right_any.setAnyPartitioning(this.keys2); pairs.add(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any)); - - // we also explicitly add hash partitioning, as a fallback, if the any-pairs do not match + + // add strict hash partitioning of both inputs on their full key sets RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); partitioned_left_hash.setHashPartitioned(this.keys1); @@ -82,10 +82,10 @@ protected List createPossibleGlobalProperties() { else { RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties(); partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner); - + RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties(); partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner); - + return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right)); } @@ -130,10 +130,40 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob GlobalProperties produced1, GlobalProperties produced2) { if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) { - return produced1.getPartitioning() == produced2.getPartitioning() && - (produced1.getCustomPartitioner() == null ? - produced2.getCustomPartitioner() == null : - produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner())); + + if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // both are hash partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { + + // both are range partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && + produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) { + + // both use a custom partitioner. 
Check that both keys are exactly as specified and that both the same partitioner + return produced1.getPartitioningFields().isExactMatch(this.keys1) && + produced2.getPartitioningFields().isExactMatch(this.keys2) && + produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null && + produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()); + + } + else { + + // no other partitioning valid, incl. ANY_PARTITIONING. + // For joins we must ensure that both sides are exactly identically partitioned, ANY is not good enough. + return false; + } + } else { return true; } @@ -151,4 +181,5 @@ public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProp gp.clearUniqueFieldCombinations(); return gp; } + } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java index ff4ca6e0c721c..f8d99f8f69630 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java @@ -99,27 +99,31 @@ public DriverStrategy getStrategy() { @Override protected List createPossibleGlobalProperties() { + if (this.customPartitioner == null) { + + // we accept compatible partitionings of any type RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); - partitioned_left_any.setAnyPartitioning(this.keys1); - partitioned_left_hash.setHashPartitioned(this.keys1); - RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); + partitioned_left_any.setAnyPartitioning(this.keys1); partitioned_right_any.setAnyPartitioning(this.keys2); + + // add strict hash partitioning of both inputs on their full key sets + RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); + RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); + partitioned_left_hash.setHashPartitioned(this.keys1); partitioned_right_hash.setHashPartitioned(this.keys2); - + return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any), new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash)); } else { RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties(); partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner); - + RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties(); partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner); - + return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right)); } } @@ -135,10 +139,40 @@ protected List createPossibleLocalProperties() { public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, GlobalProperties produced1, GlobalProperties produced2) { - return produced1.getPartitioning() == produced2.getPartitioning() && - (produced1.getCustomPartitioner() == null ? 
- produced2.getCustomPartitioner() == null : - produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner())); + + if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // both are hash partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { + + // both are range partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && + produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) { + + // both use a custom partitioner. Check that both keys are exactly as specified and that both the same partitioner + return produced1.getPartitioningFields().isExactMatch(this.keys1) && + produced2.getPartitioningFields().isExactMatch(this.keys2) && + produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null && + produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()); + + } + else { + + // no other partitioning valid, incl. ANY_PARTITIONING. + // For co-groups we must ensure that both sides are exactly identically partitioned, ANY is not good enough. + return false; + } + } @Override @@ -150,12 +184,17 @@ public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLoca Ordering prod1 = produced1.getOrdering(); Ordering prod2 = produced2.getOrdering(); - if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields || - prod2.getNumberOfFields() < prod2.getNumberOfFields()) - { + if (prod1 == null || prod2 == null) { throw new CompilerException("The given properties do not meet this operators requirements."); } - + + // check that order of fields is equivalent + if (!checkEquivalentFieldPositionsInKeyFields( + prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { + return false; + } + + // check that order directions are equivalent for (int i = 0; i < numRelevantFields; i++) { if (prod1.getOrder(i) != prod2.getOrder(i)) { return false; @@ -196,4 +235,5 @@ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperti LocalProperties comb = LocalProperties.combine(in1, in2); return comb.clearUniqueFieldSets(); } + } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java index 8eca16e588003..f72e6b53b3a6a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.dag.TwoInputNode; import org.apache.flink.compiler.dataproperties.GlobalProperties; import org.apache.flink.compiler.dataproperties.LocalProperties; @@ -81,7 +82,48 @@ public abstract boolean 
areCoFulfilled(RequestedLocalProperties requested1, Requ public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2); public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2); - + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) { + + // check number of produced partitioning fields + if(fields1.size() != fields2.size()) { + return false; + } else { + return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size()); + } + } + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) { + + // check number of produced partitioning fields + if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) { + return false; + } + else { + for(int i=0; i> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void noPreviousPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .join(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .join(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(0,1) + .map(new MockMapper()) + .withForwardedFields("0;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = 
env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void reuseBothPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(1,2) + .map(new MockMapper()) + .withForwardedFields("1;2"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2.partitionByHash(2,1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + 
.where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void noPreviousPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void noPreviousPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= 
(DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0,1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, 
Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(0, 1) + .map(new MockMapper()) + .withForwardedFields("0;1")) + .where(0, 1).equalTo(0, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + @Test + public void reuseBothPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(1, 2) + .map(new MockMapper()) + .withForwardedFields("1;2")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, 
Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + + private void checkValidJoinInputProperties(DualInputPlanNode join) { + + GlobalProperties inProps1 = join.getInput1().getGlobalProperties(); + GlobalProperties inProps2 = join.getInput2().getGlobalProperties(); + + if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + inProps2.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED) { + + // check that both inputs are hash partitioned on the same fields + FieldList pFields1 = inProps1.getPartitioningFields(); + FieldList pFields2 = inProps2.getPartitioningFields(); + + assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2, + pFields1.size() == pFields2.size()); + + FieldList reqPFields1 = join.getKeysForInput1(); + FieldList reqPFields2 = join.getKeysForInput2(); + + for(int i=0; i, Tuple3> { + @Override + public Tuple3 map(Tuple3 value) throws Exception { + return null; + } + } + + public static class MockJoin implements JoinFunction, + Tuple3, Tuple3> { + + @Override + public Tuple3 join(Tuple3 first, Tuple3 second) throws Exception { + return null; + } + } + + public static class MockCoGroup implements CoGroupFunction, + Tuple3, Tuple3> { + + @Override + public void coGroup(Iterable> first, Iterable> second, + Collector> out) throws Exception { + + } + } + +} + diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java index 1890597f27f2e..f6ae01af59b0f 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java @@ -42,30 +42,34 @@ public void testMatchingAnyPartitioning() { GlobalProperties gp1 = new GlobalProperties(); gp1.setAnyPartitioning(new FieldList(2, 6)); assertTrue(req.isMetBy(gp1)); - + GlobalProperties gp2 = new GlobalProperties(); gp2.setAnyPartitioning(new FieldList(6, 2)); assertTrue(req.isMetBy(gp2)); - + GlobalProperties gp3 = new GlobalProperties(); - gp3.setAnyPartitioning(new FieldList(6, 1)); + gp3.setAnyPartitioning(new FieldList(6, 2, 4)); assertFalse(req.isMetBy(gp3)); - + GlobalProperties gp4 = new GlobalProperties(); - gp4.setAnyPartitioning(new FieldList(2)); - assertTrue(req.isMetBy(gp4)); + gp4.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp4)); + + GlobalProperties gp5 = new GlobalProperties(); + gp5.setAnyPartitioning(new FieldList(2)); + assertTrue(req.isMetBy(gp5)); } - + // match hash partitioning { GlobalProperties gp1 = new GlobalProperties(); gp1.setHashPartitioned(new FieldList(2, 6)); assertTrue(req.isMetBy(gp1)); - + GlobalProperties gp2 = new GlobalProperties(); gp2.setHashPartitioned(new FieldList(6, 2)); assertTrue(req.isMetBy(gp2)); - + GlobalProperties gp3 = new GlobalProperties(); gp3.setHashPartitioned(new FieldList(6, 1)); assertFalse(req.isMetBy(gp3)); @@ -154,6 +158,136 @@ public void testMatchingCustomPartitioning() { fail(e.getMessage()); } } + + @Test + public void testStrictlyMatchingAnyPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setAnyPartitioning(new FieldList(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 2, 3)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties 
gp5 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp4)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } + + @Test + public void testStrictlyMatchingHashPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setHashPartitioned(new FieldList(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp4)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setHashPartitioned(new FieldList(6, 2, 0)); + assertFalse(req.isMetBy(gp4)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } // -------------------------------------------------------------------------------------------- diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java index 3f9c0db4e9cde..0bb72b81f0347 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java @@ -423,7 +423,7 @@ public void testInvalidInputIndex() { SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setHashPartitioned(new FieldList(0,1)); + gprops.setHashPartitioned(new FieldSet(0,1)); gprops.filterBySemanticProperties(sprops, 1); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java index 1a76d49dc24e2..ae0a722992b3f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java @@ -117,7 +117,7 @@ public Integer get(int pos) { public FieldList toFieldList() { return this; } - + // -------------------------------------------------------------------------------------------- @Override @@ -158,6 +158,19 @@ public boolean isValidUnorderedPrefix(FieldSet set) { } return true; } + + public boolean isExactMatch(FieldList list) { + if (this.size() != list.size()) { + return false; + } else { + for (int i = 0; i < this.size(); i++) { + // compare boxed field indexes by value, not by reference + if (!this.get(i).equals(list.get(i))) { + return false; + } + } + return true; + } + } // --------------------------------------------------------------------------------------------
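
Illustration of the new matching semantics (a minimal sketch, assuming the RequestedGlobalProperties, GlobalProperties, FieldSet, and FieldList APIs exactly as they appear in this patch; the wrapping class and main method are hypothetical scaffolding, not part of the change):

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.dataproperties.GlobalProperties;
import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;

public class StrictPartitioningMatchExample {

    public static void main(String[] args) {
        // Produced properties: the input is hash partitioned on fields (2, 6), in that order.
        GlobalProperties produced = new GlobalProperties();
        produced.setHashPartitioned(new FieldList(2, 6));

        // Lenient request: key fields given as a FieldSet, so the order of the
        // partitioning fields is irrelevant.
        RequestedGlobalProperties lenient = new RequestedGlobalProperties();
        lenient.setAnyPartitioning(new FieldSet(6, 2));
        System.out.println(lenient.isMetBy(produced));  // true

        // Strict request: key fields given as a FieldList, so the produced partitioning
        // fields must match exactly, including their order; (2, 6) does not match (6, 2).
        RequestedGlobalProperties strict = new RequestedGlobalProperties();
        strict.setAnyPartitioning(new FieldList(6, 2));
        System.out.println(strict.isMetBy(produced));   // false
    }
}

This FieldList-versus-FieldSet distinction is what the new GlobalProperties.isExactlyPartitionedOnFields() and FieldList.isExactMatch() implement, and it is also why the join and co-group descriptors now verify that the partitioning fields of both inputs are equivalently positioned within their key fields before reusing an existing partitioning.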