From bd96ba8d1bbdc494ac88b98a6469255572f4a9fc Mon Sep 17 00:00:00 2001
From: Fabian Hueske
Date: Sat, 25 Apr 2015 01:30:11 +0200
Subject: [PATCH] [FLINK-1682] Ported optimizer unit tests from Record API to Java API

This closes #627
---
 .../optimizer/AdditionalOperatorsTest.java    |  64 +--
 .../optimizer/BranchingPlansCompilerTest.java | 509 +++++++-----------
 .../BroadcastVariablePipelinebreakerTest.java |   7 +-
 .../CachedMatchStrategyCompilerTest.java      |   7 +-
 .../CoGroupSolutionSetFirstTest.java          |   3 +-
 .../optimizer/DisjointDataFlowsTest.java      |   7 +-
 .../optimizer/DistinctCompilationTest.java    |  10 +-
 .../flink/optimizer/GroupOrderTest.java       | 100 ++--
 .../optimizer/HardPlansCompilationTest.java   |  55 +-
 .../optimizer/IterationsCompilerTest.java     |  22 +-
 .../flink/optimizer/NestedIterationsTest.java |   9 +-
 .../optimizer/ParallelismChangeTest.java      | 250 ++++-----
 .../optimizer/PartitionPushdownTest.java      |   5 +-
 .../optimizer/PartitioningReusageTest.java    |  57 +-
 .../flink/optimizer/PipelineBreakerTest.java  |  16 +-
 .../optimizer/PropertyDataSourceTest.java     |  49 +-
 .../apache/flink/optimizer/ReduceAllTest.java |  31 +-
 .../optimizer/ReplicatingDataSourceTest.java  |   2 +-
 .../SemanticPropertiesAPIToPlanTest.java      |   5 +-
 .../flink/optimizer/SortPartialReuseTest.java |  19 +-
 .../UnionBetweenDynamicAndStaticPathTest.java |   9 +-
 .../UnionPropertyPropagationTest.java         |  48 +-
 .../flink/optimizer/UnionReplacementTest.java |   5 +-
 .../WorksetIterationCornerCasesTest.java      |  24 +-
 ...orksetIterationsRecordApiCompilerTest.java | 110 ++--
 .../CoGroupCustomPartitioningTest.java        |   4 +-
 ...tomPartitioningGlobalOptimizationTest.java |   4 +-
 .../GroupingKeySelectorTranslationTest.java   |   6 +-
 .../GroupingPojoTranslationTest.java          |   8 +-
 .../GroupingTupleTranslationTest.java         |   8 +-
 .../JoinCustomPartitioningTest.java           |   4 +-
 .../optimizer/java/PartitionOperatorTest.java |   4 +-
 .../IdentityCoGrouper.java}                   |  15 +-
 .../IdentityCrosser.java}                     |  12 +-
 .../testfunctions/IdentityGroupReducer.java   |   2 -
 .../IdentityGroupReducerCombinable.java}      |  22 +-
 .../IdentityJoiner.java}                      |  14 +-
 .../optimizer/util/DummyCoGroupStub.java      |  42 --
 .../optimizer/util/DummyInputFormat.java      |  42 --
 .../flink/optimizer/util/DummyMatchStub.java  |  37 --
 .../flink/optimizer/util/IdentityReduce.java  |  40 --
 .../optimizer/util/OperatorResolver.java      |  15 +-
 42 files changed, 649 insertions(+), 1053 deletions(-)
 rename flink-optimizer/src/test/java/org/apache/flink/optimizer/{util/DummyNonPreservingMatchStub.java => testfunctions/IdentityCoGrouper.java} (69%)
 rename flink-optimizer/src/test/java/org/apache/flink/optimizer/{util/DummyCrossStub.java => testfunctions/IdentityCrosser.java} (75%)
 rename flink-optimizer/src/test/java/org/apache/flink/optimizer/{util/IdentityMap.java => testfunctions/IdentityGroupReducerCombinable.java} (64%)
 rename flink-optimizer/src/test/java/org/apache/flink/optimizer/{util/DummyOutputFormat.java => testfunctions/IdentityJoiner.java} (74%)
 delete mode 100644 flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
 delete mode 100644 flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
 delete mode 100644 flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
 delete mode 100644 flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java

diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 
0c5053636c2ce..a4e74a9d15dcc 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java @@ -21,18 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.CrossWithLargeOperator; -import org.apache.flink.api.java.record.operators.CrossWithSmallOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -41,27 +37,23 @@ * Tests that validate optimizer choices when using operators that are requesting certain specific execution * strategies. */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class AdditionalOperatorsTest extends CompilerTestBase { @Test public void testCrossWithSmall() { // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); - - CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub()) - .input1(source1).input2(source2) - .name("Cross").build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet set1 = env.generateSequence(0,1); + DataSet set2 = env.generateSequence(0,1); + + set1.crossWithTiny(set2).name("Cross") + .output(new DiscardingOutputFormat>()); + try { - OptimizedPlan oPlan = compileNoStats(plan); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); DualInputPlanNode crossPlanNode = resolver.getNode("Cross"); @@ -72,27 +64,23 @@ public void testCrossWithSmall() { assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy()); } catch(CompilerException ce) { ce.printStackTrace(); - fail("The pact compiler is unable to compile this plan correctly."); + fail("The Flink optimizer is unable to compile this plan correctly."); } } @Test public void testCrossWithLarge() { // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); - - CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub()) - 
.input1(source1).input2(source2) - .name("Cross").build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet set1 = env.generateSequence(0,1); + DataSet set2 = env.generateSequence(0,1); + + set1.crossWithHuge(set2).name("Cross") + .output(new DiscardingOutputFormat>());; + try { + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java index 94ff41a505cb8..1d5b7c1cfc762 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java @@ -20,15 +20,19 @@ import static org.junit.Assert.*; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.io.TextOutputFormat; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper; +import org.apache.flink.optimizer.testfunctions.IdentityCrosser; +import org.apache.flink.optimizer.testfunctions.IdentityJoiner; import org.apache.flink.optimizer.testfunctions.SelectOneReducer; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.util.Collector; @@ -41,15 +45,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.CoGroupOperator; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; @@ -59,18 +54,8 @@ import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.testfunctions.Top1GroupReducer; -import org.apache.flink.optimizer.util.DummyCoGroupStub; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub; 
-import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; - -@SuppressWarnings({"serial", "deprecation"}) + +@SuppressWarnings({"serial"}) public class BranchingPlansCompilerTest extends CompilerTestBase { @@ -323,84 +308,53 @@ public void coGroup(Iterable> first, public void testBranchEachContractType() { try { // construct the plan - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A"); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B"); - FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C"); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build(); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(map1) - .name("Reduce 1") - .build(); - - JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceB, sourceB, sourceC) - .input2(sourceC) - .name("Match 1") - .build(); - ; - CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(sourceA) - .input2(sourceB) - .name("CoGroup 1") - .build(); - - CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()) - .input1(reduce1) - .input2(cogroup1) - .name("Cross 1") - .build(); - - - CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cross1) - .input2(cross1) - .name("CoGroup 2") - .build(); - - CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(map1) - .input2(match1) - .name("CoGroup 3") - .build(); - - - MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build(); - - CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(map2) - .input2(match1) - .name("CoGroup 4") - .build(); - - CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cogroup2) - .input2(cogroup1) - .name("CoGroup 5") - .build(); - - CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(reduce1) - .input2(cogroup4) - .name("CoGroup 6") - .build(); - - CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cogroup5) - .input2(cogroup6) - .name("CoGroup 7") - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7); - sink.addInput(sourceA); - sink.addInput(cogroup3); - sink.addInput(cogroup4); - sink.addInput(cogroup1); - - // return the PACT plan - Plan plan = new Plan(sink, "Branching of each contract type"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet sourceA = env.generateSequence(0,1); + DataSet sourceB = env.generateSequence(0,1); + DataSet sourceC = env.generateSequence(0,1); + + DataSet map1 = sourceA.map(new IdentityMapper()).name("Map 1"); + + DataSet reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1"); + + DataSet join1 = sourceB.union(sourceB).union(sourceC) + .join(sourceC).where("*").equalTo("*") + .with(new 
IdentityJoiner()).name("Join 1"); + + DataSet coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 1"); + + DataSet cross1 = reduce1.cross(coGroup1) + .with(new IdentityCrosser()).name("Cross 1"); + + DataSet coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 2"); + + DataSet coGroup3 = map1.coGroup(join1).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 3"); + + DataSet map2 = coGroup3.map(new IdentityMapper()).name("Map 2"); + + DataSet coGroup4 = map2.coGroup(join1).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 4"); + + DataSet coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 5"); + + DataSet coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 6"); + + DataSet coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*") + .with(new IdentityCoGrouper()).name("CoGroup 7"); + + coGroup7.union(sourceA) + .union(coGroup3) + .union(coGroup4) + .union(coGroup1) + .output(new DiscardingOutputFormat()); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); JobGraphGenerator jobGen = new JobGraphGenerator(); @@ -418,47 +372,33 @@ public void testBranchEachContractType() { public void testBranchingUnion() { try { // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE); - - JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(source1) - .input2(source2) - .name("Match 1") - .build(); - - MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build(); - - ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(ma1) - .name("Reduce 1") - .build(); - - ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(mat1) - .name("Reduce 2") - .build(); - - MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build(); - - MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build(); - - @SuppressWarnings("unchecked") - JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(r1, r2, ma2, ma3) - .input2(ma2) - .name("Match 2") - .build(); - mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2); - - - // return the PACT plan - Plan plan = new Plan(sink, "Branching Union"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet source1 = env.generateSequence(0,1); + DataSet source2 = env.generateSequence(0,1); + + DataSet join1 = source1.join(source2).where("*").equalTo("*") + .with(new IdentityJoiner()).name("Join 1"); + + DataSet map1 = join1.map(new IdentityMapper()).name("Map 1"); + + DataSet reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1"); + + DataSet reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 2"); + + DataSet map2 = join1.map(new IdentityMapper()).name("Map 2"); + + DataSet map3 = map2.map(new IdentityMapper()).name("Map 3"); + + DataSet join2 = 
reduce1.union(reduce2).union(map2).union(map3) + .join(map2, JoinHint.REPARTITION_SORT_MERGE).where("*").equalTo("*") + .with(new IdentityJoiner()).name("Join 2"); + + join2.output(new DiscardingOutputFormat()); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); - + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown @@ -480,22 +420,18 @@ public void testBranchingUnion() { @Test public void testBranchingWithMultipleDataSinksSmall() { try { + String outPath1 = "/tmp/out1"; + String outPath2 = "/tmp/out2"; + // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); - - FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA); - FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA); - - List sinks = new ArrayList(); - sinks.add(sinkA); - sinks.add(sinkB); - - // return the PACT plan - Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet source1 = env.generateSequence(0,1); + + source1.writeAsText(outPath1); + source1.writeAsText(outPath2); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); // ---------- check the optimizer plan ---------- @@ -505,15 +441,16 @@ public void testBranchingWithMultipleDataSinksSmall() { // sinks contain all sink paths Set allSinks = new HashSet(); - allSinks.add(out1Path); - allSinks.add(out2Path); + allSinks.add(outPath1); + allSinks.add(outPath2); for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); + String path = ((TextOutputFormat)n.getSinkNode().getOperator() + .getFormatWrapper().getUserCodeObject()).getOutputFilePath().toString(); Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); } - // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- + // ---------- compile plan to job graph to verify that no error is thrown ---------- JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); @@ -541,50 +478,38 @@ public void testBranchingDisjointPlan() { final String out3Path = "file:///test/3"; final String out4Path = "file:///test/4"; - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2"); - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3"); - FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4"); - - - List sinks = new ArrayList(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); - sinks.add(sink4); - - // return the PACT plan - Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches"); + // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet sourceA = env.generateSequence(0,1); + DataSet sourceB = env.generateSequence(0,1); + + sourceA.writeAsText(out1Path); + sourceB.writeAsText(out2Path); + 
sourceA.writeAsText(out3Path); + sourceB.writeAsText(out4Path); + + JavaPlan plan = env.createProgramPlan(); compileNoStats(plan); + } @Test public void testBranchAfterIteration() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(sourceA); - iteration.setMaximumNumberOfIterations(10); - - MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build(); - iteration.setNextPartialSolution(mapper); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1"); - - MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper") - .input(iteration).build(); - - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2"); - - List sinks = new ArrayList(); - sinks.add(sink1); - sinks.add(sink2); - - Plan plan = new Plan(sinks); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet sourceA = env.generateSequence(0,1); + + IterativeDataSet loopHead = sourceA.iterate(10); + DataSet loopTail = loopHead.map(new IdentityMapper()).name("Mapper"); + DataSet loopRes = loopHead.closeWith(loopTail); + + loopRes.output(new DiscardingOutputFormat()); + loopRes.map(new IdentityMapper()) + .output(new DiscardingOutputFormat());; + + JavaPlan plan = env.createProgramPlan(); + try { compileNoStats(plan); } @@ -596,31 +521,20 @@ public void testBranchAfterIteration() { @Test public void testBranchBeforeIteration() { - FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(source2); - iteration.setMaximumNumberOfIterations(10); - - MapOperator inMap = MapOperator.builder(new IdentityMap()) - .input(source1) - .name("In Iteration Map") - .setBroadcastVariable("BC", iteration.getPartialSolution()) - .build(); - - iteration.setNextPartialSolution(inMap); - - MapOperator postMap = MapOperator.builder(new IdentityMap()) - .input(source1) - .name("Post Iteration Map") - .setBroadcastVariable("BC", iteration) - .build(); - - FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink"); - - Plan plan = new Plan(sink); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet source1 = env.generateSequence(0,1); + DataSet source2 = env.generateSequence(0,1); + + IterativeDataSet loopHead = source2.iterate(10).name("Loop"); + DataSet loopTail = source1.map(new IdentityMapper()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper"); + DataSet loopRes = loopHead.closeWith(loopTail); + + DataSet map = source1.map(new IdentityMapper()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper"); + map.output(new DiscardingOutputFormat()); + + JavaPlan plan = env.createProgramPlan(); + try { compileNoStats(plan); } @@ -644,31 +558,22 @@ public void testBranchBeforeIteration() { */ @Test public void testClosure() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 
1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(sourceA); - iteration.setMaximumNumberOfIterations(10); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet sourceA = env.generateSequence(0,1); + DataSet sourceB = env.generateSequence(0,1); - CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction"). - input1(iteration.getPartialSolution()). - input2(sourceB). - build(); + sourceA.output(new DiscardingOutputFormat()); + sourceB.output(new DiscardingOutputFormat()); - iteration.setNextPartialSolution(stepFunction); + IterativeDataSet loopHead = sourceA.iterate(10).name("Loop"); - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3"); + DataSet loopTail = loopHead.cross(sourceB).with(new IdentityCrosser()); + DataSet loopRes = loopHead.closeWith(loopTail); - List sinks = new ArrayList(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); + loopRes.output(new DiscardingOutputFormat()); - Plan plan = new Plan(sinks); + JavaPlan plan = env.createProgramPlan(); try{ compileNoStats(plan); @@ -691,40 +596,24 @@ public void testClosure() { */ @Test public void testClosureDeltaIteration() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3"); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2"); - - DeltaIteration iteration = new DeltaIteration(0, "Loop"); - iteration.setInitialSolutionSet(sourceA); - iteration.setInitialWorkset(sourceB); - iteration.setMaximumNumberOfIterations(10); - - CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset"). - input1(iteration.getWorkset()). - input2(sourceC). - build(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet> sourceA = env.generateSequence(0,1).map(new Duplicator()); + DataSet> sourceB = env.generateSequence(0,1).map(new Duplicator()); + DataSet> sourceC = env.generateSequence(0,1).map(new Duplicator()); - JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0). - name("Next solution set."). - input1(nextWorkset). - input2(iteration.getSolutionSet()). 
- build(); + sourceA.output(new DiscardingOutputFormat>()); + sourceC.output(new DiscardingOutputFormat>()); - iteration.setNextWorkset(nextWorkset); - iteration.setSolutionSetDelta(solutionSetDelta); + DeltaIteration, Tuple2> loop = sourceA.iterateDelta(sourceB, 10, 0); - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3"); + DataSet> workset = loop.getWorkset().cross(sourceB).with(new IdentityCrosser>()).name("Next work set"); + DataSet> delta = workset.join(loop.getSolutionSet()).where(0).equalTo(0).with(new IdentityJoiner>()).name("Solution set delta"); - List sinks = new ArrayList(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); + DataSet> result = loop.closeWith(delta, workset); + result.output(new DiscardingOutputFormat>()); - Plan plan = new Plan(sinks); + JavaPlan plan = env.createProgramPlan(); try{ compileNoStats(plan); @@ -752,44 +641,26 @@ public void testClosureDeltaIteration() { */ @Test public void testDeltaIterationWithStaticInput() { - FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source"); - - MapOperator mappedSource = MapOperator.builder(IdentityMap.class). - input(source). - name("Identity mapped source"). - build(); - - ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class). - input(source). - name("Identity reduce source"). - build(); - - DeltaIteration iteration = new DeltaIteration(0,"Loop"); - iteration.setMaximumNumberOfIterations(10); - iteration.setInitialSolutionSet(source); - iteration.setInitialWorkset(mappedSource); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet> source = env.generateSequence(0,1).map(new Duplicator()); - JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0). - input1(iteration.getWorkset()). - input2(reducedSource). - name("Next work set"). - build(); + DataSet> map = source + .map(new IdentityMapper>()); + DataSet> reduce = source + .reduceGroup(new IdentityGroupReducer>()); - JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, - 0). - input1(iteration.getSolutionSet()). - input2(nextWorkset). - name("Solution set delta"). 
- build(); + DeltaIteration, Tuple2> loop = source.iterateDelta(map, 10, 0); - iteration.setNextWorkset(nextWorkset); - iteration.setSolutionSetDelta(solutionSetDelta); + DataSet> workset = loop.getWorkset().join(reduce).where(0).equalTo(0) + .with(new IdentityJoiner>()).name("Next work set"); + DataSet> delta = loop.getSolutionSet().join(workset).where(0).equalTo(0) + .with(new IdentityJoiner>()).name("Solution set delta"); - FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink"); - List sinks = new ArrayList(); - sinks.add(sink); + DataSet> result = loop.closeWith(delta, workset); + result.output(new DiscardingOutputFormat>()); - Plan plan = new Plan(sinks); + JavaPlan plan = env.createProgramPlan(); try{ compileNoStats(plan); @@ -871,7 +742,7 @@ public String join(String first, String second) { .withBroadcastSet(input3, "bc1") .withBroadcastSet(input1, "bc2") .withBroadcastSet(result1, "bc3") - .print(); + .output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); @@ -900,7 +771,7 @@ public void testBCVariableClosure() { IterativeDataSet iteration = initialSolution.iterate(100); iteration.closeWith(iteration.map(new IdentityMapper()).withBroadcastSet(reduced, "red")) - .print(); + .output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); @@ -927,9 +798,12 @@ public void testMultipleIterations() { IterativeDataSet iteration2 = input.iterate(20); IterativeDataSet iteration3 = input.iterate(17); - iteration1.closeWith(iteration1.map(new IdentityMapper()).withBroadcastSet(reduced, "bc1")).print(); - iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer()).withBroadcastSet(reduced, "bc2")).print(); - iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer()).withBroadcastSet(reduced, "bc3")).print(); + iteration1.closeWith(iteration1.map(new IdentityMapper()).withBroadcastSet(reduced, "bc1")) + .output(new DiscardingOutputFormat()); + iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer()).withBroadcastSet(reduced, "bc2")) + .output(new DiscardingOutputFormat()); + iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer()).withBroadcastSet(reduced, "bc3")) + .output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); @@ -953,9 +827,12 @@ public void testMultipleIterationsWithClosueBCVars() { IterativeDataSet iteration3 = input.iterate(17); - iteration1.closeWith(iteration1.map(new IdentityMapper())).print(); - iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer())).print(); - iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer())).print(); + iteration1.closeWith(iteration1.map(new IdentityMapper())) + .output(new DiscardingOutputFormat()); + iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer())) + .output(new DiscardingOutputFormat()); + iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer())) + .output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); @@ -979,7 +856,7 @@ public void testBranchesOnlyInBCVariables1() { input .map(new IdentityMapper()).withBroadcastSet(bc_input, "name1") .map(new IdentityMapper()).withBroadcastSet(bc_input, "name2") - .print(); + .output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); compileNoStats(plan); @@ -1019,7 +896,7 @@ public void testBranchesOnlyInBCVariables2() { .map(new IdentityMapper>()) .withBroadcastSet(bc_input1, "bc1") .union(joinResult) - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = 
env.createProgramPlan(); compileNoStats(plan); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java index 57c53fff915b7..b0ecfe5624806 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.java.DataSet; @@ -43,7 +44,8 @@ public void testNoBreakerForIndependentVariable() { DataSet source1 = env.fromElements("test"); DataSet source2 = env.fromElements("test"); - source1.map(new IdentityMapper()).withBroadcastSet(source2, "some name").print(); + source1.map(new IdentityMapper()).withBroadcastSet(source2, "some name") + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -66,7 +68,8 @@ public void testBreakerForDependentVariable() { DataSet source1 = env.fromElements("test"); - source1.map(new IdentityMapper()).map(new IdentityMapper()).withBroadcastSet(source1, "some name").print(); + source1.map(new IdentityMapper()).map(new IdentityMapper()).withBroadcastSet(source1, "some name") + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java index 1a4cd18c302bc..e79550813e7e2 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -215,7 +216,7 @@ private Plan getTestPlanRightStatic(String strategy) { Configuration joinStrategy = new Configuration(); joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); - if(strategy != "") { + if(!strategy.equals("")) { joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy); } @@ -223,7 +224,7 @@ private Plan getTestPlanRightStatic(String strategy) { DataSet> output = iteration.closeWith(inner); - output.print(); + output.output(new DiscardingOutputFormat>()); return env.createProgramPlan(); @@ -250,7 +251,7 @@ private Plan getTestPlanLeftStatic(String strategy) { DataSet> output = iteration.closeWith(inner); - output.print(); + output.output(new DiscardingOutputFormat>()); return env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java index 61d407a3aebb8..f066b3624f96a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java +++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Assert; import org.junit.Test; @@ -64,7 +65,7 @@ public void testCoGroupSolutionSet() { DataSet> feedback = iteration.getWorkset().map(new SimpleMap()); DataSet> result = iteration.closeWith(delta, feedback); - result.print(); + result.output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = null; diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java index bb3aa47a1bdba..68953c09bdc6e 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -36,8 +37,10 @@ public void testDisjointFlows() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // generate two different flows - env.generateSequence(1, 10).print(); - env.generateSequence(1, 10).print(); + env.generateSequence(1, 10) + .output(new DiscardingOutputFormat()); + env.generateSequence(1, 10) + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java index 7865861389c36..5827d9c225cc7 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DistinctOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -50,7 +51,7 @@ public void testDistinctPlain() { data .distinct().name("reducer") - .print().name("sink"); + .output(new DiscardingOutputFormat>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -85,7 +86,6 @@ public void testDistinctPlain() { assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); } @@ -104,7 +104,7 @@ public void testDistinctWithSelectorFunctionKey() { .distinct(new KeySelector, String>() { public String getKey(Tuple2 value) { return value.f0; } }).name("reducer") - .print().name("sink"); + .output(new DiscardingOutputFormat>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); 
@@ -146,7 +146,6 @@ public void testDistinctWithSelectorFunctionKey() { assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); } @@ -164,7 +163,7 @@ public void testDistinctWithFieldPositionKeyCombinable() { DistinctOperator> reduced = data .distinct(1).name("reducer"); - reduced.print().name("sink"); + reduced.output(new DiscardingOutputFormat>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -199,7 +198,6 @@ public void testDistinctWithFieldPositionKeyCombinable() { assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java index 76b3b0e71e631..73284235e42ab 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java @@ -20,30 +20,24 @@ import static org.junit.Assert.fail; -import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.CoGroupOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.util.DummyCoGroupStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; +import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.StringValue; import org.junit.Assert; import org.junit.Test; @@ -51,25 +45,24 @@ * This test case has been created to validate that correct strategies are used if orders within groups are * requested. 
*/ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class GroupOrderTest extends CompilerTestBase { @Test public void testReduceWithGroupOrder() { // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - - ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build(); - Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING); - reduce.setGroupOrder(groupOrder); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink"); - - - Plan plan = new Plan(sink, "Test Temp Task"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet> set1 = env.readCsvFile("/tmp/fake.csv") + .types(Long.class, Long.class, Long.class, Long.class); + + set1.groupBy(1).sortGroup(3, Order.DESCENDING) + .reduceGroup(new IdentityGroupReducer>()).name("Reduce") + .output(new DiscardingOutputFormat>()).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan; + try { oPlan = compileNoStats(plan); } catch(CompilerException ce) { @@ -89,38 +82,35 @@ public void testReduceWithGroupOrder() { Channel c = reducer.getInput(); Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy()); - FieldList ship = new FieldList(2); - FieldList local = new FieldList(2, 5); + FieldList ship = new FieldList(1); + FieldList local = new FieldList(1, 3); Assert.assertEquals(ship, c.getShipStrategyKeys()); Assert.assertEquals(local, c.getLocalStrategyKeys()); Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]); // check that we indeed sort descending - Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]); + Assert.assertEquals(false, c.getLocalStrategySortOrder()[1]); } @Test public void testCoGroupWithGroupOrder() { // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2"); - - CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6) - .keyField(LongValue.class, 0, 0) - .name("CoGroup").input1(source1).input2(source2).build(); - - Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING); - Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING); - groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING); - coGroup.setGroupOrderForInputOne(groupOrder1); - coGroup.setGroupOrderForInputTwo(groupOrder2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink"); - - Plan plan = new Plan(sink, "Reduce Group Order Test"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet> set1 = env.readCsvFile("/tmp/fake1.csv") + .types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class); + DataSet> set2 = env.readCsvFile("/tmp/fake2.csv") + .types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class); + + set1.coGroup(set2).where(3,0).equalTo(6,0) + .sortFirstGroup(5, Order.DESCENDING) + .sortSecondGroup(1, Order.DESCENDING).sortSecondGroup(4, Order.ASCENDING) + .with(new 
IdentityCoGrouper>()).name("CoGroup") + .output(new DiscardingOutputFormat>()).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan; + try { oPlan = compileNoStats(plan); } catch(CompilerException ce) { @@ -144,11 +134,11 @@ public void testCoGroupWithGroupOrder() { Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy()); Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy()); - FieldList ship1 = new FieldList(new int[] {3, 0}); - FieldList ship2 = new FieldList(new int[] {6, 0}); + FieldList ship1 = new FieldList(3, 0); + FieldList ship2 = new FieldList(6, 0); - FieldList local1 = new FieldList(new int[] {3, 0, 5}); - FieldList local2 = new FieldList(new int[] {6, 0, 1, 4}); + FieldList local1 = new FieldList(3, 0, 5); + FieldList local2 = new FieldList(6, 0, 1, 4); Assert.assertEquals(ship1, c1.getShipStrategyKeys()); Assert.assertEquals(ship2, c2.getShipStrategyKeys()); @@ -161,8 +151,8 @@ public void testCoGroupWithGroupOrder() { Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]); // check that the local group orderings are correct - Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]); - Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]); + Assert.assertEquals(false, c1.getLocalStrategySortOrder()[2]); + Assert.assertEquals(false, c2.getLocalStrategySortOrder()[2]); + Assert.assertEquals(true, c2.getLocalStrategySortOrder()[3]); } } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java index 52e9a2dcc776a..adca504fcad51 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java @@ -18,21 +18,16 @@ package org.apache.flink.optimizer; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; +import org.apache.flink.optimizer.testfunctions.IdentityCrosser; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.IntValue; import org.junit.Test; /** @@ -41,7 +36,7 @@ *
  • Ticket 158 * */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class HardPlansCompilationTest extends CompilerTestBase { /** @@ -54,27 +49,21 @@ public class HardPlansCompilationTest extends CompilerTestBase { @Test public void testTicket158() { // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - - MapOperator map = MapOperator.builder(new IdentityMap()).name("Map1").input(source).build(); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build(); - - CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build(); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build(); - - CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build(); - - ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setInput(reduce3); - - Plan plan = new Plan(sink, "Test Temp Task"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet set1 = env.generateSequence(0,1); + + set1.map(new IdentityMapper()).name("Map1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce1") + .cross(set1).with(new IdentityCrosser()).withForwardedFieldsFirst("*").name("Cross1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce2") + .cross(set1).with(new IdentityCrosser()).name("Cross2") + .groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce3") + .output(new DiscardingOutputFormat()).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java index 0afbe93367054..269be6edad654 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -72,7 +73,8 @@ public void testSolutionSetDeltaDependsOnBroadcastVariable() { .map(new IdentityMapper>()).withBroadcastSet(iter.getWorkset(), "bc data") .join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1); - iter.closeWith(result.map(new IdentityMapper>()), result).print(); + iter.closeWith(result.map(new IdentityMapper>()), result) + .output(new DiscardingOutputFormat>()); OptimizedPlan p = compileNoStats(env.createProgramPlan()); @@ -104,7 +106,7 @@ public void testTwoIterationsWithMapperInbetween() throws Exception { DataSet> depResult = doDeltaIteration(mappedBulk, edges); - depResult.print(); + depResult.output(new 
DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -120,7 +122,6 @@ public void testTwoIterationsWithMapperInbetween() throws Exception { new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getMessage()); } @@ -140,7 +141,7 @@ public void testTwoIterationsDirectlyChained() throws Exception { DataSet> depResult = doDeltaIteration(bulkResult, edges); - depResult.print(); + depResult.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -156,7 +157,6 @@ public void testTwoIterationsDirectlyChained() throws Exception { new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getMessage()); } @@ -176,7 +176,7 @@ public void testTwoWorksetIterationsDirectlyChained() throws Exception { DataSet> secondResult = doDeltaIteration(firstResult, edges); - secondResult.print(); + secondResult.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -192,7 +192,6 @@ public void testTwoWorksetIterationsDirectlyChained() throws Exception { new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getMessage()); } @@ -208,7 +207,7 @@ public void testIterationPushingWorkOut() throws Exception { DataSet> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class); - doBulkIteration(input1, input2).print(); + doBulkIteration(input1, input2).output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -226,7 +225,6 @@ public void testIterationPushingWorkOut() throws Exception { new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { - System.err.println(e.getMessage()); e.printStackTrace(); fail(e.getMessage()); } @@ -253,7 +251,7 @@ public void testWorksetIterationPipelineBreakerPlacement() { initialWorkset .join(result, JoinHint.REPARTITION_HASH_FIRST) .where(0).equalTo(0) - .print(); + .output(new DiscardingOutputFormat, Tuple2>>()); Plan p = env.createProgramPlan(); compileNoStats(p); @@ -295,7 +293,7 @@ public void testResetPartialSolution() { DataSet result = iteration.closeWith(width.union(update).union(lastGradient)); - result.print(); + result.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -348,7 +346,7 @@ public static DataSet> doDeltaIteration(DataSet> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId); - + return depResult; } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java index 34fc0854c1038..3a5145187e4a9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; @@ -52,7 +53,7 @@ public void 
testRejectNestedBulkIterations() { DataSet outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper())); - outerResult.print(); + outerResult.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -88,7 +89,7 @@ public void testRejectNestedWorksetIterations() { DataSet> outerResult = outerIteration.closeWith(innerResult, innerResult); - outerResult.print(); + outerResult.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -126,7 +127,7 @@ public void testBulkIterationInClosure() { DataSet mainResult = mainIteration.closeWith(joined); - mainResult.print(); + mainResult.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -164,7 +165,7 @@ public void testDeltaIterationInClosure() { DataSet> mainResult = mainIteration.closeWith(joined, joined); - mainResult.print(); + mainResult.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java index 8236f1050fa14..9ddff3382ca55 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java @@ -17,6 +17,10 @@ */ package org.apache.flink.optimizer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -24,22 +28,13 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityJoiner; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Assert; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.types.IntValue; import org.apache.flink.util.Visitor; import org.junit.Test; @@ -50,7 +45,7 @@ * parallelism between tasks is increased or decreased. 
* */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class ParallelismChangeTest extends CompilerTestBase { /** @@ -62,34 +57,24 @@ public class ParallelismChangeTest extends CompilerTestBase { */ @Test public void checkPropertyHandlingWithIncreasingGlobalParallelism1() { - final int degOfPar = DEFAULT_PARALLELISM; - + final int p = DEFAULT_PARALLELISM; + // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing parallelism"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(p); + DataSet set1 = env.generateSequence(0,1).setParallelism(p); + + set1.map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p).name("Map1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p).name("Reduce1") + .map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p * 2).name("Map2") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p * 2).name("Reduce2") + .output(new DiscardingOutputFormat()).setParallelism(p * 2).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); @@ -116,33 +101,24 @@ public void checkPropertyHandlingWithIncreasingGlobalParallelism1() { */ @Test public void checkPropertyHandlingWithIncreasingGlobalParallelism2() { - final int degOfPar = DEFAULT_PARALLELISM; - + final int p = DEFAULT_PARALLELISM; + // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing parallelism"); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + 
env.setParallelism(p); + DataSet set1 = env.generateSequence(0,1).setParallelism(p); + + set1.map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p).name("Map1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p).name("Reduce1") + .map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p).name("Map2") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p * 2).name("Reduce2") + .output(new DiscardingOutputFormat()).setParallelism(p * 2).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); @@ -170,34 +146,24 @@ public void checkPropertyHandlingWithIncreasingGlobalParallelism2() { */ @Test public void checkPropertyHandlingWithIncreasingLocalParallelism() { - final int degOfPar = 2 * DEFAULT_PARALLELISM; - + final int p = DEFAULT_PARALLELISM * 2; + // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing parallelism"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(p); + DataSet set1 = env.generateSequence(0,1).setParallelism(p); + + set1.map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p).name("Map1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p).name("Reduce1") + .map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p * 2).name("Map2") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p * 2).name("Reduce2") + .output(new DiscardingOutputFormat()).setParallelism(p * 2).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); @@ -217,38 +183,27 @@ public void checkPropertyHandlingWithIncreasingLocalParallelism() { (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn)); } - - @Test public void checkPropertyHandlingWithDecreasingParallelism() { - final int degOfPar = DEFAULT_PARALLELISM; - + final int p = DEFAULT_PARALLELISM; + // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setParallelism(degOfPar * 2); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setParallelism(degOfPar * 2); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - 
reduce1.setParallelism(degOfPar * 2); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setParallelism(degOfPar); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setParallelism(degOfPar); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing parallelism"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(p); + + env + .generateSequence(0, 1).setParallelism(p * 2) + .map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p * 2).name("Map1") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p * 2).name("Reduce1") + .map(new IdentityMapper()) + .withForwardedFields("*").setParallelism(p).name("Map2") + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(p).name("Reduce2") + .output(new DiscardingOutputFormat()).setParallelism(p).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); @@ -284,40 +239,29 @@ public void checkPropertyHandlingWithDecreasingParallelism() { */ @Test public void checkPropertyHandlingWithTwoInputs() { + // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - - ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceA) - .build(); - ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceB) - .build(); - - JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(redA) - .input2(redB) - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat); - - sourceA.setParallelism(5); - sourceB.setParallelism(7); - redA.setParallelism(5); - redB.setParallelism(7); - - mat.setParallelism(5); - - sink.setParallelism(5); - - - // return the PACT plan - Plan plan = new Plan(sink, "Partition on DoP Change"); - + DataSet set1 = env.generateSequence(0,1).setParallelism(5); + DataSet set2 = env.generateSequence(0,1).setParallelism(7); + + DataSet reduce1 = set1 + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(5); + DataSet reduce2 = set2 + .groupBy("*").reduceGroup(new IdentityGroupReducer()) + .withForwardedFields("*").setParallelism(7); + + reduce1.join(reduce2).where("*").equalTo("*") + .with(new IdentityJoiner()).setParallelism(5) + .output(new DiscardingOutputFormat()).setParallelism(5); + + JavaPlan plan = env.createProgramPlan(); + // submit the plan to the compiler OptimizedPlan oPlan = compileNoStats(plan); - + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java index 72effc19d06cd..365726d7b55c1 100644 --- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; @@ -46,7 +47,7 @@ public void testPartitioningNotPushedDown() { input .groupBy(0, 1).sum(2) .groupBy(0).sum(1) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -82,7 +83,7 @@ public void testPartitioningReused() { input .groupBy(0).sum(1) .groupBy(0, 1).sum(2) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java index f42eb02651118..41e0eb953f3e5 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -52,7 +53,7 @@ public void noPreviousPartitioningJoin1() { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -73,7 +74,7 @@ public void noPreviousPartitioningJoin2() { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -96,7 +97,7 @@ public void reuseSinglePartitioningJoin1() { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -119,7 +120,7 @@ public void reuseSinglePartitioningJoin2() { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -142,7 +143,7 @@ public void reuseSinglePartitioningJoin3() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -164,7 +165,7 @@ public void reuseSinglePartitioningJoin4() { 
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -187,7 +188,7 @@ public void reuseSinglePartitioningJoin5() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -212,7 +213,7 @@ public void reuseBothPartitioningJoin1() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -238,7 +239,7 @@ public void reuseBothPartitioningJoin2() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -263,7 +264,7 @@ public void reuseBothPartitioningJoin3() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -288,7 +289,7 @@ public void reuseBothPartitioningJoin4() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -313,7 +314,7 @@ public void reuseBothPartitioningJoin5() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -338,7 +339,7 @@ public void reuseBothPartitioningJoin6() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(1,2).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -363,7 +364,7 @@ public void reuseBothPartitioningJoin7() { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(1,2).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -384,7 +385,7 @@ public void noPreviousPartitioningCoGroup1() { .coGroup(set2) .where(0,1).equalTo(0,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -405,7 +406,7 @@ public void noPreviousPartitioningCoGroup2() { .coGroup(set2) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -428,7 +429,7 @@ public void reuseSinglePartitioningCoGroup1() { .coGroup(set2) .where(0,1).equalTo(0,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); 
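The change just above is the pattern applied throughout this port: the eager print() sinks of the old Record-API tests are replaced with output(new DiscardingOutputFormat<...>()), so each test only builds a program plan and hands it to the optimizer, without executing or printing anything. The following is an illustrative sketch of the resulting test skeleton, not code from this patch; it assumes the pieces visible elsewhere in the diff (a CompilerTestBase subclass providing DEFAULT_PARALLELISM and compileNoStats, the IdentityMapper test function, JavaPlan, and JobGraphGenerator).

    // Sketch only: assumed to live in a test class extending CompilerTestBase,
    // which supplies DEFAULT_PARALLELISM and compileNoStats (as used in this patch).
    @Test
    public void sketchOfPortedOptimizerTest() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);

        env.generateSequence(0, 1)
            .map(new IdentityMapper<Long>()).name("Map")
            // discarding sink instead of print(): the program is never executed
            .output(new DiscardingOutputFormat<Long>()).name("Sink");

        // translate the fluent program into a plan and run it through the optimizer
        JavaPlan plan = env.createProgramPlan();
        OptimizedPlan oPlan = compileNoStats(plan);

        // optionally also verify that a job graph can be generated from the plan
        new JobGraphGenerator().compileJobGraph(oPlan);
    }

The same skeleton recurs in the ported tests below, with withForwardedFields annotations standing in for the field-preservation semantics that the old IdentityMap/IdentityReduce record stubs carried implicitly.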
JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -451,7 +452,7 @@ public void reuseSinglePartitioningCoGroup2() { .coGroup(set2) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -473,7 +474,7 @@ public void reuseSinglePartitioningCoGroup3() { .withForwardedFields("2;1")) .where(0,1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -495,7 +496,7 @@ public void reuseSinglePartitioningCoGroup4() { .coGroup(set2) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -517,7 +518,7 @@ public void reuseSinglePartitioningCoGroup5() { .withForwardedFields("2")) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -541,7 +542,7 @@ public void reuseBothPartitioningCoGroup1() { .withForwardedFields("0;1")) .where(0, 1).equalTo(0, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -566,7 +567,7 @@ public void reuseBothPartitioningCoGroup2() { .withForwardedFields("1;2")) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -590,7 +591,7 @@ public void reuseBothPartitioningCoGroup3() { .withForwardedFields("2;1")) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -614,7 +615,7 @@ public void reuseBothPartitioningCoGroup4() { .withForwardedFields("1")) .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -638,7 +639,7 @@ public void reuseBothPartitioningCoGroup5() { .withForwardedFields("1")) .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -662,7 +663,7 @@ public void reuseBothPartitioningCoGroup6() { .withForwardedFields("2")) .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -686,7 +687,7 @@ public void reuseBothPartitioningCoGroup7() { .withForwardedFields("1")) .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java index 84f6377b62008..68e8a4152b041 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.*; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.testfunctions.SelectOneReducer; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -50,7 +52,7 @@ public void testPipelineBreakerWithBroadcastVariable() { .map(new IdentityMapper()) .withBroadcastSet(source, "bc"); - result.print(); + result.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -84,7 +86,7 @@ public void testPipelineBreakerBroadcastedAllReduce() { .withBroadcastSet(bcInput1, "bc1") .withBroadcastSet(bcInput2, "bc2"); - result.print(); + result.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -123,7 +125,7 @@ public void testPipelineBreakerBroadcastedPartialSolution() { .withBroadcastSet(bcInput1, "bc1"); - iteration.closeWith(result).print(); + iteration.closeWith(result).output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -154,7 +156,7 @@ public void testPilelineBreakerWithCross() { initialSource .map(new IdentityMapper()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -176,7 +178,7 @@ public void testPilelineBreakerWithCross() { initialSource .map(new IdentityMapper()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -199,7 +201,7 @@ public void testPilelineBreakerWithCross() { initialSource .map(new IdentityMapper()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -222,7 +224,7 @@ public void testPilelineBreakerWithCross() { initialSource .map(new IdentityMapper()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java index d6b9444f7362e..dc9f2e548087b 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; @@ -64,7 +65,7 @@ public void checkSinglePartitionedSource1() { data.getSplitDataProperties() .splitsPartitionedBy(0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -97,7 +98,7 @@ public void 
checkSinglePartitionedSource2() { data.getSplitDataProperties() .splitsPartitionedBy(1, 0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -129,7 +130,7 @@ public void checkSinglePartitionedSource3() { data.getSplitDataProperties() .splitsPartitionedBy("*"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -161,7 +162,7 @@ public void checkSinglePartitionedSource4() { data.getSplitDataProperties() .splitsPartitionedBy("f1"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -193,7 +194,7 @@ public void checkSinglePartitionedSource5() { data.getSplitDataProperties() .splitsPartitionedBy("f1.stringField"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -225,7 +226,7 @@ public void checkSinglePartitionedSource6() { data.getSplitDataProperties() .splitsPartitionedBy("f1.intField; f2"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -258,7 +259,7 @@ public void checkSinglePartitionedSource7() { data.getSplitDataProperties() .splitsPartitionedBy("byDate", 1, 0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -293,7 +294,7 @@ public void checkSinglePartitionedGroupedSource1() { .splitsPartitionedBy(0) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -327,7 +328,7 @@ public void checkSinglePartitionedGroupedSource2() { .splitsPartitionedBy(0) .splitsGroupedBy(1, 0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -362,7 +363,7 @@ public void checkSinglePartitionedGroupedSource3() { .splitsPartitionedBy(1) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -396,7 +397,7 @@ public void checkSinglePartitionedGroupedSource4() { .splitsPartitionedBy(0, 1) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -429,7 +430,7 @@ public void checkSinglePartitionedGroupedSource5() { .splitsPartitionedBy("f2") .splitsGroupedBy("f2"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -463,7 +464,7 @@ public void checkSinglePartitionedGroupedSource6() { .splitsPartitionedBy("f1.intField") .splitsGroupedBy("f0; f1.intField"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -497,7 +498,7 @@ public void checkSinglePartitionedGroupedSource7() { .splitsPartitionedBy("f1.intField") .splitsGroupedBy("f1"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -530,7 +531,7 @@ public void checkSinglePartitionedGroupedSource8() { .splitsPartitionedBy("f1") .splitsGroupedBy("f1.stringField"); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -565,7 +566,7 @@ public void checkSinglePartitionedOrderedSource1() { .splitsPartitionedBy(1) .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -599,7 +600,7 @@ public void checkSinglePartitionedOrderedSource2() { 
.splitsPartitionedBy(1) .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -634,7 +635,7 @@ public void checkSinglePartitionedOrderedSource3() { .splitsPartitionedBy(0) .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -668,7 +669,7 @@ public void checkSinglePartitionedOrderedSource4() { .splitsPartitionedBy(0, 1) .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -701,7 +702,7 @@ public void checkSinglePartitionedOrderedSource5() { .splitsPartitionedBy("f1.intField") .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -735,7 +736,7 @@ public void checkSinglePartitionedOrderedSource6() { .splitsPartitionedBy("f1.intField") .splitsOrderedBy("f1", new Order[]{Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -768,7 +769,7 @@ public void checkSinglePartitionedOrderedSource7() { .splitsPartitionedBy("f1") .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -808,7 +809,7 @@ public void checkCoPartitionedSources1() { data2.getSplitDataProperties() .splitsPartitionedBy("byDate", 0); - data1.union(data2).print(); + data1.union(data2).output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); @@ -856,7 +857,7 @@ public void checkCoPartitionedSources2() { data2.getSplitDataProperties() .splitsPartitionedBy("byDate", 0); - data1.union(data2).print(); + data1.union(data2).output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java index 3af64fc2f8992..b0dca66ab05ac 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java @@ -20,36 +20,35 @@ import static org.junit.Assert.fail; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; import org.junit.Test; /** * This test case has been created to validate a bug that occurred when * the ReduceOperator was used without a 
grouping key. */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class ReduceAllTest extends CompilerTestBase { @Test public void testReduce() { // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build(); - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setInput(reduce1); - Plan plan = new Plan(sink, "AllReduce Test"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet set1 = env.generateSequence(0,1); + + set1.reduceGroup(new IdentityGroupReducer()).name("Reduce1") + .output(new DiscardingOutputFormat()).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); + try { OptimizedPlan oPlan = compileNoStats(plan); JobGraphGenerator jobGen = new JobGraphGenerator(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java index 25643a456534e..26af3800e6dd1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java @@ -44,7 +44,7 @@ import org.junit.Assert; import org.junit.Test; -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial", "unchecked"}) public class ReplicatingDataSourceTest extends CompilerTestBase { /** diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java index 3a24ce18f686d..00ada2a1941c9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -57,7 +58,7 @@ public void forwardFieldsTestMapReduce() { .groupBy(1) .reduce(new MockReducer()).withForwardedFields("*"); - set.print(); + set.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -118,7 +119,7 @@ public void forwardFieldsTestJoin() { .reduce(new MockReducer()).withForwardedFields("f1->f2"); DataSet> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin()); - out.print(); + out.output(new DiscardingOutputFormat>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java index 65e5025e4cd37..a94f845e37226 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java +++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -30,7 +31,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -51,12 +52,12 @@ public void testPartialPartitioningReuse() { .map(new IdentityMapper>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1", "2") + .reduceGroup(new IdentityGroupReducerCombinable>()).withForwardedFields("0", "1", "2") .groupBy(0) - .reduceGroup(new IdentityGroupReducer>()) - - .print(); + .reduceGroup(new IdentityGroupReducerCombinable>()) + + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -96,12 +97,12 @@ public void testCustomPartitioningNotReused() { .map(new IdentityMapper>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1", "2") + .reduceGroup(new IdentityGroupReducerCombinable>()).withForwardedFields("0", "1", "2") .groupBy(1) - .reduceGroup(new IdentityGroupReducer>()) - - .print(); + .reduceGroup(new IdentityGroupReducerCombinable>()) + + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java index 100162658a0f2..f041b2ab5d012 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.optimizer.plan.BinaryUnionPlanNode; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; @@ -50,8 +51,8 @@ public void testUnionStaticFirst() { DataSet result = iteration.closeWith( input2.union(input2).union(iteration.union(iteration))); - result.print(); - result.print(); + result.output(new DiscardingOutputFormat()); + result.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -102,8 +103,8 @@ public void testUnionStaticSecond() { DataSet iterResult = iteration .closeWith(iteration.union(iteration).union(input2.union(input2))); - iterResult.print(); - iterResult.print(); + iterResult.output(new DiscardingOutputFormat()); + iterResult.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java index 2e52565bfa980..fee6e172f3473 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java @@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.IntValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Visitor; import org.junit.Assert; @@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.PlanNode; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class UnionPropertyPropagationTest extends CompilerTestBase { - @SuppressWarnings("unchecked") @Test - public void testUnionPropertyOldApiPropagation() { + public void testUnion1() { // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet sourceA = env.generateSequence(0,1); + DataSet sourceB = env.generateSequence(0,1); - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - - ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceA) - .build(); - ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceB) - .build(); - - ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build(); - globalRed.addInput(redA); - globalRed.addInput(redB); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed); - - // return the plan - Plan plan = new Plan(sink, "Union Property Propagation"); + DataSet redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer()); + DataSet redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer()); + + redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer()) + .output(new DiscardingOutputFormat()); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); @@ -88,7 +76,7 @@ public void testUnionPropertyOldApiPropagation() { @Override public boolean preVisit(PlanNode visitable) { - if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) { 
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) { for (Channel inConn : visitable.getInputs()) { Assert.assertTrue("Reduce should just forward the input if it is already partitioned", inConn.getShipStrategy() == ShipStrategyType.FORWARD); @@ -107,7 +95,7 @@ public void postVisit(PlanNode visitable) { } @Test - public void testUnionNewApiAssembly() { + public void testUnion2() { final int NUM_INPUTS = 4; // construct the plan it will be multiple flat maps, all unioned diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java index e81e0ec0859a4..65dd2b393ace9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -40,8 +41,8 @@ public void testUnionReplacement() { DataSet union = input1.union(input2); - union.print(); - union.print(); + union.output(new DiscardingOutputFormat()); + union.output(new DiscardingOutputFormat()); Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java index 321ca5aefd8f2..32bd6e9dfaaf9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java @@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -40,12 +42,13 @@ public void testWorksetIterationNotDependingOnSolutionSet() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> input = env.generateSequence(1, 100).map(new Duplicator()); + DataSet> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class); DeltaIteration, Tuple2> iteration = input.iterateDelta(input, 100, 1); - DataSet> iterEnd = iteration.getWorkset().map(new TestMapper>()); - iteration.closeWith(iterEnd, iterEnd).print(); + DataSet> iterEnd = iteration.getWorkset().map(new IdentityMapper>()); + iteration.closeWith(iterEnd, iterEnd) + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -61,18 +64,5 @@ public void 
testWorksetIterationNotDependingOnSolutionSet() { fail(e.getMessage()); } } - - private static final class Duplicator implements MapFunction> { - @Override - public Tuple2 map(T value) { - return new Tuple2(value, value); - } - } - - private static final class TestMapper implements MapFunction { - @Override - public T map(T value) { - return value; - } - } + } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java index 27f367f125343..46b935729849f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java @@ -25,25 +25,21 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.JoinOperator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityJoiner; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.LongValue; import org.junit.Test; @@ -51,7 +47,6 @@ * Tests that validate optimizer choices when using operators that are requesting certain specific execution * strategies. 
*/ -@SuppressWarnings("deprecation") public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { private static final long serialVersionUID = 1L; @@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { @Test public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() { - Plan plan = getRecordTestPlan(false, true); + Plan plan = getTestPlan(false, true); OptimizedPlan oPlan; try { @@ -112,7 +107,7 @@ public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() { @Test public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() { - Plan plan = getRecordTestPlan(false, false); + Plan plan = getTestPlan(false, false); OptimizedPlan oPlan; try { @@ -156,7 +151,7 @@ public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() { @Test public void testRecordApiWithDirectSoltionSetUpdate() { - Plan plan = getRecordTestPlan(true, false); + Plan plan = getTestPlan(true, false); OptimizedPlan oPlan; try { @@ -197,52 +192,45 @@ public void testRecordApiWithDirectSoltionSetUpdate() { new JobGraphGenerator().compileJobGraph(oPlan); } - private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { - FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set"); - FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset"); - - FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input"); - - DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME); - iteration.setInitialSolutionSet(solutionSetInput); - iteration.setInitialWorkset(worksetInput); - iteration.setMaximumNumberOfIterations(100); - - JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0) - .input1(iteration.getWorkset()) - .input2(invariantInput) - .name(JOIN_WITH_INVARIANT_NAME) - .build(); - - JoinOperator joinWithSolutionSet = JoinOperator.builder( - joinPreservesSolutionSet ? 
new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0) - .input1(iteration.getSolutionSet()) - .input2(joinWithInvariant) - .name(JOIN_WITH_SOLUTION_SET) - .build(); - - ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0) - .input(joinWithSolutionSet) - .name(NEXT_WORKSET_REDUCER_NAME) - .build(); - - if (mapBeforeSolutionDelta) { - MapOperator mapper = MapOperator.builder(new IdentityMap()) - .input(joinWithSolutionSet) - .name(SOLUTION_DELTA_MAPPER_NAME) - .build(); - iteration.setSolutionSetDelta(mapper); - } else { - iteration.setSolutionSetDelta(joinWithSolutionSet); + private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { + + // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set"); + DataSet> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset"); + DataSet> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input"); + + DeltaIteration, Tuple2> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME); + + DataSet> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0) + .with(new IdentityJoiner>()) + .withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME); + + DataSet> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0) + .with(new IdentityJoiner>()) + .name(JOIN_WITH_SOLUTION_SET); + if(joinPreservesSolutionSet) { + ((JoinOperator)join2).withForwardedFieldsFirst("*"); } - - iteration.setNextWorkset(nextWorkset); - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - return plan; + DataSet> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer>()) + .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME); + + if(mapBeforeSolutionDelta) { + + DataSet> mapper = join2.map(new IdentityMapper>()) + .withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME); + + deltaIt.closeWith(mapper, nextWorkset) + .output(new DiscardingOutputFormat>()); + } + else { + deltaIt.closeWith(join2, nextWorkset) + .output(new DiscardingOutputFormat>()); + } + + return env.createProgramPlan(); } } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java index d52181d732679..346e70233fffd 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java @@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -246,7 +246,7 @@ public 
void testIncompatibleHashAndCustomPartitioning() { .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1"); + .reduceGroup(new IdentityGroupReducerCombinable>()).withForwardedFields("0", "1"); grouped .coGroup(partitioned).where(0).equalTo(0) diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java index 5758c86872399..17a76590450bd 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java @@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -57,7 +57,7 @@ public void testJoinReduceCombination() { .withPartitioner(partitioner); joined.groupBy(1).withPartitioner(partitioner) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java index 0408ca9642384..23f4812f78ac9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java @@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -84,7 +84,7 @@ public void testCustomPartitioningKeySelectorGroupReduce() { data.groupBy(new TestKeySelector>()) .withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); @@ -115,7 +115,7 @@ public void testCustomPartitioningKeySelectorGroupReduceSorted() { data.groupBy(new TestKeySelector>()) .withPartitioner(new TestPartitionerInt()) .sortGroup(new TestKeySelector>(), Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java index 
74e5c8cc7e750..54033ac6becef 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java @@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -76,7 +76,7 @@ public void testCustomPartitioningTupleGroupReduce() { .rebalance().setParallelism(4); data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer()) + .reduceGroup(new IdentityGroupReducerCombinable()) .print(); Plan p = env.createProgramPlan(); @@ -106,7 +106,7 @@ public void testCustomPartitioningTupleGroupReduceSorted() { data.groupBy("a").withPartitioner(new TestPartitionerInt()) .sortGroup("b", Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer()) + .reduceGroup(new IdentityGroupReducerCombinable()) .print(); Plan p = env.createProgramPlan(); @@ -137,7 +137,7 @@ public void testCustomPartitioningTupleGroupReduceSorted2() { data.groupBy("a").withPartitioner(new TestPartitionerInt()) .sortGroup("b", Order.ASCENDING) .sortGroup("c", Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer()) + .reduceGroup(new IdentityGroupReducerCombinable()) .print(); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java index 72fb81bcd1636..49f44f55a74de 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java @@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -108,7 +108,7 @@ public void testCustomPartitioningTupleGroupReduce() { .rebalance().setParallelism(4); data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); @@ -138,7 +138,7 @@ public void testCustomPartitioningTupleGroupReduceSorted() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); @@ -169,7 +169,7 @@ public void testCustomPartitioningTupleGroupReduceSorted2() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) .sortGroup(2, Order.DESCENDING) - .reduceGroup(new 
IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java index 8eedee1e3660f..ff429b85439f5 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java @@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -243,7 +243,7 @@ public void testIncompatibleHashAndCustomPartitioning() { .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1"); + .reduceGroup(new IdentityGroupReducerCombinable>()).withForwardedFields("0", "1"); grouped .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java index 95ee4de8da80f..9c2d0d2933f11 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -49,7 +49,7 @@ public void testPartitionOperatorPreservesFields() { public int partition(Long key, int numPartitions) { return key.intValue(); } }, 1) .groupBy(1) - .reduceGroup(new IdentityGroupReducer>()) + .reduceGroup(new IdentityGroupReducerCombinable>()) .print(); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java similarity index 69% rename from flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java rename to flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java index 444b48e0c2cb9..9d8ac2e264b4d 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java @@ -16,20 +16,15 @@ * limitations under the License. 
 */
-package org.apache.flink.optimizer.util;
+package org.apache.flink.optimizer.testfunctions;
 
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
+	public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
 }
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
similarity index 75%
rename from flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
rename to flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
index 8ee2285db1a02..54b2785f4b1e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.optimizer.util;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
 
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public Record cross(Record first, Record second) throws Exception {
+	public T cross(T first, T second) {
 		return first;
 	}
 }
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044a01705..da4ef17f798b1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-
-@Combinable
 public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
similarity index 64%
rename from flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
rename to flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
index cccc6cbac0c91..ce24bb63ba37f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -16,22 +16,22 @@
  * limitations under the License.
 */
-package org.apache.flink.optimizer.util;
+package org.apache.flink.optimizer.testfunctions;
 
-import java.io.Serializable;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
-	public void map(Record record, Collector<Record> out) throws Exception {
-		out.collect(record);
+	public void reduce(Iterable<T> values, Collector<T> out) {
+		for (T next : values) {
+			out.collect(next);
+		}
 	}
 }
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
similarity index 74%
rename from flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
rename to flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
index 1bbe24ce20179..faca2cecae12b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -17,18 +17,16 @@
  */
 
-package org.apache.flink.optimizer.util;
+package org.apache.flink.optimizer.testfunctions;
 
+import org.apache.flink.api.common.functions.JoinFunction;
 
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
 
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		return 0;
+	public T join(T first, T second) {
+		return first;
 	}
 }
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44eef190..0000000000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.java.record.functions.CoGroupFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -public class DummyCoGroupStub extends CoGroupFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void coGroup(Iterator records1, Iterator records2, Collector out) { - while (records1.hasNext()) { - out.collect(records1.next()); - } - - while (records2.hasNext()) { - out.collect(records2.next()); - } - } -} diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java deleted file mode 100644 index 0c816e7c1941f..0000000000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.record.io.DelimitedInputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -public final class DummyInputFormat extends DelimitedInputFormat { - private static final long serialVersionUID = 1L; - - private final IntValue integer = new IntValue(1); - - @Override - public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) { - target.setField(0, this.integer); - target.setField(1, this.integer); - return target; - } - - @Override - public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) { - return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null; - } -} diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java deleted file mode 100644 index d00be6e71e24f..0000000000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; - -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -@ConstantFieldsFirstExcept({}) -public class DummyMatchStub extends JoinFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record value1, Record value2, Collector out) throws Exception { - out.collect(value1); - } -} diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java deleted file mode 100644 index f45745d536e5c..0000000000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-		while (records.hasNext()) {
-			out.collect(records.next());
-		}
-	}
-}
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
index 920b713b12823..fe0a533ad5f20 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
@@ -29,14 +29,13 @@
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.util.Visitor;
 
 /**
  * Utility to get operator instances from plans via name.
  */
-@SuppressWarnings("deprecation")
 public class OperatorResolver implements Visitor<Operator<?>> {
 
 	private final Map<String, List<Operator<?>>> map;
@@ -109,11 +108,11 @@ public boolean preVisit(Operator<?> visitable) {
 			list.add(visitable);
 
 			// recurse into bulk iterations
-			if (visitable instanceof BulkIteration) {
-				((BulkIteration) visitable).getNextPartialSolution().accept(this);
-			} else if (visitable instanceof DeltaIteration) {
-				((DeltaIteration) visitable).getSolutionSetDelta().accept(this);
-				((DeltaIteration) visitable).getNextWorkset().accept(this);
+			if (visitable instanceof BulkIterationBase) {
+				((BulkIterationBase) visitable).getNextPartialSolution().accept(this);
+			} else if (visitable instanceof DeltaIterationBase) {
+				((DeltaIterationBase) visitable).getSolutionSetDelta().accept(this);
+				((DeltaIterationBase) visitable).getNextWorkset().accept(this);
 			}
 			return true;
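
For context, the ported tests above all follow the same Java API pattern: build a small program against an ExecutionEnvironment, obtain its Plan, compile it with the optimizer, and assert on the chosen strategies. The following sketch, which is not part of the patch, illustrates that pattern using the relocated test functions; the class name, input paths, and the literal parallelism value are illustrative assumptions, and the compile step via CompilerTestBase is only referenced in comments.

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;

public class JavaApiTestPlanSketch {

	public static Plan buildPlan() {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8); // stands in for DEFAULT_PARALLELISM from CompilerTestBase

		// Two hypothetical CSV inputs; the ported tests read files or generated sequences
		// instead of the removed DummyInputFormat sources.
		DataSet<Tuple2<Long, Long>> left = env.readCsvFile("/tmp/left.csv").types(Long.class, Long.class);
		DataSet<Tuple2<Long, Long>> right = env.readCsvFile("/tmp/right.csv").types(Long.class, Long.class);

		// IdentityJoiner takes the role of the deleted DummyMatchStub, and
		// IdentityGroupReducerCombinable that of the deleted IdentityReduce.
		left.join(right).where(0).equalTo(0)
				.with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Join")
				.groupBy(0)
				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>()).name("Reduce")
				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		// A test would then pass this plan to compileNoStats(plan) or compileWithStats(plan)
		// and look up the named nodes in the resulting OptimizedPlan.
		return env.createProgramPlan();
	}
}

This mirrors the structure of getTestPlan in the WorksetIterations test above, minus the delta iteration.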