Skip to content

Commit

Permalink
[FLINK-1682] Ported optimizer unit tests from Record API to Java API
Browse files Browse the repository at this point in the history
This closes #627
  • Loading branch information
fhueske committed May 5, 2015
1 parent adb321d commit bd96ba8
Show file tree
Hide file tree
Showing 42 changed files with 649 additions and 1,053 deletions.
Expand Up @@ -21,18 +21,14 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;


import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.record.operators.CrossOperator; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.record.operators.CrossWithLargeOperator; import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.record.operators.CrossWithSmallOperator; import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.util.DummyCrossStub;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test; import org.junit.Test;
Expand All @@ -41,27 +37,23 @@
* Tests that validate optimizer choices when using operators that are requesting certain specific execution * Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies. * strategies.
*/ */
@SuppressWarnings({"serial", "deprecation"}) @SuppressWarnings({"serial"})
public class AdditionalOperatorsTest extends CompilerTestBase { public class AdditionalOperatorsTest extends CompilerTestBase {


@Test @Test
public void testCrossWithSmall() { public void testCrossWithSmall() {
// construct the plan // construct the plan
FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); env.setParallelism(DEFAULT_PARALLELISM);

DataSet<Long> set1 = env.generateSequence(0,1);
CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub()) DataSet<Long> set2 = env.generateSequence(0,1);
.input1(source1).input2(source2)
.name("Cross").build(); set1.crossWithTiny(set2).name("Cross")

.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");

Plan plan = new Plan(sink);
plan.setDefaultParallelism(DEFAULT_PARALLELISM);


try { try {
OptimizedPlan oPlan = compileNoStats(plan); JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);


DualInputPlanNode crossPlanNode = resolver.getNode("Cross"); DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
Expand All @@ -72,27 +64,23 @@ public void testCrossWithSmall() {
assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy()); assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
} catch(CompilerException ce) { } catch(CompilerException ce) {
ce.printStackTrace(); ce.printStackTrace();
fail("The pact compiler is unable to compile this plan correctly."); fail("The Flink optimizer is unable to compile this plan correctly.");
} }
} }


@Test @Test
public void testCrossWithLarge() { public void testCrossWithLarge() {
// construct the plan // construct the plan
FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); env.setParallelism(DEFAULT_PARALLELISM);

DataSet<Long> set1 = env.generateSequence(0,1);
CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub()) DataSet<Long> set2 = env.generateSequence(0,1);
.input1(source1).input2(source2)
.name("Cross").build(); set1.crossWithHuge(set2).name("Cross")

.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());;
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");

Plan plan = new Plan(sink);
plan.setDefaultParallelism(DEFAULT_PARALLELISM);


try { try {
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan); OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);


Expand Down

0 comments on commit bd96ba8

Please sign in to comment.