Skip to content

Commit

Permalink
[FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates
Browse files Browse the repository at this point in the history
When a candidate for a bulk iteration is instantiated, the optimizer creates candidates
for the step function. It then checks that there exists a candidate solution for the step
function whose properties meet the properties of the input to the bulk iteration. Sometimes
it is necessary to add a no-op plan node to the end of the step function to generate the
correct properties. These new candidates have to be added to the final set of accepted
candidates.

This commit ensures that these new candidates are properly added to the set of accepted candidates.

Fix test and add new iteration tests

Add predecessor operator and dynamic path information to no op operator in bulk iterations

This closes #1388.
  • Loading branch information
tillrohrmann committed Nov 20, 2015
1 parent 177df41 commit 8dc70f2
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 23 deletions.
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
Expand All @@ -48,6 +47,7 @@
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.util.Visitor;

Expand Down Expand Up @@ -273,7 +273,7 @@ public void computeUnclosedBranchStack() {
this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
}


@SuppressWarnings("unchecked")
@Override
protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
Expand Down Expand Up @@ -321,8 +321,10 @@ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
Channel toNoOp = new Channel(candidate);
globPropsReq.parameterizeChannel(toNoOp, false, rootConnection.getDataExchangeMode(), false);
locPropsReq.parameterizeChannel(toNoOp);

UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);

NoOpUnaryUdfOp noOpUnaryUdfOp = new NoOpUnaryUdfOp<>();
noOpUnaryUdfOp.setInput(candidate.getProgramOperator());
UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", noOpUnaryUdfOp, true);
rebuildPropertiesNode.setParallelism(candidate.getParallelism());

SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
Expand All @@ -343,8 +345,10 @@ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
planDeleter.remove();
}
}

candidates.addAll(newCandidates);
}

if (candidates.isEmpty()) {
return;
}
Expand Down
Expand Up @@ -18,10 +18,12 @@

package org.apache.flink.optimizer.dag;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.optimizer.DataStatistics;
Expand All @@ -30,11 +32,17 @@

public class UnaryOperatorNode extends SingleInputNode {

private final List<OperatorDescriptorSingle> operator;
private final List<OperatorDescriptorSingle> operators;

private final String name;

/**
 * Creates a unary operator node that wraps the given program operator.
 *
 * <p>The node starts with an empty list of operator descriptors, so
 * {@code getPossibleProperties()} has no alternatives to offer for nodes built this way.
 *
 * @param name the name stored for this node
 * @param operator the single-input operator handed to the superclass constructor
 * @param onDynamicPath value assigned to this node's {@code onDynamicPath} field
 */
public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
	super(operator);

	this.onDynamicPath = onDynamicPath;
	this.operators = new ArrayList<OperatorDescriptorSingle>();
	this.name = name;
}

public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ... operators) {
this(name, keys, Arrays.asList(operators));
Expand All @@ -43,13 +51,13 @@ public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ..
/**
 * Creates a unary operator node from a name, a key set and a list of operator descriptors.
 *
 * @param name the name stored for this node
 * @param keys the key fields handed to the superclass constructor
 * @param operators the operator descriptors; the list is stored by reference, not copied
 */
public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) {
super(keys);

// NOTE(review): the same list is assigned to both 'operator' and 'operators'. This looks like
// the removed and the added line of the 'operator' -> 'operators' rename shown together;
// confirm which assignment is meant to remain.
this.operator = operators;
this.operators = operators;
this.name = name;
}

/**
 * Returns the operator descriptors stored in this node.
 *
 * @return the descriptor list held by this node
 */
@Override
protected List<OperatorDescriptorSingle> getPossibleProperties() {
// NOTE(review): two consecutive return statements; the second one is unreachable as written.
// Presumably these are the old and new line of the 'operator' -> 'operators' rename shown
// together; confirm that only 'return this.operators;' is intended to remain.
return this.operator;
return this.operators;
}

@Override
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.flink.optimizer.plan.WorksetPlanNode;
import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
Expand Down Expand Up @@ -307,7 +308,8 @@ public void clearInterestingProperties() {
this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
}


@SuppressWarnings("unchecked")
@Override
protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
Expand Down Expand Up @@ -367,9 +369,14 @@ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
globPropsReqWorkset.parameterizeChannel(toNoOp, false,
nextWorksetRootConnection.getDataExchangeMode(), false);
locPropsReqWorkset.parameterizeChannel(toNoOp);

UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
FieldList.EMPTY_LIST);

NoOpUnaryUdfOp noOpUnaryUdfOp = new NoOpUnaryUdfOp<>();
noOpUnaryUdfOp.setInput(candidate.getProgramOperator());

UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode(
"Rebuild Workset Properties",
noOpUnaryUdfOp,
true);

rebuildWorksetPropertiesNode.setParallelism(candidate.getParallelism());

Expand Down
Expand Up @@ -36,7 +36,7 @@ public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunct
@SuppressWarnings("rawtypes")
public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();

private NoOpUnaryUdfOp() {
public NoOpUnaryUdfOp() {
// pass null here because we override getOutputType to return type
// of input operator
super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
Expand Down
Expand Up @@ -20,11 +20,11 @@

import static org.junit.Assert.*;

import org.apache.flink.api.common.functions.*;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.junit.Test;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.api.common.Plan;
Expand All @@ -33,11 +33,6 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple1;
Expand All @@ -53,6 +48,8 @@
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;

import java.util.Iterator;

@SuppressWarnings({"serial", "unchecked"})
public class IterationsCompilerTest extends CompilerTestBase {

Expand Down Expand Up @@ -157,8 +154,16 @@ public void testTwoIterationsDirectlyChained() throws Exception {
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);

WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
BulkIterationPlanNode bipn = (BulkIterationPlanNode)wipn.getInput1().getSource();

// the hash partitioning has been pushed out of the delta iteration into the bulk iteration
assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());

// the input of the root step function is the last operator of the step function
// since the work has been pushed out of the bulk iteration, it has to guarantee the hash partitioning
for (Channel c : bipn.getRootOfStepFunction().getInputs()) {
assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
}

assertEquals(DataExchangeMode.BATCH, wipn.getInput1().getDataExchangeMode());
assertEquals(DataExchangeMode.BATCH, wipn.getInput2().getDataExchangeMode());
Expand Down Expand Up @@ -223,7 +228,10 @@ public void testIterationPushingWorkOut() throws Exception {
DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);


// we do two join operations with input1 which is the partial solution
// it is cheaper to push the partitioning out so that the feedback channel and the
// initial input do the partitioning
doBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());

Plan p = env.createProgramPlan();
Expand All @@ -234,10 +242,17 @@ public void testIterationPushingWorkOut() throws Exception {

BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

// check that work has not! been pushed out, as the end of the step function does not produce the necessary properties
// check that work has been pushed out
for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
assertEquals(ShipStrategyType.FORWARD, c.getShipStrategy());
}

// the end of the step function has to produce the necessary properties
for (Channel c : bipn.getRootOfStepFunction().getInputs()) {
assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
}

assertEquals(ShipStrategyType.PARTITION_HASH, bipn.getInput().getShipStrategy());

new JobGraphGenerator().compileJobGraph(op);
}
Expand All @@ -246,6 +261,44 @@ public void testIterationPushingWorkOut() throws Exception {
fail(e.getMessage());
}
}

/**
 * Checks that the hash partitioning stays inside the bulk iteration when the partial solution
 * is consumed by a single join only.
 *
 * <p>The test expects every channel leaving the partial solution node to use
 * {@code PARTITION_HASH}, and the iteration input itself to use {@code FORWARD}. Finally the
 * optimized plan is translated into a job graph to make sure that step works as well.
 *
 * @throws Exception if building, optimizing or translating the plan fails; the exception is
 *                   deliberately not caught so that the test runner reports it with its full
 *                   stack trace
 */
@Test
public void testIterationNotPushingWorkOut() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(8);

	DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

	DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);

	// Use input1 as partial solution. Partial solution is used in a single join operation --> it is cheaper
	// to do the hash partitioning between the partial solution node and the join node
	// instead of pushing the partitioning out
	doSimpleBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());

	Plan p = env.createProgramPlan();
	OptimizedPlan op = compileNoStats(p);

	assertEquals(1, op.getDataSinks().size());
	assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);

	BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

	// check that work has not been pushed out
	for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
		assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
	}

	assertEquals(ShipStrategyType.FORWARD, bipn.getInput().getShipStrategy());

	// the optimized plan must also be translatable into a job graph
	new JobGraphGenerator().compileJobGraph(op);
}

@Test
public void testWorksetIterationPipelineBreakerPlacement() {
Expand Down Expand Up @@ -322,6 +375,64 @@ public void testResetPartialSolution() {
fail(e.getMessage());
}
}

/**
 * Verifies that interesting properties can be pushed out of a bulk iteration.
 *
 * <p>For this to work, a NoOp node has to be appended to the step function so that the
 * properties of the initial input are re-established at the end of each step. Without it
 * Flink does not find a plan at all: because of pruning, the optimizer does not consider
 * plans in which the partitioning happens after the partial solution node in this case.
 *
 * <p>The test passes when the program can be optimized and translated into a job graph.
 *
 * @throws Exception if optimizing the program or generating the job graph fails
 */
@Test
public void testBulkIterationWithPartialSolutionProperties() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// source that becomes (after distinct) the initial partial solution
	DataSet<Tuple1<Long>> initialSource = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
		@Override
		public Tuple1<Long> map(Long value) throws Exception {
			return new Tuple1<>(value);
		}
	});

	// second, static input that is co-grouped with the partial solution in every step
	DataSet<Tuple1<Long>> staticSource = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
		@Override
		public Tuple1<Long> map(Long value) throws Exception {
			return new Tuple1<>(value);
		}
	});

	DataSet<Tuple1<Long>> deduplicated = initialSource.distinct();

	IterativeDataSet<Tuple1<Long>> loop = deduplicated.iterate(10);

	// step function: co-group on field 0 and forward only the first element of the left group
	DataSet<Tuple1<Long>> stepResult = loop
		.coGroup(staticSource)
		.where(0)
		.equalTo(0)
		.with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
			@Override
			public void coGroup(
					Iterable<Tuple1<Long>> first,
					Iterable<Tuple1<Long>> second,
					Collector<Tuple1<Long>> out) throws Exception {
				Iterator<Tuple1<Long>> leftGroup = first.iterator();

				if (leftGroup.hasNext()) {
					out.collect(leftGroup.next());
				}
			}
		});

	DataSet<Tuple1<Long>> loopOutput = loop.closeWith(stepResult);
	loopOutput.output(new DiscardingOutputFormat<Tuple1<Long>>());

	// optimizing and translating must both succeed
	OptimizedPlan optimized = compileNoStats(env.createProgramPlan());
	new JobGraphGenerator().compileJobGraph(optimized);
}

// --------------------------------------------------------------------------------------------

Expand All @@ -339,6 +450,19 @@ public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, L
// close the bulk iteration
return iteration.closeWith(changes);
}

/**
 * Builds a bulk iteration whose partial solution is used by exactly one join.
 *
 * <p>The iteration is opened on {@code vertices} with {@code iterate(20)}. Each step joins
 * the partial solution with {@code edges} on field 0 of both sides and passes the join
 * result through a {@code FlatMapJoin}.
 *
 * @param vertices the data set the iteration is opened on
 * @param edges the data set joined with the partial solution in every step
 * @return the data set returned by closing the iteration with the step result
 */
public static DataSet<Tuple2<Long, Long>> doSimpleBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {

	// open the bulk iteration on the vertices
	IterativeDataSet<Tuple2<Long, Long>> loop = vertices.iterate(20);

	// step function: a single join followed by a flat map
	DataSet<Tuple2<Long, Long>> stepResult = loop
		.join(edges)
		.where(0)
		.equalTo(0)
		.flatMap(new FlatMapJoin());

	// feed the step result back and close the iteration
	return loop.closeWith(stepResult);
}


public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
Expand Down

0 comments on commit 8dc70f2

Please sign in to comment.