Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ public static final class InterestingPropertyVisitor implements Visitor<Optimize

private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property


/**
* Creates a new visitor that computes the interesting properties for all nodes in the plan.
* It uses the given cost estimator used to compute the maximal costs for an interesting property.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected void copyEstimates(OptimizerNode node) {
public abstract IterationNode getIterationNode();

// --------------------------------------------------------------------------------------------

public boolean isOnDynamicPath() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected void readStubAnnotations() {}
public boolean isFieldConstant(int input, int fieldNumber) {
return true;
}

@Override
public void computeOutputEstimates(DataStatistics statistics) {
OptimizerNode in1 = getFirstPredecessorNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public String getName() {
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
}

protected void readStubAnnotations() {}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
}

// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
}



@Override
public void accept(Visitor<OptimizerNode> visitor) {
if (visitor.preVisit(this)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,15 @@ protected List<UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastI
*/
@Override
public abstract void accept(Visitor<OptimizerNode> visitor);
/**
* Checks whether a field is modified by the user code or whether it is kept unchanged.
*
* @param input The input number.
* @param fieldNumber The position of the field.
*
* @return True if the field is not changed by the user function, false otherwise.
*/

/*
* Checks whether a field is modified by the user code or whether it is kept unchanged.
*
* @param input The input number.
* @param fieldNumber The position of the field.
*
* @return True if the field is not changed by the user function, false otherwise.
*/
public abstract boolean isFieldConstant(int input, int fieldNumber);

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -678,7 +678,7 @@ protected int[] getConstantKeySet(int input) {
}
for (int keyColumn : keyColumns) {
if (!isFieldConstant(input, keyColumn)) {
return null;
return null;
}
}
return keyColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Sets;

import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.SemanticProperties;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
import eu.stratosphere.api.common.operators.util.FieldSet;
Expand Down Expand Up @@ -155,7 +156,6 @@ public boolean isFieldConstant(int input, int fieldNumber) {

return false;
}


@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode) throws CompilerException {
Expand Down Expand Up @@ -425,10 +425,11 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li
LocalProperties lProps = in.getLocalProperties().clone();
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);


SemanticProperties props = this.getPactContract().getSemanticProperties();
// filter by the user code field copies
gProps = gProps.filterByNodesConstantSet(this, 0);
lProps = lProps.filterByNodesConstantSet(this, 0);
gProps = gProps.filterBySemanticProperties(props, 0);
lProps = lProps.filterBySemanticProperties(props, 0);

// apply
node.initProperties(gProps, lProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.SemanticProperties;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.compiler.CompilerException;
Expand Down Expand Up @@ -533,13 +534,14 @@ protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel

DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);

GlobalProperties gp1 = in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1);

SemanticProperties props = this.getPactContract().getSemanticProperties();
GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);

LocalProperties lp1 = in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0);
LocalProperties lp2 = in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1);
LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);

node.initProperties(combined, locals);
Expand Down Expand Up @@ -674,22 +676,21 @@ public FieldList getInputKeySet(int input) {
public boolean isFieldConstant(int input, int fieldNumber) {
DualInputOperator<?, ?, ?, ?> c = getPactContract();
DualInputSemanticProperties semanticProperties = c.getSemanticProperties();


if (semanticProperties == null) {
return false;
}
FieldSet fs;
switch(input) {
case 0:
if (semanticProperties != null) {
FieldSet fs;
if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
break;
case 1:
if(semanticProperties != null) {
FieldSet fs;
if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}

if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
break;
default:
Expand All @@ -698,8 +699,7 @@ public boolean isFieldConstant(int input, int fieldNumber) {

return false;
}



// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public String getName() {
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
}

protected void readStubAnnotations() {}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.SemanticProperties;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.util.Utils;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
Expand Down Expand Up @@ -190,26 +190,61 @@ public void reset() {
this.partitioningFields = null;
}

public Ordering getOrdering() {
return this.ordering;
}

public void setOrdering(Ordering ordering) {
this.ordering = ordering;
}

public void setPartitioningFields(FieldList partitioningFields) {
this.partitioningFields = partitioningFields;
}

/**
* Filters these properties by what can be preserved through the given output contract.
*
* @param contract
* The output contract.
* @return True, if any non-default value is preserved, false otherwise.
* Filters these GlobalProperties by the fields that are constant or forwarded to another output field.
*
* @param props The node representing the contract.
* @param input The index of the input.
* @return The filtered GlobalProperties
*/
public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
// check if partitioning survives
FieldList forwardFields = null;
GlobalProperties returnProps = this;

if (props == null) {
return new GlobalProperties();
}

if (this.ordering != null) {
for (int col : this.ordering.getInvolvedIndexes()) {
if (!node.isFieldConstant(input, col)) {
return new GlobalProperties();
for (int index : this.ordering.getInvolvedIndexes()) {
forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
if (forwardFields == null) {
returnProps = new GlobalProperties();
} else if (!forwardFields.contains(index)) {
returnProps = returnProps == this ? this.clone() : returnProps;
returnProps.setOrdering(returnProps.getOrdering().replaceOrdering(index, forwardFields.get(0)));
}
}
}
if (this.partitioningFields != null) {
for (int colIndex : this.partitioningFields) {
if (!node.isFieldConstant(input, colIndex)) {
return new GlobalProperties();
for (int index : this.partitioningFields) {
forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
if (forwardFields == null) {
returnProps = new GlobalProperties();
} else if (!forwardFields.contains(index)) {
returnProps = returnProps == this ? this.clone() : returnProps;
FieldList partitioned = new FieldList();
for (Integer value : returnProps.getPartitioningFields()) {
if (value.intValue() == index) {
partitioned = partitioned.addFields(forwardFields);
} else {
partitioned = partitioned.addField(value);
}
}
returnProps.setPartitioningFields(partitioned);
}
}
}
Expand All @@ -220,7 +255,7 @@ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input)
for (Iterator<FieldSet> combos = newSet.iterator(); combos.hasNext(); ){
FieldSet current = combos.next();
for (Integer field : current) {
if (!node.isFieldConstant(input, field)) {
if (!props.getForwardFields(input, field).contains(field)) {
combos.remove();
break;
}
Expand All @@ -238,7 +273,7 @@ public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input)
return new GlobalProperties();
}

return this;
return returnProps;
}

public void parameterizeChannel(Channel channel, boolean globalDopChange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
import java.util.Iterator;
import java.util.Set;

import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.SemanticProperties;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.dag.SingleInputNode;
import eu.stratosphere.compiler.dag.TwoInputNode;

/**
* The interesting properties that a node in the optimizer plan hands to its predecessors. It has the
Expand All @@ -40,9 +45,7 @@ public InterestingProperties() {

/**
* Private constructor for cloning purposes.
*
* @param maxCostsGlobal The maximal costs for the global properties.
* @param maxCostsLocal The maximal costs for the local properties.
*
* @param globalProps The global properties for this new object.
* @param localProps The local properties for this new object.
*/
Expand Down Expand Up @@ -88,15 +91,21 @@ public Set<RequestedGlobalProperties> getGlobalProperties() {
public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
{
InterestingProperties iProps = new InterestingProperties();

SemanticProperties props = null;
if (node instanceof SingleInputNode) {
props = ((SingleInputOperator<?, ?, ?>) node.getPactContract()).getSemanticProperties();
} else if (node instanceof TwoInputNode) {
props = ((DualInputOperator<?, ?, ?, ?>) node.getPactContract()).getSemanticProperties();
}

for (RequestedGlobalProperties rgp : this.globalProps) {
RequestedGlobalProperties filtered = rgp.filterByNodesConstantSet(node, input);
RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input);
if (filtered != null && !filtered.isTrivial()) {
iProps.addGlobalProperties(filtered);
}
}
for (RequestedLocalProperties rlp : this.localProps) {
RequestedLocalProperties filtered = rlp.filterByNodesConstantSet(node, input);
RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input);
if (filtered != null && !filtered.isTrivial()) {
iProps.addLocalProperties(filtered);
}
Expand Down
Loading