[FLINK-1671] [optimizer] Add data exchange mode to optimizer classes
This closes #487
StephanEwen committed Mar 17, 2015
1 parent 7f8e156 commit 1c50d87
Showing 32 changed files with 1,782 additions and 627 deletions.
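For context: the data exchange mode that compile() starts reading in this change comes from the program's ExecutionConfig. A minimal sketch of how a user program would select it, assuming the DataSet API of this era (PIPELINED is the default mode):

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ExchangeModeExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The optimizer reads this setting below via
        // program.getExecutionConfig().getExecutionMode().
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);
    }
}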
@@ -28,10 +28,12 @@
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
import org.apache.flink.compiler.dag.SortPartitionNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
import org.apache.flink.compiler.dag.SortPartitionNode;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -487,14 +489,14 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws
LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
}

// set the default degree of parallelism
int defaultParallelism = program.getDefaultParallelism() > 0 ?
final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();

final int defaultParallelism = program.getDefaultParallelism() > 0 ?
program.getDefaultParallelism() : this.defaultDegreeOfParallelism;

// log the output
if (LOG.isDebugEnabled()) {
LOG.debug("Using a default degree of parallelism of " + defaultParallelism + '.');
}
// log the default settings
LOG.debug("Using a default parallelism of {}", defaultParallelism);
LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);

// the first step in the compilation is to create the optimizer plan representation
// this step does the following:
@@ -505,7 +507,7 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws
// 4) It makes estimates about the data volume of the data sources and
// propagates those estimates through the plan

GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism);
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
program.accept(graphCreator);

// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
@@ -527,21 +529,17 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws

// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
// guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks

rootNode.accept(new IdAndEstimatesVisitor(this.statistics));

// Now that the previous step is done, the next step is to traverse the graph again for the two
// steps that cannot directly be performed during the plan enumeration, because we are dealing with DAGs
// rather than trees. That requires us to deviate at some points from the classical DB optimizer algorithms.
//
// 1) propagate the interesting properties top-down through the graph
// 2) Track information about nodes with multiple outputs that are later on reconnected in a node with
// multiple inputs.
InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
rootNode.accept(propsVisitor);


// We are dealing with operator DAGs, rather than operator trees.
// That requires us to deviate at some points from the classical DB optimizer algorithms.
// This step builds some auxiliary structures to help track branches and joins in the DAG
BranchesVisitor branchingVisitor = new BranchesVisitor();
rootNode.accept(branchingVisitor);

// Propagate the interesting properties top-down through the graph
InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
rootNode.accept(propsVisitor);

// perform a sanity check: the root may not have any unclosed branches
if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
@@ -590,7 +588,7 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws
* from the plan can be traversed.
*/
public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1);
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
program.accept(graphCreator);
return graphCreator.sinks;
}
@@ -609,40 +607,45 @@ public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
* estimation and the awareness for optimizer hints, the sizes will be properly estimated and the translated plan
* already respects all optimizer hints.
*/
private static final class GraphCreatingVisitor implements Visitor<Operator<?>> {
public static final class GraphCreatingVisitor implements Visitor<Operator<?>> {

private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
// corresponding optimizer nodes

private final List<DataSourceNode> sources; // all data source nodes in the optimizer plan

private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan

private final int defaultParallelism; // the default degree of parallelism

private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation


private final ExecutionMode defaultDataExchangeMode;

private final boolean forceDOP;


private GraphCreatingVisitor(int defaultParallelism) {
this(null, false, defaultParallelism, null);
public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
this(null, false, defaultParallelism, defaultDataExchangeMode, null);
}

private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP,
int defaultParallelism, HashMap<Operator<?>, OptimizerNode> closure) {
private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
if (closure == null){
con2node = new HashMap<Operator<?>, OptimizerNode>();
} else {
con2node = closure;
}
this.sources = new ArrayList<DataSourceNode>(4);

this.sinks = new ArrayList<DataSinkNode>(2);
this.defaultParallelism = defaultParallelism;
this.parent = parent;
this.defaultDataExchangeMode = dataExchangeMode;
this.forceDOP = forceDOP;
}

public List<DataSinkNode> getSinks() {
return sinks;
}

@SuppressWarnings("deprecation")
@Override
public boolean preVisit(Operator<?> c) {
@@ -660,9 +663,7 @@ public boolean preVisit(Operator<?> c) {
n = dsn;
}
else if (c instanceof GenericDataSourceBase) {
DataSourceNode dsn = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
this.sources.add(dsn);
n = dsn;
n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
}
else if (c instanceof MapOperatorBase) {
n = new MapNode((MapOperatorBase<?, ?, ?>) c);
@@ -768,8 +769,8 @@ else if (c instanceof SolutionSetPlaceHolder) {
if (par > 0) {
if (this.forceDOP && par != this.defaultParallelism) {
par = this.defaultParallelism;
LOG.warn("The degree-of-parallelism of nested Dataflows (such as step functions in iterations) is " +
"currently fixed to the degree-of-parallelism of the surrounding operator (the iteration).");
LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
"currently fixed to the parallelism of the surrounding operator (the iteration).");
}
} else {
par = this.defaultParallelism;
@@ -786,8 +787,8 @@ public void postVisit(Operator<?> c) {
OptimizerNode n = this.con2node.get(c);

// first connect to the predecessors
n.setInput(this.con2node);
n.setBroadcastInputs(this.con2node);
n.setInput(this.con2node, this.defaultDataExchangeMode);
n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);

// if the node represents a bulk iteration, we recursively translate the data flow now
if (n instanceof BulkIterationNode) {
@@ -800,9 +801,9 @@

// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
iterNode.getDegreeOfParallelism(), closure);
iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);

BulkPartialSolutionNode partialSolution = null;
BulkPartialSolutionNode partialSolution;

iter.getNextPartialSolution().accept(recursiveCreator);

@@ -836,21 +837,23 @@ else if (n instanceof WorksetIterationNode) {
final WorksetIterationNode iterNode = (WorksetIterationNode) n;
final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();

// we need to ensure that both the next-workset and the solution-set-delta depend on the workset. One check is for free
// during the translation, we do the other check here as a pre-condition
// we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
// One check is free during the translation; we do the other check here as a pre-condition
{
StepFunctionValidator wsf = new StepFunctionValidator();
iter.getNextWorkset().accept(wsf);
if (!wsf.foundWorkset) {
throw new CompilerException("In the given program, the next workset does not depend on the workset. This is a prerequisite in delta iterations.");
throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
"This is a prerequisite in delta iterations.");
}
}

// calculate the closure of the anonymous function
HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);

// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, iterNode.getDegreeOfParallelism(), closure);
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
this, true, iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);

// descend from the solution set delta. check that it depends on both the workset
// and the solution set. If it does depend on both, this descend should create both nodes
@@ -859,7 +862,8 @@ else if (n instanceof WorksetIterationNode) {
final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());

if (worksetNode == null) {
throw new CompilerException("In the given program, the solution set delta does not depend on the workset. This is a prerequisite in delta iterations.");
throw new CompilerException("In the given program, the solution set delta does not depend on the workset." +
"This is a prerequisite in delta iterations.");
}

iter.getNextWorkset().accept(recursiveCreator);
@@ -895,7 +899,8 @@ else if (successor.getClass() == CoGroupNode.class) {
}
}
else {
throw new InvalidProgramException("Error: The only operations allowed on the solution set are Join and CoGroup.");
throw new InvalidProgramException(
"Error: The only operations allowed on the solution set are Join and CoGroup.");
}
}
}
@@ -905,14 +910,14 @@ else if (successor.getClass() == CoGroupNode.class) {

// set the step function nodes to the iteration node
iterNode.setPartialSolution(solutionSetNode, worksetNode);
iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode);
iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);

// go over the contained data flow and mark the dynamic path nodes
StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
iterNode.acceptForStepFunction(pathIdentifier);
}
}
};
}

private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {

@@ -944,28 +949,21 @@ public void postVisit(OptimizerNode visitable) {
* Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory,
* the number of memory consumers, and on the task's degree of parallelism.
*/
private static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
public static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {

private final DataStatistics statistics;

private int id = 1;

private IdAndEstimatesVisitor(DataStatistics statistics) {
public IdAndEstimatesVisitor(DataStatistics statistics) {
this.statistics = statistics;
}


@Override
public boolean preVisit(OptimizerNode visitable) {
if (visitable.getId() != -1) {
// been here before
return false;
}

return true;
return visitable.getId() == -1;
}


@Override
public void postVisit(OptimizerNode visitable) {
// the node ids
@@ -1031,7 +1029,7 @@ public void postVisit(OptimizerNode visitable) {}
* that are not a minimally connected DAG (Such plans are not trees, but at least one node feeds its
* output into more than one other node).
*/
private static final class BranchesVisitor implements Visitor<OptimizerNode> {
public static final class BranchesVisitor implements Visitor<OptimizerNode> {

@Override
public boolean preVisit(OptimizerNode node) {
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
@@ -66,7 +67,7 @@ public List<PactConnection> getIncomingConnections() {
}

@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode) {}
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode dataExchangeMode) {}

@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
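The ExecutionMode threaded through setInput() and setBroadcastInputs() above ultimately determines, per connection, whether data is exchanged in a pipelined or a materialized (blocking) fashion. A simplified, illustrative sketch of that decision follows; the DataExchange enum and the decide() helper are hypothetical stand-ins, not the API added by this commit:

import org.apache.flink.api.common.ExecutionMode;

public class ExchangeDecisionSketch {

    // Hypothetical result type, for illustration only.
    enum DataExchange { PIPELINED, BLOCKING }

    static DataExchange decide(ExecutionMode mode, boolean isShuffleOrBroadcast) {
        switch (mode) {
            case BATCH_FORCED:
                // materialize every exchange
                return DataExchange.BLOCKING;
            case BATCH:
                // materialize shuffles and broadcasts, keep forward channels streaming
                return isShuffleOrBroadcast ? DataExchange.BLOCKING : DataExchange.PIPELINED;
            case PIPELINED_FORCED:
            case PIPELINED:
            default:
                // stream between operators; plain PIPELINED may still fall back to
                // blocking exchanges where pipelines would deadlock (simplified here)
                return DataExchange.PIPELINED;
        }
    }
}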
