From 143c02a8da9507c3de35d1e6a2078d747a3e889e Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 23 Jun 2014 14:03:46 +0200 Subject: [PATCH 1/4] integrated forwarded fields --- .../stratosphere/compiler/PactCompiler.java | 1 + .../dag/AbstractPartialSolutionNode.java | 11 ++ .../compiler/dag/BinaryUnionNode.java | 6 + .../compiler/dag/BulkIterationNode.java | 8 +- .../compiler/dag/DataSinkNode.java | 13 +- .../compiler/dag/DataSourceNode.java | 14 +- .../stratosphere/compiler/dag/FilterNode.java | 6 + .../compiler/dag/OptimizerNode.java | 3 + .../compiler/dag/SingleInputNode.java | 32 +++- .../compiler/dag/TwoInputNode.java | 46 ++++- .../compiler/dag/UnaryOperatorNode.java | 8 + .../compiler/dag/WorksetIterationNode.java | 8 +- .../dataproperties/GlobalProperties.java | 45 ++++- .../dataproperties/LocalProperties.java | 49 +++++- .../RequestedGlobalProperties.java | 39 ++++- .../RequestedLocalProperties.java | 32 +++- .../compiler/SemanticPropOptimizerTest.java | 163 ++++++++++++++++++ .../DualInputSemanticProperties.java | 35 +++- .../api/common/operators/Ordering.java | 16 +- .../SingleInputSemanticProperties.java | 14 ++ 20 files changed, 516 insertions(+), 33 deletions(-) create mode 100644 stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index bf3d6af9fea79..68ed499bb14bd 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -960,6 +960,7 @@ public static final class InterestingPropertyVisitor implements Visitor getAlternativePlans(CostEstimator estimator) { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + return null; + } + + @Override + public FieldSet getSourceField(int input, int fieldNumber) { + return null; + } + // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java index b6d6b715c29b1..aeba89346f903 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java @@ -24,6 +24,7 @@ import eu.stratosphere.api.common.io.statistics.BaseStatistics; import eu.stratosphere.api.common.operators.base.GenericDataSourceBase; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.PactCompiler; import eu.stratosphere.compiler.costs.CostEstimator; @@ -191,7 +192,18 @@ public List getAlternativePlans(CostEstimator estimator) { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + + + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + return null; + } + + @Override + public FieldSet getSourceField(int input, int fieldNumber) { + return null; + } + @Override public void accept(Visitor visitor) { if (visitor.preVisit(this)) { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java index f584c2411dbf0..bc324724d25da 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java @@ -17,6 +17,7 @@ import java.util.List; import eu.stratosphere.api.common.operators.base.FilterOperatorBase; +import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.operators.FilterDescriptor; import eu.stratosphere.compiler.operators.OperatorDescriptorSingle; @@ -46,6 +47,11 @@ public boolean isFieldConstant(int input, int fieldNumber) { return true; } + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + return new FieldSet(fieldNumber); + } + @Override protected List getPossibleProperties() { return Collections.singletonList(new FilterDescriptor()); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java index 85a6568c8b9d9..27b4cdbad7dec 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java @@ -273,6 +273,9 @@ protected List computeUnclosedBranchStackForBroadcastI */ public abstract boolean isFieldConstant(int input, int fieldNumber); + public abstract FieldSet getSourceField(int input, int fieldNumber); + + public abstract FieldSet getForwardField(int input, int fieldNumber); // ------------------------------------------------------------------------ // Getters / Setters // ------------------------------------------------------------------------ diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java index 0b872a7742fe1..eff0fc193ac27 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java @@ -155,7 +155,37 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + + + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + SingleInputOperator c = getPactContract(); + SingleInputSemanticProperties semanticProperties = c.getSemanticProperties(); + + if (semanticProperties != null) { + return semanticProperties.getForwardedField(fieldNumber); + } + return null; + } + + @Override + public FieldSet getSourceField(int input, int fieldNumber) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + SingleInputOperator c = getPactContract(); + SingleInputSemanticProperties semanticProperties = c.getSemanticProperties(); + + if (semanticProperties != null) { + return semanticProperties.getForwardedField(fieldNumber) != null ? semanticProperties.getForwardedField(fieldNumber) : semanticProperties.forwardedFrom(fieldNumber); + } + return null; + } @Override public void setInput(Map, OptimizerNode> contractToNode) throws CompilerException { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 97a92d0cdb9f9..8053afb397de4 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -698,8 +698,50 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - - + + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + DualInputOperator c = getPactContract(); + DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); + + if (semanticProperties == null) { + return null; + } + + switch(input) { + case 0: + return semanticProperties.getForwardedField1(fieldNumber); + case 1: + return semanticProperties.getForwardedField2(fieldNumber); + default: + throw new IndexOutOfBoundsException(); + } + } + + @Override + public FieldSet getSourceField(int input, int fieldNumber) { + DualInputOperator c = getPactContract(); + DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); + + switch(input) { + case 0: + if (semanticProperties != null) { + return semanticProperties.getForwardedField1(fieldNumber) != null ? semanticProperties.getForwardedField1(fieldNumber) : semanticProperties.forwardedFrom1(fieldNumber); + + } + break; + case 1: + if(semanticProperties != null) { + return semanticProperties.getForwardedField2(fieldNumber) != null ? semanticProperties.getForwardedField2(fieldNumber) : semanticProperties.forwardedFrom2(fieldNumber); + } + break; + default: + throw new IndexOutOfBoundsException(); + } + + return null; + } + // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java index 4b8f63fec9c56..4089f20551e52 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java @@ -56,6 +56,14 @@ public boolean isFieldConstant(int input, int fieldNumber) { return true; } + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + return new FieldSet(fieldNumber); + } + @Override protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { // we have no estimates by default diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java index f425695ae4f4c..5d384606e0272 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java @@ -21,6 +21,7 @@ import eu.stratosphere.api.common.operators.base.DeltaIterationBase; import eu.stratosphere.api.common.operators.util.FieldList; +import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor; @@ -203,7 +204,12 @@ public String getName() { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + + @Override + public FieldSet getForwardField(int input, int fieldNumber) { + return null; + } + protected void readStubAnnotations() {} @Override diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java index 61b7c2a9659d9..016d9bd679ce3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java @@ -190,6 +190,18 @@ public void reset() { this.partitioningFields = null; } + public Ordering getOrdering() { + return this.ordering; + } + + public void setOrdering(Ordering ordering) { + this.ordering = ordering; + } + + public void setPartitioningFields(FieldList partitioningFields) { + this.partitioningFields = partitioningFields; + } + /** * Filters these properties by what can be preserved through the given output contract. * @@ -199,17 +211,36 @@ public void reset() { */ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { // check if partitioning survives + FieldList forwardFields = null; + GlobalProperties returnProps = this; + if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { - return new GlobalProperties(); + for (int index : this.ordering.getInvolvedIndexes()) { + forwardFields = node.getForwardField(input, index) == null ? null: node.getForwardField(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + } else if (!forwardFields.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, forwardFields.get(0))); } } } if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { - return new GlobalProperties(); + for (int index : this.partitioningFields) { + forwardFields = node.getForwardField(input, index) == null ? null: node.getForwardField(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + } else if (!forwardFields.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList partitioned = new FieldList(); + for (Integer value : returnProps.getPartitioningFields()) { + if (value.intValue() == index) { + partitioned = partitioned.addFields(forwardFields); + } else { + partitioned = partitioned.addField(value); + } + } + returnProps.setPartitioningFields(partitioned); } } } @@ -238,7 +269,7 @@ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) return new GlobalProperties(); } - return this; + return returnProps; } public void parameterizeChannel(Channel channel, boolean globalDopChange) { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java index f49c1300a6bff..327cbc75a23c6 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java @@ -13,13 +13,13 @@ package eu.stratosphere.compiler.dataproperties; -import java.util.HashSet; -import java.util.Set; import eu.stratosphere.api.common.operators.Ordering; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.dag.OptimizerNode; +import java.util.Set; +import java.util.HashSet; /** * This class represents local properties of the data. A local property is a property that exists @@ -131,11 +131,14 @@ public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { Ordering no = this.ordering; FieldList ngf = this.groupedFields; Set
nuf = this.uniqueFields; - + FieldList forwardList = null; + if (this.ordering != null) { final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { + forwardList = node.getForwardField(input, involvedIndexes.get(i)) == null ? null : node.getForwardField(input, involvedIndexes.get(i)).toFieldList(); + + if (forwardList == null) { if (i == 0) { no = null; ngf = null; @@ -144,29 +147,59 @@ public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { ngf = no.getInvolvedIndexes(); } break; + } else if (!forwardList.contains(involvedIndexes.get(i))) { + no = this.getOrdering().replaceOrdering(involvedIndexes.get(i), forwardList.get(0)); + ngf = no.getInvolvedIndexes(); } } } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + forwardList = node.getForwardField(input, index) == null ? null : node.getForwardField(input, index).toFieldList(); + if (forwardList == null) { ngf = null; + break; + } else if (!forwardList.contains(index)) { + FieldList grouped = new FieldList(); + for (Integer value : ngf.toFieldList()) { + if (value.intValue() == index) { + grouped = grouped.addFields(forwardList); + } else { + grouped = grouped.addField(value); + } + } } } } - if (this.uniqueFields != null && this.uniqueFields.size() > 0) { + // check, whether the local key grouping is preserved + if (this.uniqueFields != null) { + boolean modified = false; Set
s = new HashSet
(this.uniqueFields); for (FieldSet fields : this.uniqueFields) { for (Integer index : fields) { - if (!node.isFieldConstant(input, index)) { + forwardList = node.getForwardField(input, index) == null ? null : node.getForwardField(input, index).toFieldList(); + if (forwardList == null) { s.remove(fields); + modified = true; break; + } else if (!forwardList.contains(index)) { + FieldList tmp = new FieldList(); + for (Integer i: fields) { + if (i != index) { + tmp.addField(i); + } else { + tmp.addFields(forwardList); + } + } + s.remove(fields); + s.add(tmp); + modified = true; } } } - if (s.size() != this.uniqueFields.size()) { + if (modified) { nuf = s; } } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java index e769508cecd1e..389541169d426 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java @@ -15,6 +15,7 @@ import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.dag.OptimizerNode; @@ -152,6 +153,13 @@ public void reset() { this.partitioningFields = null; } + public void setPartitioningFields(FieldSet partitioned) { + this.partitioningFields = partitioned; + } + + public void setOrdering(Ordering newOrdering) { + this.ordering = newOrdering; + } /** * Filters these properties by what can be preserved by the given node when propagated down * to the given input. @@ -161,16 +169,39 @@ public void reset() { * @return True, if any non-default value is preserved, false otherwise. */ public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + FieldList sourceList; + RequestedGlobalProperties returnProps = this; + // check if partitioning survives if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { + for (int index : this.ordering.getInvolvedIndexes()) { + sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, sourceList.get(0))); + } + } else { return null; } } } else if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { + for (Integer index : this.partitioningFields) { + sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList partitioned = new FieldList(); + for (Integer value : returnProps.getPartitionedFields()) { + if (value.intValue() == index) { + partitioned = partitioned.addFields(sourceList); + } else { + partitioned = partitioned.addField(value); + } + } + returnProps.setPartitioningFields(partitioned); + } + } else { return null; } } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java index aeae0d2f8f2c8..8f4f421f02d79 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java @@ -139,22 +139,44 @@ public void reset() { * @return True, if the resulting properties are non trivial. */ public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + FieldList sourceList; + RequestedLocalProperties returnProps = this; + if (this.ordering != null) { - final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); - for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { + for (int index: this.ordering.getInvolvedIndexes()) { + sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, sourceList.get(0))); + } + } else { return null; } } } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList grouped = new FieldList(); + for (Integer value : returnProps.getGroupedFields()) { + if (value.intValue() == index) { + grouped = grouped.addFields(sourceList); + } else { + grouped = grouped.addField(value); + } + } + returnProps.setGroupedFields(grouped); + } + } else { return null; } } } - return this; + return returnProps; } /** diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java new file mode 100644 index 0000000000000..8da557b7f9dc6 --- /dev/null +++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java @@ -0,0 +1,163 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler; + +import eu.stratosphere.api.common.operators.base.JoinOperatorBase; +import eu.stratosphere.api.common.operators.base.MapOperatorBase; +import eu.stratosphere.api.common.operators.base.ReduceOperatorBase; +import eu.stratosphere.api.common.operators.util.FieldSet; +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.functions.JoinFunction; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.functions.ReduceFunction; +import eu.stratosphere.api.java.operators.translation.JavaPlan; +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.compiler.dataproperties.GlobalProperties; +import eu.stratosphere.compiler.dataproperties.LocalProperties; +import eu.stratosphere.compiler.dataproperties.PartitioningProperty; +import eu.stratosphere.compiler.plan.Channel; +import eu.stratosphere.compiler.plan.DualInputPlanNode; +import eu.stratosphere.compiler.plan.OptimizedPlan; +import eu.stratosphere.compiler.plan.PlanNode; +import eu.stratosphere.compiler.plan.SingleInputPlanNode; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.util.Visitor; +import org.junit.Assert; +import org.junit.Test; + +public class SemanticPropOptimizerTest extends CompilerTestBase { + + public static class SimpleReducer extends ReduceFunction> { + @Override + public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception { + return null; + } + } + + public static class SimpleMap extends MapFunction, Tuple3> { + @Override + public Tuple3 map(Tuple3 value) throws Exception { + return null; + } + } + + @Test + public void forwardFieldsTestMapReduce() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + set = set.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1") + .map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("*"); + + set.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Reduce should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Reducer", + lprops.getGroupedFields().contains(1)); + } + } + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Map should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Mapper", + lprops.getGroupedFields().contains(1)); + } + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } + + @Test + public void forwardFieldsTestJoin() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + in1 = in1.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1"); + in2 = in2.map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("1->2"); + DataSet> out = in1.join(in2).where(1).equalTo(2).with(new JoinFunction, Tuple3, Tuple3>() { + @Override + public Tuple3 join(Tuple3 first, Tuple3 second) throws Exception { + return null; + } + }); + + out.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { + DualInputPlanNode node = ((DualInputPlanNode) visitable); + + final Channel inConn1 = node.getInput1(); + final Channel inConn2 = node.getInput2(); + + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn1.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn2.getShipStrategy() == ShipStrategyType.FORWARD); + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } +} diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java index 1ed399a5d8acb..b7280fa99affe 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java @@ -53,7 +53,40 @@ public DualInputSemanticProperties() { super(); this.init(); } - + + /** + * Finds the source field where the given field was forwarded from. + * @param dest The destination field in the output data. + * @return FieldSet containing the source input fields. + */ + public FieldSet forwardedFrom1(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields1.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + + public FieldSet forwardedFrom2(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields2.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + /** * Adds, to the existing information, a field that is forwarded directly * from the source record(s) in the first input to the destination diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java index a9895ecd91c46..daf51c74856b4 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java @@ -71,7 +71,21 @@ public Ordering appendOrdering(Integer index, Class> type, Orde this.orders.add(order); return this; } - + + public Ordering replaceOrdering(int oldField, int newField) { + Ordering newOrdering = new Ordering(); + + for (int i = 0; i < indexes.size(); i++) { + if (indexes.get(i).intValue() == oldField) { + newOrdering.appendOrdering(newField, this.types.get(i), this.orders.get(i)); + } else { + newOrdering.appendOrdering(this.indexes.get(i), this.types.get(i), this.orders.get(i)); + } + } + + return newOrdering; + } + // -------------------------------------------------------------------------------------------- public int getNumberOfFields() { diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java index f1d098a09cbfc..86b77ced11732 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java @@ -40,6 +40,20 @@ public SingleInputSemanticProperties() { super(); this.init(); } + + public FieldSet forwardedFrom(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } /** * Adds, to the existing information, a field that is forwarded directly From ede15d4f2b01ba3d073beb5be5895ef7db351707 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 7 Jul 2014 12:29:44 +0200 Subject: [PATCH 2/4] some refactoring --- .../stratosphere/compiler/dag/SingleInputNode.java | 4 ++-- .../eu/stratosphere/compiler/dag/TwoInputNode.java | 8 ++++---- .../compiler/dataproperties/GlobalProperties.java | 12 ++++++------ .../dataproperties/InterestingProperties.java | 4 ++-- .../compiler/dataproperties/LocalProperties.java | 4 ++-- .../dataproperties/RequestedGlobalProperties.java | 4 ++-- .../dataproperties/RequestedLocalProperties.java | 4 ++-- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java index eff0fc193ac27..e589b02725580 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java @@ -457,8 +457,8 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li lProps = dps.computeLocalProperties(lProps); // filter by the user code field copies - gProps = gProps.filterByNodesConstantSet(this, 0); - lProps = lProps.filterByNodesConstantSet(this, 0); + gProps = gProps.filterBySemanticProperties(this, 0); + lProps = lProps.filterBySemanticProperties(this, 0); // apply node.initProperties(gProps, lProps); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 8053afb397de4..2a6f32221cc24 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -534,12 +534,12 @@ protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel DualInputPlanNode node = operator.instantiate(in1, in2, this); node.setBroadcastInputs(broadcastChannelsCombination); - GlobalProperties gp1 = in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0); - GlobalProperties gp2 = in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1); + GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(this, 0); + GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(this, 1); GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); - LocalProperties lp1 = in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0); - LocalProperties lp2 = in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1); + LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(this, 0); + LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(this, 1); LocalProperties locals = operator.computeLocalProperties(lp1, lp2); node.initProperties(combined, locals); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java index 016d9bd679ce3..08ca9ffd8f11d 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java @@ -203,13 +203,13 @@ public void setPartitioningFields(FieldList partitioningFields) { } /** - * Filters these properties by what can be preserved through the given output contract. - * - * @param contract - * The output contract. - * @return True, if any non-default value is preserved, false otherwise. + * Filters these GlobalProperties by the fields that are constant or forwarded to another output field. + * + * @param node The node representing the contract. + * @param input The index of the input. + * @return The filtered GlobalProperties */ - public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public GlobalProperties filterBySemanticProperties(OptimizerNode node, int input) { // check if partitioning survives FieldList forwardFields = null; GlobalProperties returnProps = this; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java index 4f2d06b3fdfdb..0606f31955070 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java @@ -90,13 +90,13 @@ public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int inp InterestingProperties iProps = new InterestingProperties(); for (RequestedGlobalProperties rgp : this.globalProps) { - RequestedGlobalProperties filtered = rgp.filterByNodesConstantSet(node, input); + RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(node, input); if (filtered != null && !filtered.isTrivial()) { iProps.addGlobalProperties(filtered); } } for (RequestedLocalProperties rlp : this.localProps) { - RequestedLocalProperties filtered = rlp.filterByNodesConstantSet(node, input); + RequestedLocalProperties filtered = rlp.filterBySemanticProperties(node, input); if (filtered != null && !filtered.isTrivial()) { iProps.addLocalProperties(filtered); } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java index 327cbc75a23c6..7b8a8902d5b77 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java @@ -124,9 +124,9 @@ public boolean isTrivial() { * @param node The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. * - * @return True, if the resulting properties are non trivial. + * @return The filtered LocalProperties */ - public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public LocalProperties filterBySemanticProperties(OptimizerNode node, int input) { // check, whether the local order is preserved Ordering no = this.ordering; FieldList ngf = this.groupedFields; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java index 389541169d426..5641674624ec9 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java @@ -166,9 +166,9 @@ public void setOrdering(Ordering newOrdering) { * * @param node The node representing the contract. * @param input The index of the input. - * @return True, if any non-default value is preserved, false otherwise. + * @return The filtered RequestedGlobalProperties */ - public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedGlobalProperties filterBySemanticProperties(OptimizerNode node, int input) { FieldList sourceList; RequestedGlobalProperties returnProps = this; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java index 8f4f421f02d79..f5ca55e5e2d06 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java @@ -136,9 +136,9 @@ public void reset() { * @param node The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. * - * @return True, if the resulting properties are non trivial. + * @return The filtered RequestedLocalProperties */ - public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedLocalProperties filterBySemanticProperties(OptimizerNode node, int input) { FieldList sourceList; RequestedLocalProperties returnProps = this; From 20ed492456a2a5ba0c24006f1ace6343dc1ad68f Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 7 Jul 2014 14:51:04 +0200 Subject: [PATCH 3/4] Local and Globalproperties are now supplied with the semanticproperties directly --- .../dag/AbstractPartialSolutionNode.java | 11 --- .../compiler/dag/BinaryUnionNode.java | 6 -- .../compiler/dag/BulkIterationNode.java | 6 -- .../compiler/dag/DataSinkNode.java | 11 --- .../compiler/dag/DataSourceNode.java | 11 --- .../stratosphere/compiler/dag/FilterNode.java | 6 -- .../compiler/dag/OptimizerNode.java | 24 +++--- .../compiler/dag/SingleInputNode.java | 39 ++-------- .../compiler/dag/TwoInputNode.java | 77 +++++-------------- .../compiler/dag/UnaryOperatorNode.java | 8 -- .../compiler/dag/WorksetIterationNode.java | 6 -- .../dataproperties/GlobalProperties.java | 12 +-- .../dataproperties/InterestingProperties.java | 21 +++-- .../dataproperties/LocalProperties.java | 15 ++-- .../RequestedGlobalProperties.java | 11 +-- .../RequestedLocalProperties.java | 12 +-- .../postpass/GenericFlatTypePostPass.java | 1 + .../DualInputSemanticProperties.java | 24 +++++- .../common/operators/SemanticProperties.java | 10 ++- .../SingleInputSemanticProperties.java | 18 ++++- 20 files changed, 123 insertions(+), 206 deletions(-) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java index cb7d5152379c1..337cb0dd5059e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java @@ -18,7 +18,6 @@ import java.util.Map; import eu.stratosphere.api.common.operators.Operator; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.costs.CostEstimator; import eu.stratosphere.compiler.plan.PlanNode; @@ -44,16 +43,6 @@ protected void copyEstimates(OptimizerNode node) { // -------------------------------------------------------------------------------------------- - @Override - public FieldSet getSourceField(int input, int fieldNumber) { - return null; - } - - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return null; - } - public boolean isOnDynamicPath() { return true; } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java index 6c7a0038bf212..7c2a04523317e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java @@ -19,7 +19,6 @@ import java.util.Set; import eu.stratosphere.api.common.operators.Union; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.costs.CostEstimator; @@ -251,11 +250,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return true; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return new FieldSet(fieldNumber); - } - @Override public void computeOutputEstimates(DataStatistics statistics) { OptimizerNode in1 = getFirstPredecessorNode(); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java index 25d688ec46516..6f92f34eecadc 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java @@ -21,7 +21,6 @@ import eu.stratosphere.api.common.operators.base.BulkIterationBase; import eu.stratosphere.api.common.operators.util.FieldList; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor; @@ -182,11 +181,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return null; - } - protected void readStubAnnotations() {} @Override diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java index 3d02967cb4d73..e831aa2ed602b 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java @@ -22,7 +22,6 @@ import eu.stratosphere.api.common.operators.base.GenericDataSinkBase; import eu.stratosphere.api.common.operators.Operator; import eu.stratosphere.api.common.operators.Ordering; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.costs.CostEstimator; @@ -230,16 +229,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return null; - } - - @Override - public FieldSet getSourceField(int input, int fieldNumber) { - return null; - } - // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java index aeba89346f903..218938042cc16 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java @@ -24,7 +24,6 @@ import eu.stratosphere.api.common.io.statistics.BaseStatistics; import eu.stratosphere.api.common.operators.base.GenericDataSourceBase; import eu.stratosphere.api.common.operators.Operator; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.PactCompiler; import eu.stratosphere.compiler.costs.CostEstimator; @@ -194,16 +193,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return null; - } - - @Override - public FieldSet getSourceField(int input, int fieldNumber) { - return null; - } - @Override public void accept(Visitor visitor) { if (visitor.preVisit(this)) { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java index bc324724d25da..f584c2411dbf0 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/FilterNode.java @@ -17,7 +17,6 @@ import java.util.List; import eu.stratosphere.api.common.operators.base.FilterOperatorBase; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.operators.FilterDescriptor; import eu.stratosphere.compiler.operators.OperatorDescriptorSingle; @@ -47,11 +46,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return true; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return new FieldSet(fieldNumber); - } - @Override protected List getPossibleProperties() { return Collections.singletonList(new FilterDescriptor()); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java index 27b4cdbad7dec..50be64798dd71 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java @@ -25,6 +25,7 @@ import eu.stratosphere.api.common.operators.AbstractUdfOperator; import eu.stratosphere.api.common.operators.CompilerHints; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; @@ -262,20 +263,17 @@ protected List computeUnclosedBranchStackForBroadcastI */ @Override public abstract void accept(Visitor visitor); - - /** - * Checks whether a field is modified by the user code or whether it is kept unchanged. - * - * @param input The input number. - * @param fieldNumber The position of the field. - * - * @return True if the field is not changed by the user function, false otherwise. - */ - public abstract boolean isFieldConstant(int input, int fieldNumber); - public abstract FieldSet getSourceField(int input, int fieldNumber); + /* + * Checks whether a field is modified by the user code or whether it is kept unchanged. + * + * @param input The input number. + * @param fieldNumber The position of the field. + * + * @return True if the field is not changed by the user function, false otherwise. + */ + public abstract boolean isFieldConstant(int input, int fieldNumber); - public abstract FieldSet getForwardField(int input, int fieldNumber); // ------------------------------------------------------------------------ // Getters / Setters // ------------------------------------------------------------------------ @@ -681,7 +679,7 @@ protected int[] getConstantKeySet(int input) { } for (int keyColumn : keyColumns) { if (!isFieldConstant(input, keyColumn)) { - return null; + return null; } } return keyColumns; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java index e589b02725580..34ff3a500d4d5 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.SingleInputOperator; import eu.stratosphere.api.common.operators.SingleInputSemanticProperties; import eu.stratosphere.api.common.operators.util.FieldSet; @@ -156,37 +157,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - - SingleInputOperator c = getPactContract(); - SingleInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - if (semanticProperties != null) { - return semanticProperties.getForwardedField(fieldNumber); - } - return null; - } - - @Override - public FieldSet getSourceField(int input, int fieldNumber) { - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - - SingleInputOperator c = getPactContract(); - SingleInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - if (semanticProperties != null) { - return semanticProperties.getForwardedField(fieldNumber) != null ? semanticProperties.getForwardedField(fieldNumber) : semanticProperties.forwardedFrom(fieldNumber); - } - return null; - } - @Override public void setInput(Map, OptimizerNode> contractToNode) throws CompilerException { // see if an internal hint dictates the strategy to use @@ -455,10 +425,11 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li LocalProperties lProps = in.getLocalProperties().clone(); gProps = dps.computeGlobalProperties(gProps); lProps = dps.computeLocalProperties(lProps); - + + SemanticProperties props = this.getPactContract().getSemanticProperties(); // filter by the user code field copies - gProps = gProps.filterBySemanticProperties(this, 0); - lProps = lProps.filterBySemanticProperties(this, 0); + gProps = gProps.filterBySemanticProperties(props, 0); + lProps = lProps.filterBySemanticProperties(props, 0); // apply node.initProperties(gProps, lProps); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 2a6f32221cc24..06123d1a469c6 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -29,6 +29,7 @@ import eu.stratosphere.api.common.operators.DualInputOperator; import eu.stratosphere.api.common.operators.DualInputSemanticProperties; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; @@ -533,13 +534,14 @@ protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel DualInputPlanNode node = operator.instantiate(in1, in2, this); node.setBroadcastInputs(broadcastChannelsCombination); - - GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(this, 0); - GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(this, 1); + + SemanticProperties props = this.getPactContract().getSemanticProperties(); + GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); + GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); - LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(this, 0); - LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(this, 1); + LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0); + LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1); LocalProperties locals = operator.computeLocalProperties(lp1, lp2); node.initProperties(combined, locals); @@ -674,22 +676,22 @@ public FieldList getInputKeySet(int input) { public boolean isFieldConstant(int input, int fieldNumber) { DualInputOperator c = getPactContract(); DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); - + + if (semanticProperties == null) { + return false; + } + switch(input) { case 0: - if (semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } + FieldSet fs; + if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { + return fs.contains(fieldNumber); } break; case 1: - if(semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } + FieldSet fs; + if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) { + return fs.contains(fieldNumber); } break; default: @@ -699,49 +701,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - DualInputOperator c = getPactContract(); - DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - if (semanticProperties == null) { - return null; - } - - switch(input) { - case 0: - return semanticProperties.getForwardedField1(fieldNumber); - case 1: - return semanticProperties.getForwardedField2(fieldNumber); - default: - throw new IndexOutOfBoundsException(); - } - } - - @Override - public FieldSet getSourceField(int input, int fieldNumber) { - DualInputOperator c = getPactContract(); - DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); - - switch(input) { - case 0: - if (semanticProperties != null) { - return semanticProperties.getForwardedField1(fieldNumber) != null ? semanticProperties.getForwardedField1(fieldNumber) : semanticProperties.forwardedFrom1(fieldNumber); - - } - break; - case 1: - if(semanticProperties != null) { - return semanticProperties.getForwardedField2(fieldNumber) != null ? semanticProperties.getForwardedField2(fieldNumber) : semanticProperties.forwardedFrom2(fieldNumber); - } - break; - default: - throw new IndexOutOfBoundsException(); - } - - return null; - } - // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java index 4089f20551e52..4b8f63fec9c56 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/UnaryOperatorNode.java @@ -56,14 +56,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return true; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - return new FieldSet(fieldNumber); - } - @Override protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { // we have no estimates by default diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java index 5d384606e0272..72b6f883416b3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java @@ -21,7 +21,6 @@ import eu.stratosphere.api.common.operators.base.DeltaIterationBase; import eu.stratosphere.api.common.operators.util.FieldList; -import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor; @@ -205,11 +204,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - @Override - public FieldSet getForwardField(int input, int fieldNumber) { - return null; - } - protected void readStubAnnotations() {} @Override diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java index 08ca9ffd8f11d..600be5d7490d3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java @@ -19,10 +19,10 @@ import eu.stratosphere.api.common.operators.Order; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; @@ -205,18 +205,18 @@ public void setPartitioningFields(FieldList partitioningFields) { /** * Filters these GlobalProperties by the fields that are constant or forwarded to another output field. * - * @param node The node representing the contract. + * @param props The node representing the contract. * @param input The index of the input. * @return The filtered GlobalProperties */ - public GlobalProperties filterBySemanticProperties(OptimizerNode node, int input) { + public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check if partitioning survives FieldList forwardFields = null; GlobalProperties returnProps = this; if (this.ordering != null) { for (int index : this.ordering.getInvolvedIndexes()) { - forwardFields = node.getForwardField(input, index) == null ? null: node.getForwardField(input, index).toFieldList(); + forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); if (forwardFields == null) { returnProps = new GlobalProperties(); } else if (!forwardFields.contains(index)) { @@ -227,7 +227,7 @@ public GlobalProperties filterBySemanticProperties(OptimizerNode node, int input } if (this.partitioningFields != null) { for (int index : this.partitioningFields) { - forwardFields = node.getForwardField(input, index) == null ? null: node.getForwardField(input, index).toFieldList(); + forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); if (forwardFields == null) { returnProps = new GlobalProperties(); } else if (!forwardFields.contains(index)) { @@ -251,7 +251,7 @@ public GlobalProperties filterBySemanticProperties(OptimizerNode node, int input for (Iterator
combos = newSet.iterator(); combos.hasNext(); ){ FieldSet current = combos.next(); for (Integer field : current) { - if (!node.isFieldConstant(input, field)) { + if (!props.getForwardFields(input, field).contains(field)) { combos.remove(); break; } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java index 0606f31955070..1259d1480c156 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java @@ -17,7 +17,12 @@ import java.util.Iterator; import java.util.Set; +import eu.stratosphere.api.common.operators.DualInputOperator; +import eu.stratosphere.api.common.operators.SemanticProperties; +import eu.stratosphere.api.common.operators.SingleInputOperator; import eu.stratosphere.compiler.dag.OptimizerNode; +import eu.stratosphere.compiler.dag.SingleInputNode; +import eu.stratosphere.compiler.dag.TwoInputNode; /** * The interesting properties that a node in the optimizer plan hands to its predecessors. It has the @@ -40,9 +45,7 @@ public InterestingProperties() { /** * Private constructor for cloning purposes. - * - * @param maxCostsGlobal The maximal costs for the global properties. - * @param maxCostsLocal The maximal costs for the local properties. + * * @param globalProps The global properties for this new object. * @param localProps The local properties for this new object. */ @@ -88,15 +91,21 @@ public Set getGlobalProperties() { public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { InterestingProperties iProps = new InterestingProperties(); - + SemanticProperties props = null; + if (node instanceof SingleInputNode) { + props = ((SingleInputOperator) node.getPactContract()).getSemanticProperties(); + } else if (node instanceof TwoInputNode) { + props = ((DualInputOperator) node.getPactContract()).getSemanticProperties(); + } + for (RequestedGlobalProperties rgp : this.globalProps) { - RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(node, input); + RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addGlobalProperties(filtered); } } for (RequestedLocalProperties rlp : this.localProps) { - RequestedLocalProperties filtered = rlp.filterBySemanticProperties(node, input); + RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addLocalProperties(filtered); } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java index 7b8a8902d5b77..54d8b1656cbdd 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java @@ -15,9 +15,10 @@ import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; -import eu.stratosphere.compiler.dag.OptimizerNode; + import java.util.Set; import java.util.HashSet; @@ -121,12 +122,12 @@ public boolean isTrivial() { /** * Filters these properties by what can be preserved through a user function's constant fields set. * - * @param node The optimizer node that potentially modifies the properties. + * @param props The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. - * + * * @return The filtered LocalProperties */ - public LocalProperties filterBySemanticProperties(OptimizerNode node, int input) { + public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check, whether the local order is preserved Ordering no = this.ordering; FieldList ngf = this.groupedFields; @@ -136,7 +137,7 @@ public LocalProperties filterBySemanticProperties(OptimizerNode node, int input) if (this.ordering != null) { final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { - forwardList = node.getForwardField(input, involvedIndexes.get(i)) == null ? null : node.getForwardField(input, involvedIndexes.get(i)).toFieldList(); + forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList(); if (forwardList == null) { if (i == 0) { @@ -156,7 +157,7 @@ public LocalProperties filterBySemanticProperties(OptimizerNode node, int input) else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - forwardList = node.getForwardField(input, index) == null ? null : node.getForwardField(input, index).toFieldList(); + forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList(); if (forwardList == null) { ngf = null; break; @@ -179,7 +180,7 @@ else if (this.groupedFields != null) { Set
s = new HashSet
(this.uniqueFields); for (FieldSet fields : this.uniqueFields) { for (Integer index : fields) { - forwardList = node.getForwardField(input, index) == null ? null : node.getForwardField(input, index).toFieldList(); + forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList(); if (forwardList == null) { s.remove(fields); modified = true; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java index 5641674624ec9..cea696b349cca 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java @@ -15,10 +15,10 @@ import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; @@ -164,18 +164,19 @@ public void setOrdering(Ordering newOrdering) { * Filters these properties by what can be preserved by the given node when propagated down * to the given input. * - * @param node The node representing the contract. + * @param props The node representing the contract. * @param input The index of the input. * @return The filtered RequestedGlobalProperties */ - public RequestedGlobalProperties filterBySemanticProperties(OptimizerNode node, int input) { + public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { FieldList sourceList; RequestedGlobalProperties returnProps = this; + // check if partitioning survives if (this.ordering != null) { for (int index : this.ordering.getInvolvedIndexes()) { - sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); if (sourceList != null) { if (!sourceList.contains(index)) { returnProps = returnProps == this ? this.clone() : returnProps; @@ -187,7 +188,7 @@ public RequestedGlobalProperties filterBySemanticProperties(OptimizerNode node, } } else if (this.partitioningFields != null) { for (Integer index : this.partitioningFields) { - sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); if (sourceList != null) { if (!sourceList.contains(index)) { returnProps = returnProps == this ? this.clone() : returnProps; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java index f5ca55e5e2d06..36a0e9851eea3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java @@ -16,9 +16,9 @@ import java.util.Arrays; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.task.util.LocalStrategy; @@ -133,18 +133,18 @@ public void reset() { * Since interesting properties are filtered top-down, anything that partially destroys the ordering * makes the properties uninteresting. * - * @param node The optimizer node that potentially modifies the properties. + * @param props The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. - * + * * @return The filtered RequestedLocalProperties */ - public RequestedLocalProperties filterBySemanticProperties(OptimizerNode node, int input) { + public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) { FieldList sourceList; RequestedLocalProperties returnProps = this; if (this.ordering != null) { for (int index: this.ordering.getInvolvedIndexes()) { - sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); if (sourceList != null) { if (!sourceList.contains(index)) { returnProps = returnProps == this ? this.clone() : returnProps; @@ -157,7 +157,7 @@ public RequestedLocalProperties filterBySemanticProperties(OptimizerNode node, i } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - sourceList = node.getSourceField(input, index) == null ? null : node.getSourceField(input, index).toFieldList(); + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); if (sourceList != null) { if (!sourceList.contains(index)) { returnProps = returnProps == this ? this.clone() : returnProps; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java index 38dd7eccbb26b..324a2a274733c 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java @@ -516,6 +516,7 @@ private void addSchemaToSchema(T sourceSchema, T targetSchema, OptimizerNode opt try { for (Map.Entry entry : sourceSchema) { Integer pos = entry.getKey(); + if (optNode.isFieldConstant(input, pos)) { targetSchema.addType(pos, entry.getValue()); } diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java index b7280fa99affe..3f9118759e6c9 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java @@ -238,7 +238,29 @@ public void setReadFields1(FieldSet readFields) { public FieldSet getReadFields1() { return this.readFields1; } - + + @Override + public FieldSet getForwardFields(int input, int field) { + if (input == 0) { + return this.getForwardedField1(field); + } else if (input == 1) { + return this.getForwardedField2(field); + } + return null; + } + + @Override + public FieldSet getSourceField(int input, int field) { + switch(input) { + case 0: + return this.getForwardedField1(field) != null ? this.getForwardedField1(field) : this.forwardedFrom1(field); + case 1: + return this.getForwardedField2(field) != null ? this.getForwardedField2(field) : this.forwardedFrom2(field); + default: + throw new IndexOutOfBoundsException(); + } + } + /** * Adds, to the existing information, field(s) that are read in * the source record(s) from the second input. diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java index 50af69f917276..92f7ffc982f13 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java @@ -42,16 +42,20 @@ public void addWrittenFields(FieldSet writtenFields) { this.writtenFields = this.writtenFields.addFields(writtenFields); } } - + + public abstract FieldSet getForwardFields(int input, int field); + + public abstract FieldSet getSourceField(int input, int field); + /** * Sets the field(s) that are written in the destination record(s). - * + * * @param writtenFields the position(s) in the destination record(s) */ public void setWrittenFields(FieldSet writtenFields) { this.writtenFields = writtenFields; } - + /** * Gets the field(s) in the destination record(s) that are written. * diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java index 86b77ced11732..efaf1a887012c 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java @@ -35,7 +35,23 @@ public class SingleInputSemanticProperties extends SemanticProperties { */ private FieldSet readFields; - + @Override + public FieldSet getForwardFields(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + return this.getForwardedField(field); + } + + @Override + public FieldSet getSourceField(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + return this.getForwardedField(field) != null ? this.getForwardedField(field) : this.forwardedFrom(field); + } + public SingleInputSemanticProperties() { super(); this.init(); From c6359c8ae1655df4b15b5a2ea64522c881af1172 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Mon, 7 Jul 2014 15:39:31 +0200 Subject: [PATCH 4/4] fixed NullPointerException in some tests --- .../java/eu/stratosphere/compiler/dag/OptimizerNode.java | 1 - .../main/java/eu/stratosphere/compiler/dag/TwoInputNode.java | 5 ++--- .../compiler/dataproperties/GlobalProperties.java | 4 ++++ .../compiler/dataproperties/LocalProperties.java | 4 ++++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java index 50be64798dd71..264567ce4d5d8 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java @@ -25,7 +25,6 @@ import eu.stratosphere.api.common.operators.AbstractUdfOperator; import eu.stratosphere.api.common.operators.CompilerHints; import eu.stratosphere.api.common.operators.Operator; -import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.DataStatistics; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 06123d1a469c6..2772820877dba 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -680,16 +680,15 @@ public boolean isFieldConstant(int input, int fieldNumber) { if (semanticProperties == null) { return false; } - + FieldSet fs; switch(input) { case 0: - FieldSet fs; if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { return fs.contains(fieldNumber); } break; case 1: - FieldSet fs; + if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) { return fs.contains(fieldNumber); } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java index 600be5d7490d3..fcca684511275 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java @@ -214,6 +214,10 @@ public GlobalProperties filterBySemanticProperties(SemanticProperties props, int FieldList forwardFields = null; GlobalProperties returnProps = this; + if (props == null) { + return new GlobalProperties(); + } + if (this.ordering != null) { for (int index : this.ordering.getInvolvedIndexes()) { forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java index 54d8b1656cbdd..2797f52a21f2e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java @@ -134,6 +134,10 @@ public LocalProperties filterBySemanticProperties(SemanticProperties props, int Set
nuf = this.uniqueFields; FieldList forwardList = null; + if (props == null) { + return new LocalProperties(); + } + if (this.ordering != null) { final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) {