diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index bf3d6af9fea79..68ed499bb14bd 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -960,6 +960,7 @@ public static final class InterestingPropertyVisitor implements Visitor getAlternativePlans(CostEstimator estimator) { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java index b6d6b715c29b1..218938042cc16 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java @@ -191,7 +191,8 @@ public List getAlternativePlans(CostEstimator estimator) { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + + @Override public void accept(Visitor visitor) { if (visitor.preVisit(this)) { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java index 85a6568c8b9d9..264567ce4d5d8 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java @@ -262,15 +262,15 @@ protected List computeUnclosedBranchStackForBroadcastI */ @Override public abstract void accept(Visitor visitor); - - /** - * Checks whether a field is modified by the user code or whether it is kept unchanged. - * - * @param input The input number. - * @param fieldNumber The position of the field. - * - * @return True if the field is not changed by the user function, false otherwise. - */ + + /* + * Checks whether a field is modified by the user code or whether it is kept unchanged. + * + * @param input The input number. + * @param fieldNumber The position of the field. + * + * @return True if the field is not changed by the user function, false otherwise. + */ public abstract boolean isFieldConstant(int input, int fieldNumber); // ------------------------------------------------------------------------ @@ -678,7 +678,7 @@ protected int[] getConstantKeySet(int input) { } for (int keyColumn : keyColumns) { if (!isFieldConstant(input, keyColumn)) { - return null; + return null; } } return keyColumns; diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java index 0b872a7742fe1..34ff3a500d4d5 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.SingleInputOperator; import eu.stratosphere.api.common.operators.SingleInputSemanticProperties; import eu.stratosphere.api.common.operators.util.FieldSet; @@ -155,7 +156,6 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - @Override public void setInput(Map, OptimizerNode> contractToNode) throws CompilerException { @@ -425,10 +425,11 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li LocalProperties lProps = in.getLocalProperties().clone(); gProps = dps.computeGlobalProperties(gProps); lProps = dps.computeLocalProperties(lProps); - + + SemanticProperties props = this.getPactContract().getSemanticProperties(); // filter by the user code field copies - gProps = gProps.filterByNodesConstantSet(this, 0); - lProps = lProps.filterByNodesConstantSet(this, 0); + gProps = gProps.filterBySemanticProperties(props, 0); + lProps = lProps.filterBySemanticProperties(props, 0); // apply node.initProperties(gProps, lProps); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 97a92d0cdb9f9..2772820877dba 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -29,6 +29,7 @@ import eu.stratosphere.api.common.operators.DualInputOperator; import eu.stratosphere.api.common.operators.DualInputSemanticProperties; import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; @@ -533,13 +534,14 @@ protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel DualInputPlanNode node = operator.instantiate(in1, in2, this); node.setBroadcastInputs(broadcastChannelsCombination); - - GlobalProperties gp1 = in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0); - GlobalProperties gp2 = in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1); + + SemanticProperties props = this.getPactContract().getSemanticProperties(); + GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); + GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2); - LocalProperties lp1 = in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0); - LocalProperties lp2 = in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1); + LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0); + LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1); LocalProperties locals = operator.computeLocalProperties(lp1, lp2); node.initProperties(combined, locals); @@ -674,22 +676,21 @@ public FieldList getInputKeySet(int input) { public boolean isFieldConstant(int input, int fieldNumber) { DualInputOperator c = getPactContract(); DualInputSemanticProperties semanticProperties = c.getSemanticProperties(); - + + if (semanticProperties == null) { + return false; + } + FieldSet fs; switch(input) { case 0: - if (semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } + if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) { + return fs.contains(fieldNumber); } break; case 1: - if(semanticProperties != null) { - FieldSet fs; - if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) { - return fs.contains(fieldNumber); - } + + if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) { + return fs.contains(fieldNumber); } break; default: @@ -698,8 +699,7 @@ public boolean isFieldConstant(int input, int fieldNumber) { return false; } - - + // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java index f425695ae4f4c..72b6f883416b3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java @@ -203,7 +203,7 @@ public String getName() { public boolean isFieldConstant(int input, int fieldNumber) { return false; } - + protected void readStubAnnotations() {} @Override diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java index 61b7c2a9659d9..fcca684511275 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/GlobalProperties.java @@ -19,10 +19,10 @@ import eu.stratosphere.api.common.operators.Order; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; @@ -190,26 +190,61 @@ public void reset() { this.partitioningFields = null; } + public Ordering getOrdering() { + return this.ordering; + } + + public void setOrdering(Ordering ordering) { + this.ordering = ordering; + } + + public void setPartitioningFields(FieldList partitioningFields) { + this.partitioningFields = partitioningFields; + } + /** - * Filters these properties by what can be preserved through the given output contract. - * - * @param contract - * The output contract. - * @return True, if any non-default value is preserved, false otherwise. + * Filters these GlobalProperties by the fields that are constant or forwarded to another output field. + * + * @param props The node representing the contract. + * @param input The index of the input. + * @return The filtered GlobalProperties */ - public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check if partitioning survives + FieldList forwardFields = null; + GlobalProperties returnProps = this; + + if (props == null) { + return new GlobalProperties(); + } + if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { - return new GlobalProperties(); + for (int index : this.ordering.getInvolvedIndexes()) { + forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + } else if (!forwardFields.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, forwardFields.get(0))); } } } if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { - return new GlobalProperties(); + for (int index : this.partitioningFields) { + forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList(); + if (forwardFields == null) { + returnProps = new GlobalProperties(); + } else if (!forwardFields.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList partitioned = new FieldList(); + for (Integer value : returnProps.getPartitioningFields()) { + if (value.intValue() == index) { + partitioned = partitioned.addFields(forwardFields); + } else { + partitioned = partitioned.addField(value); + } + } + returnProps.setPartitioningFields(partitioned); } } } @@ -220,7 +255,7 @@ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) for (Iterator
combos = newSet.iterator(); combos.hasNext(); ){ FieldSet current = combos.next(); for (Integer field : current) { - if (!node.isFieldConstant(input, field)) { + if (!props.getForwardFields(input, field).contains(field)) { combos.remove(); break; } @@ -238,7 +273,7 @@ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) return new GlobalProperties(); } - return this; + return returnProps; } public void parameterizeChannel(Channel channel, boolean globalDopChange) { diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java index 4f2d06b3fdfdb..1259d1480c156 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/InterestingProperties.java @@ -17,7 +17,12 @@ import java.util.Iterator; import java.util.Set; +import eu.stratosphere.api.common.operators.DualInputOperator; +import eu.stratosphere.api.common.operators.SemanticProperties; +import eu.stratosphere.api.common.operators.SingleInputOperator; import eu.stratosphere.compiler.dag.OptimizerNode; +import eu.stratosphere.compiler.dag.SingleInputNode; +import eu.stratosphere.compiler.dag.TwoInputNode; /** * The interesting properties that a node in the optimizer plan hands to its predecessors. It has the @@ -40,9 +45,7 @@ public InterestingProperties() { /** * Private constructor for cloning purposes. - * - * @param maxCostsGlobal The maximal costs for the global properties. - * @param maxCostsLocal The maximal costs for the local properties. + * * @param globalProps The global properties for this new object. * @param localProps The local properties for this new object. */ @@ -88,15 +91,21 @@ public Set getGlobalProperties() { public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { InterestingProperties iProps = new InterestingProperties(); - + SemanticProperties props = null; + if (node instanceof SingleInputNode) { + props = ((SingleInputOperator) node.getPactContract()).getSemanticProperties(); + } else if (node instanceof TwoInputNode) { + props = ((DualInputOperator) node.getPactContract()).getSemanticProperties(); + } + for (RequestedGlobalProperties rgp : this.globalProps) { - RequestedGlobalProperties filtered = rgp.filterByNodesConstantSet(node, input); + RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addGlobalProperties(filtered); } } for (RequestedLocalProperties rlp : this.localProps) { - RequestedLocalProperties filtered = rlp.filterByNodesConstantSet(node, input); + RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input); if (filtered != null && !filtered.isTrivial()) { iProps.addLocalProperties(filtered); } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java index f49c1300a6bff..2797f52a21f2e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java @@ -13,13 +13,14 @@ package eu.stratosphere.compiler.dataproperties; -import java.util.HashSet; -import java.util.Set; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; -import eu.stratosphere.compiler.dag.OptimizerNode; + +import java.util.Set; +import java.util.HashSet; /** * This class represents local properties of the data. A local property is a property that exists @@ -121,21 +122,28 @@ public boolean isTrivial() { /** * Filters these properties by what can be preserved through a user function's constant fields set. * - * @param node The optimizer node that potentially modifies the properties. + * @param props The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. - * - * @return True, if the resulting properties are non trivial. + * + * @return The filtered LocalProperties */ - public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) { // check, whether the local order is preserved Ordering no = this.ordering; FieldList ngf = this.groupedFields; Set
nuf = this.uniqueFields; - + FieldList forwardList = null; + + if (props == null) { + return new LocalProperties(); + } + if (this.ordering != null) { final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { + forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList(); + + if (forwardList == null) { if (i == 0) { no = null; ngf = null; @@ -144,29 +152,59 @@ public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { ngf = no.getInvolvedIndexes(); } break; + } else if (!forwardList.contains(involvedIndexes.get(i))) { + no = this.getOrdering().replaceOrdering(involvedIndexes.get(i), forwardList.get(0)); + ngf = no.getInvolvedIndexes(); } } } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList(); + if (forwardList == null) { ngf = null; + break; + } else if (!forwardList.contains(index)) { + FieldList grouped = new FieldList(); + for (Integer value : ngf.toFieldList()) { + if (value.intValue() == index) { + grouped = grouped.addFields(forwardList); + } else { + grouped = grouped.addField(value); + } + } } } } - if (this.uniqueFields != null && this.uniqueFields.size() > 0) { + // check, whether the local key grouping is preserved + if (this.uniqueFields != null) { + boolean modified = false; Set
s = new HashSet
(this.uniqueFields); for (FieldSet fields : this.uniqueFields) { for (Integer index : fields) { - if (!node.isFieldConstant(input, index)) { + forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList(); + if (forwardList == null) { s.remove(fields); + modified = true; break; + } else if (!forwardList.contains(index)) { + FieldList tmp = new FieldList(); + for (Integer i: fields) { + if (i != index) { + tmp.addField(i); + } else { + tmp.addFields(forwardList); + } + } + s.remove(fields); + s.add(tmp); + modified = true; } } } - if (s.size() != this.uniqueFields.size()) { + if (modified) { nuf = s; } } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java index e769508cecd1e..cea696b349cca 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java @@ -15,9 +15,10 @@ import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; +import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; import eu.stratosphere.compiler.CompilerException; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; @@ -152,25 +153,56 @@ public void reset() { this.partitioningFields = null; } + public void setPartitioningFields(FieldSet partitioned) { + this.partitioningFields = partitioned; + } + + public void setOrdering(Ordering newOrdering) { + this.ordering = newOrdering; + } /** * Filters these properties by what can be preserved by the given node when propagated down * to the given input. * - * @param node The node representing the contract. + * @param props The node representing the contract. * @param input The index of the input. - * @return True, if any non-default value is preserved, false otherwise. + * @return The filtered RequestedGlobalProperties */ - public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { + FieldList sourceList; + RequestedGlobalProperties returnProps = this; + + // check if partitioning survives if (this.ordering != null) { - for (int col : this.ordering.getInvolvedIndexes()) { - if (!node.isFieldConstant(input, col)) { + for (int index : this.ordering.getInvolvedIndexes()) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, sourceList.get(0))); + } + } else { return null; } } } else if (this.partitioningFields != null) { - for (int colIndex : this.partitioningFields) { - if (!node.isFieldConstant(input, colIndex)) { + for (Integer index : this.partitioningFields) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList partitioned = new FieldList(); + for (Integer value : returnProps.getPartitionedFields()) { + if (value.intValue() == index) { + partitioned = partitioned.addFields(sourceList); + } else { + partitioned = partitioned.addField(value); + } + } + returnProps.setPartitioningFields(partitioned); + } + } else { return null; } } diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java index aeae0d2f8f2c8..36a0e9851eea3 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java @@ -16,9 +16,9 @@ import java.util.Arrays; import eu.stratosphere.api.common.operators.Ordering; +import eu.stratosphere.api.common.operators.SemanticProperties; import eu.stratosphere.api.common.operators.util.FieldList; import eu.stratosphere.api.common.operators.util.FieldSet; -import eu.stratosphere.compiler.dag.OptimizerNode; import eu.stratosphere.compiler.plan.Channel; import eu.stratosphere.compiler.util.Utils; import eu.stratosphere.pact.runtime.task.util.LocalStrategy; @@ -133,28 +133,50 @@ public void reset() { * Since interesting properties are filtered top-down, anything that partially destroys the ordering * makes the properties uninteresting. * - * @param node The optimizer node that potentially modifies the properties. + * @param props The optimizer node that potentially modifies the properties. * @param input The input of the node which is relevant. - * - * @return True, if the resulting properties are non trivial. + * + * @return The filtered RequestedLocalProperties */ - public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) { + public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) { + FieldList sourceList; + RequestedLocalProperties returnProps = this; + if (this.ordering != null) { - final FieldList involvedIndexes = this.ordering.getInvolvedIndexes(); - for (int i = 0; i < involvedIndexes.size(); i++) { - if (!node.isFieldConstant(input, involvedIndexes.get(i))) { + for (int index: this.ordering.getInvolvedIndexes()) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, sourceList.get(0))); + } + } else { return null; } } } else if (this.groupedFields != null) { // check, whether the local key grouping is preserved for (Integer index : this.groupedFields) { - if (!node.isFieldConstant(input, index)) { + sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList(); + if (sourceList != null) { + if (!sourceList.contains(index)) { + returnProps = returnProps == this ? this.clone() : returnProps; + FieldList grouped = new FieldList(); + for (Integer value : returnProps.getGroupedFields()) { + if (value.intValue() == index) { + grouped = grouped.addFields(sourceList); + } else { + grouped = grouped.addField(value); + } + } + returnProps.setGroupedFields(grouped); + } + } else { return null; } } } - return this; + return returnProps; } /** diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java index 38dd7eccbb26b..324a2a274733c 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java @@ -516,6 +516,7 @@ private void addSchemaToSchema(T sourceSchema, T targetSchema, OptimizerNode opt try { for (Map.Entry entry : sourceSchema) { Integer pos = entry.getKey(); + if (optNode.isFieldConstant(input, pos)) { targetSchema.addType(pos, entry.getValue()); } diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java new file mode 100644 index 0000000000000..8da557b7f9dc6 --- /dev/null +++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/SemanticPropOptimizerTest.java @@ -0,0 +1,163 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler; + +import eu.stratosphere.api.common.operators.base.JoinOperatorBase; +import eu.stratosphere.api.common.operators.base.MapOperatorBase; +import eu.stratosphere.api.common.operators.base.ReduceOperatorBase; +import eu.stratosphere.api.common.operators.util.FieldSet; +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.functions.JoinFunction; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.functions.ReduceFunction; +import eu.stratosphere.api.java.operators.translation.JavaPlan; +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.compiler.dataproperties.GlobalProperties; +import eu.stratosphere.compiler.dataproperties.LocalProperties; +import eu.stratosphere.compiler.dataproperties.PartitioningProperty; +import eu.stratosphere.compiler.plan.Channel; +import eu.stratosphere.compiler.plan.DualInputPlanNode; +import eu.stratosphere.compiler.plan.OptimizedPlan; +import eu.stratosphere.compiler.plan.PlanNode; +import eu.stratosphere.compiler.plan.SingleInputPlanNode; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.util.Visitor; +import org.junit.Assert; +import org.junit.Test; + +public class SemanticPropOptimizerTest extends CompilerTestBase { + + public static class SimpleReducer extends ReduceFunction> { + @Override + public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception { + return null; + } + } + + public static class SimpleMap extends MapFunction, Tuple3> { + @Override + public Tuple3 map(Tuple3 value) throws Exception { + return null; + } + } + + @Test + public void forwardFieldsTestMapReduce() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + set = set.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1") + .map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("*"); + + set.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Reduce should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Reducer", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Reducer", + lprops.getGroupedFields().contains(1)); + } + } + if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { + for (Channel input: visitable.getInputs()) { + GlobalProperties gprops = visitable.getGlobalProperties(); + LocalProperties lprops = visitable.getLocalProperties(); + + Assert.assertTrue("Map should just forward the input if it is already partitioned", + input.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.isPartitionedOnFields(new FieldSet(1))); + Assert.assertTrue("Wrong GlobalProperties on Mapper", + gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED); + Assert.assertTrue("Wrong LocalProperties on Mapper", + lprops.getGroupedFields().contains(1)); + } + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } + + @Test + public void forwardFieldsTestJoin() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + in1 = in1.map(new SimpleMap()).withConstantSet("*") + .groupBy(0) + .reduce(new SimpleReducer()).withConstantSet("0->1"); + in2 = in2.map(new SimpleMap()).withConstantSet("*") + .groupBy(1) + .reduce(new SimpleReducer()).withConstantSet("1->2"); + DataSet> out = in1.join(in2).where(1).equalTo(2).with(new JoinFunction, Tuple3, Tuple3>() { + @Override + public Tuple3 join(Tuple3 first, Tuple3 second) throws Exception { + return null; + } + }); + + out.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + oPlan.accept(new Visitor() { + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { + DualInputPlanNode node = ((DualInputPlanNode) visitable); + + final Channel inConn1 = node.getInput1(); + final Channel inConn2 = node.getInput2(); + + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn1.getShipStrategy() == ShipStrategyType.FORWARD); + Assert.assertTrue("Join should just forward the input if it is already partitioned", + inConn2.getShipStrategy() == ShipStrategyType.FORWARD); + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + + } + }); + } +} diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java index 1ed399a5d8acb..3f9118759e6c9 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java @@ -53,7 +53,40 @@ public DualInputSemanticProperties() { super(); this.init(); } - + + /** + * Finds the source field where the given field was forwarded from. + * @param dest The destination field in the output data. + * @return FieldSet containing the source input fields. + */ + public FieldSet forwardedFrom1(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields1.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + + public FieldSet forwardedFrom2(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields2.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } + /** * Adds, to the existing information, a field that is forwarded directly * from the source record(s) in the first input to the destination @@ -205,7 +238,29 @@ public void setReadFields1(FieldSet readFields) { public FieldSet getReadFields1() { return this.readFields1; } - + + @Override + public FieldSet getForwardFields(int input, int field) { + if (input == 0) { + return this.getForwardedField1(field); + } else if (input == 1) { + return this.getForwardedField2(field); + } + return null; + } + + @Override + public FieldSet getSourceField(int input, int field) { + switch(input) { + case 0: + return this.getForwardedField1(field) != null ? this.getForwardedField1(field) : this.forwardedFrom1(field); + case 1: + return this.getForwardedField2(field) != null ? this.getForwardedField2(field) : this.forwardedFrom2(field); + default: + throw new IndexOutOfBoundsException(); + } + } + /** * Adds, to the existing information, field(s) that are read in * the source record(s) from the second input. diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java index a9895ecd91c46..daf51c74856b4 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java @@ -71,7 +71,21 @@ public Ordering appendOrdering(Integer index, Class> type, Orde this.orders.add(order); return this; } - + + public Ordering replaceOrdering(int oldField, int newField) { + Ordering newOrdering = new Ordering(); + + for (int i = 0; i < indexes.size(); i++) { + if (indexes.get(i).intValue() == oldField) { + newOrdering.appendOrdering(newField, this.types.get(i), this.orders.get(i)); + } else { + newOrdering.appendOrdering(this.indexes.get(i), this.types.get(i), this.orders.get(i)); + } + } + + return newOrdering; + } + // -------------------------------------------------------------------------------------------- public int getNumberOfFields() { diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java index 50af69f917276..92f7ffc982f13 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java @@ -42,16 +42,20 @@ public void addWrittenFields(FieldSet writtenFields) { this.writtenFields = this.writtenFields.addFields(writtenFields); } } - + + public abstract FieldSet getForwardFields(int input, int field); + + public abstract FieldSet getSourceField(int input, int field); + /** * Sets the field(s) that are written in the destination record(s). - * + * * @param writtenFields the position(s) in the destination record(s) */ public void setWrittenFields(FieldSet writtenFields) { this.writtenFields = writtenFields; } - + /** * Gets the field(s) in the destination record(s) that are written. * diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java index f1d098a09cbfc..efaf1a887012c 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java @@ -35,11 +35,41 @@ public class SingleInputSemanticProperties extends SemanticProperties { */ private FieldSet readFields; - + @Override + public FieldSet getForwardFields(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + return this.getForwardedField(field); + } + + @Override + public FieldSet getSourceField(int input, int field) { + if (input != 0) { + throw new IndexOutOfBoundsException(); + } + + return this.getForwardedField(field) != null ? this.getForwardedField(field) : this.forwardedFrom(field); + } + public SingleInputSemanticProperties() { super(); this.init(); } + + public FieldSet forwardedFrom(int dest) { + FieldSet fs = null; + for (Map.Entry entry : forwardedFields.entrySet()) { + if (entry.getValue().contains(dest)) { + if (fs == null) { + fs = new FieldSet(); + } + + fs = fs.addField(entry.getKey()); + } + } + return fs; + } /** * Adds, to the existing information, a field that is forwarded directly