Skip to content

Commit

Permalink
[FLINK-1357] [compiler] Add union between static and dynamic path
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 6, 2015
1 parent d2f0c40 commit 0190dd2
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 105 deletions.
Expand Up @@ -95,6 +95,7 @@
import org.apache.flink.compiler.postpass.OptimizerPostPass; import org.apache.flink.compiler.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
Expand Down Expand Up @@ -580,9 +581,7 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws


// finalize the plan // finalize the plan
OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);


// swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so
// we can do this after the plan finalization
plan.accept(new BinaryUnionReplacer()); plan.accept(new BinaryUnionReplacer());


// post pass the plan. this is the phase where the serialization and comparator code is set // post pass the plan. this is the phase where the serialization and comparator code is set
Expand Down Expand Up @@ -1029,7 +1028,6 @@ public boolean preVisit(OptimizerNode node) {
} }
} }



@Override @Override
public void postVisit(OptimizerNode visitable) {} public void postVisit(OptimizerNode visitable) {}
} }
Expand Down Expand Up @@ -1057,8 +1055,11 @@ public void postVisit(OptimizerNode node) {
} }


/** /**
* Utility class that traverses a plan to collect all nodes and add them to the OptimizedPlan. * Finalization of the plan:
* Besides collecting all nodes, this traversal assigns the memory to the nodes. * - The graph of nodes is double-linked (links from child to parent are inserted)
* - If unions join static and dynamic paths, the cache is marked as a memory consumer
* - Relative memory fractions are assigned to all nodes.
* - All nodes are collected into a set.
*/ */
private static final class PlanFinalizer implements Visitor<PlanNode> { private static final class PlanFinalizer implements Visitor<PlanNode> {


Expand Down Expand Up @@ -1119,9 +1120,7 @@ private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName,
c.setRelativeTempMemory(relativeMem); c.setRelativeTempMemory(relativeMem);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " + LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
"table" + "table for " + c + ".");
" " +
"for " + c + ".");
} }
} }
} }
Expand All @@ -1143,6 +1142,12 @@ public boolean preVisit(PlanNode visitable) {
else if (visitable instanceof SourcePlanNode) { else if (visitable instanceof SourcePlanNode) {
this.sources.add((SourcePlanNode) visitable); this.sources.add((SourcePlanNode) visitable);
} }
else if (visitable instanceof BinaryUnionPlanNode) {
BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
if (unionNode.unionsStaticAndDynamicPath()) {
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
}
}
else if (visitable instanceof BulkPartialSolutionPlanNode) { else if (visitable instanceof BulkPartialSolutionPlanNode) {
// tell the partial solution about the iteration node that contains it // tell the partial solution about the iteration node that contains it
final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable; final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
Expand Down Expand Up @@ -1229,7 +1234,6 @@ else if (visitable instanceof SolutionSetPlanNode) {
@Override @Override
public void postVisit(PlanNode visitable) {} public void postVisit(PlanNode visitable) {}
} }



/** /**
* A visitor that traverses the graph and collects cascading binary unions into a single n-ary * A visitor that traverses the graph and collects cascading binary unions into a single n-ary
Expand All @@ -1256,24 +1260,50 @@ public boolean preVisit(PlanNode visitable) {
public void postVisit(PlanNode visitable) { public void postVisit(PlanNode visitable) {


if (visitable instanceof BinaryUnionPlanNode) { if (visitable instanceof BinaryUnionPlanNode) {

final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable; final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
final Channel in1 = unionNode.getInput1(); final Channel in1 = unionNode.getInput1();
final Channel in2 = unionNode.getInput2(); final Channel in2 = unionNode.getInput2();


PlanNode newUnionNode; if (!unionNode.unionsStaticAndDynamicPath()) {

// both on static path, or both on dynamic path. we can collapse them
NAryUnionPlanNode newUnionNode;


List<Channel> inputs = new ArrayList<Channel>(); List<Channel> inputs = new ArrayList<Channel>();
collect(in1, inputs); collect(in1, inputs);
collect(in2, inputs); collect(in2, inputs);


newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, unionNode.getGlobalProperties()); newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs,
unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());

newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());


for (Channel c : inputs) { for (Channel c : inputs) {
c.setTarget(newUnionNode); c.setTarget(newUnionNode);
} }


for(Channel channel : unionNode.getOutgoingChannels()){ for (Channel channel : unionNode.getOutgoingChannels()) {
channel.swapUnionNodes(newUnionNode); channel.swapUnionNodes(newUnionNode);
newUnionNode.addOutgoingChannel(channel);
}
}
else {
// union between the static and the dynamic path. we need to handle this for now
// through a special union operator

// make sure that the first input is the cached (static) and the second input is the dynamic
if (in1.isOnDynamicPath()) {
BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);

in1.setTarget(newUnionNode);
in2.setTarget(newUnionNode);

for (Channel channel : unionNode.getOutgoingChannels()) {
channel.swapUnionNodes(newUnionNode);
newUnionNode.addOutgoingChannel(channel);
}
}
} }
} }
} }
Expand All @@ -1290,7 +1320,7 @@ private void collect(Channel in, List<Channel> inputs) {


inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs()); inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
} else { } else {
// is not a union node, so we take the channel directly // is not a collapsed union node, so we take the channel directly
inputs.add(in); inputs.add(in);
} }
} }
Expand Down
Expand Up @@ -16,10 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */



package org.apache.flink.compiler.plan; package org.apache.flink.compiler.plan;



import org.apache.flink.compiler.dag.BinaryUnionNode; import org.apache.flink.compiler.dag.BinaryUnionNode;
import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.DriverStrategy;


Expand All @@ -35,7 +33,28 @@ public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) {
super(template, "Union", in1, in2, DriverStrategy.UNION); super(template, "Union", in1, in2, DriverStrategy.UNION);
} }


/**
 * Creates a new union plan node from an existing one, with the two inputs exchanged.
 * The second input of {@code toSwapFrom} becomes the first input of the new node and
 * vice versa. The new node is named "Union-With-Cached" and uses the driver strategy
 * {@link DriverStrategy#UNION_WITH_CACHED}.
 *
 * <p>The global and local properties, the node costs, and the cumulative costs are taken
 * over by reference from {@code toSwapFrom} (they are not copied), and the degree of
 * parallelism is set to the same value.
 *
 * @param toSwapFrom The union node whose inputs are exchanged and whose properties, costs,
 *                   and parallelism are adopted.
 */
public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
	// note the exchanged order: the former second input is passed as the first input
	super(toSwapFrom.getOptimizerNode(), "Union-With-Cached",
			toSwapFrom.getInput2(), toSwapFrom.getInput1(),
			DriverStrategy.UNION_WITH_CACHED);

	// adopt the cost estimates of the original node
	this.cumulativeCosts = toSwapFrom.cumulativeCosts;
	this.nodeCosts = toSwapFrom.nodeCosts;

	// adopt the data properties of the original node
	this.localProps = toSwapFrom.localProps;
	this.globalProps = toSwapFrom.globalProps;

	this.setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
}

public BinaryUnionNode getOptimizerNode() { public BinaryUnionNode getOptimizerNode() {
return (BinaryUnionNode) this.template; return (BinaryUnionNode) this.template;
} }

/**
 * Checks whether this union combines one input that is on the dynamic path with one
 * input that is not.
 *
 * @return {@code true} if exactly one of the two inputs reports being on the dynamic path,
 *         {@code false} if both or neither of them do.
 */
public boolean unionsStaticAndDynamicPath() {
	final boolean firstIsDynamic = getInput1().isOnDynamicPath();
	final boolean secondIsDynamic = getInput2().isOnDynamicPath();

	// exclusive or: true only when the two inputs disagree
	return firstIsDynamic ^ secondIsDynamic;
}

/**
 * Reports the memory consumer weight of this union node, which is always zero.
 *
 * NOTE(review): presumably any memory needed to cache the static input is accounted for
 * on the input channel (as temp memory) rather than on the node itself -- confirm against
 * the plan finalization logic.
 *
 * @return Always {@code 0}.
 */
@Override
public int getMemoryConsumerWeight() {
return 0;
}
} }
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;


import org.apache.flink.compiler.costs.Costs;
import org.apache.flink.compiler.dag.BinaryUnionNode; import org.apache.flink.compiler.dag.BinaryUnionNode;
import org.apache.flink.compiler.dataproperties.GlobalProperties; import org.apache.flink.compiler.dataproperties.GlobalProperties;
import org.apache.flink.compiler.dataproperties.LocalProperties; import org.apache.flink.compiler.dataproperties.LocalProperties;
Expand All @@ -40,12 +41,16 @@ public class NAryUnionPlanNode extends PlanNode {
/** /**
* @param template * @param template
*/ */
public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps) { public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps,
Costs cumulativeCosts)
{
super(template, "Union", DriverStrategy.NONE); super(template, "Union", DriverStrategy.NONE);


this.inputs = inputs; this.inputs = inputs;
this.globalProps = gProps; this.globalProps = gProps;
this.localProps = new LocalProperties(); this.localProps = new LocalProperties();
this.nodeCosts = new Costs();
this.cumulativeCosts = cumulativeCosts;
} }


@Override @Override
Expand Down
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.compiler;

import static org.junit.Assert.*;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.compiler.plan.BinaryUnionPlanNode;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.NAryUnionPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.junit.Test;

/**
 * Tests the compilation of bulk iterations whose step function unions a data set from outside
 * the iteration (static path) with a data set derived from the partial solution (dynamic path).
 *
 * <p>Both tests build the same kind of program and differ only in which side of the mixed union
 * the static data set is declared on. In either case the compiled plan is expected to have the
 * static input as the first input of the mixed union and the dynamic input as the second.
 */
@SuppressWarnings("serial")
public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {

	/**
	 * The static data set is the first operand of the mixed union in the program.
	 *
	 * @throws Exception Any exception fails the test; JUnit reports it with its full stack trace.
	 */
	@Test
	public void testUnionStaticFirst() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Long> input1 = env.generateSequence(1, 10);
		DataSet<Long> input2 = env.generateSequence(1, 10);

		IterativeDataSet<Long> iteration = input1.iterate(10);

		DataSet<Long> result = iteration.closeWith(
				input2.union(input2).union(iteration.union(iteration)));

		result.print();
		result.print();

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		verifyPlanWithMixedUnion(op);
	}

	/**
	 * The static data set is the second operand of the mixed union in the program.
	 *
	 * @throws Exception Any exception fails the test; JUnit reports it with its full stack trace.
	 */
	@Test
	public void testUnionStaticSecond() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Long> input1 = env.generateSequence(1, 10);
		DataSet<Long> input2 = env.generateSequence(1, 10);

		IterativeDataSet<Long> iteration = input1.iterate(10);

		DataSet<Long> iterResult = iteration
				.closeWith(iteration.union(iteration).union(input2.union(input2)));

		iterResult.print();
		iterResult.print();

		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);

		verifyPlanWithMixedUnion(op);
	}

	/**
	 * Checks the structure and memory assignment of a compiled plan that contains a union
	 * between the static and the dynamic path, and finally translates it into a job graph.
	 *
	 * <p>Expected shape: two sinks; the first sink's input is the bulk iteration; the root of
	 * the step function is a single-input node whose input is the mixed binary union; each of
	 * the union's inputs is an n-ary union.
	 *
	 * @param op The optimized plan to verify.
	 */
	private void verifyPlanWithMixedUnion(OptimizedPlan op) {
		assertEquals(2, op.getDataSinks().size());

		BulkIterationPlanNode iterPlan =
				(BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

		SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction();
		BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource();
		NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) mixedUnion.getInput1().getSource();
		NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) mixedUnion.getInput2().getSource();

		// the first input must be the static (cached) one, the second input the dynamic one
		assertTrue(mixedUnion.unionsStaticAndDynamicPath());
		assertFalse(mixedUnion.getInput1().isOnDynamicPath());
		assertTrue(mixedUnion.getInput2().isOnDynamicPath());
		assertTrue(mixedUnion.getInput1().getTempMode().isCached());

		// the collapsed unions below the mixed union must be purely static / purely dynamic
		for (Channel c : staticUnion.getInputs()) {
			assertFalse(c.isOnDynamicPath());
		}
		for (Channel c : dynamicUnion.getInputs()) {
			assertTrue(c.isOnDynamicPath());
		}

		// the iteration and the cache on the static input each get half of the memory,
		// the dynamic input gets no temp memory
		assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0);
		assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
		assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0);

		// the plan must be translatable into a job graph
		new NepheleJobGraphGenerator().compileJobGraph(op);
	}
}

0 comments on commit 0190dd2

Please sign in to comment.