From bfd038d13b73816adb4dab1387b1f75ecf2f2bfc Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 5 Jan 2017 14:37:03 +0100 Subject: [PATCH] [FLINK-5380] Fix task metrics reuse for single-operator chains --- .../api/graph/StreamingJobGraphGenerator.java | 6 +-- .../graph/StreamingJobGraphGeneratorTest.java | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index da69b49756f53..9921bcb16b7db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -234,9 +234,9 @@ private List createChain( config.setChainIndex(chainIndex); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); - if (chainableOutputs.isEmpty()) { - config.setChainEnd(); - } + } + if (chainableOutputs.isEmpty()) { + config.setChainEnd(); } return transitiveOutEdges; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index b817c9326dfd9..4d462d096c158 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,11 +32,13 @@ import org.junit.Test; import java.io.IOException; +import java.util.Map; import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest extends TestLogger { @@ -170,4 +173,39 @@ public void testDisabledCheckpointing() throws Exception { JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval()); } + + /** + * Verifies that the chain start/end is correctly set. + */ + @Test + public void testChainStartEndSetting() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // fromElements -> CHAIN(Map -> Print) + env.fromElements(1, 2, 3) + .map(new MapFunction() { + @Override + public Integer map(Integer value) throws Exception { + return value; + } + }) + .print(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + + JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); + JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + + StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration()); + StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration()); + Map chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader()); + StreamConfig printConfig = chainedConfigs.get(3); + + assertTrue(sourceConfig.isChainStart()); + assertTrue(sourceConfig.isChainEnd()); + + assertTrue(mapConfig.isChainStart()); + assertFalse(mapConfig.isChainEnd()); + + assertFalse(printConfig.isChainStart()); + assertTrue(printConfig.isChainEnd()); + } }