From c24dcb03eff1259b73a37175911662955f7f3e70 Mon Sep 17 00:00:00 2001 From: Xpray Date: Mon, 23 Apr 2018 15:37:39 +0800 Subject: [PATCH] [FLINK-9216][Streaming] Fix comparator violation --- .../streaming/api/graph/JSONGenerator.java | 12 ++++++---- .../api/graph/StreamGraphGeneratorTest.java | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index 263e0aabdf197..3f82cf3368d98 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -61,14 +61,16 @@ public String getJSON() { List operatorIDs = new ArrayList(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator() { @Override - public int compare(Integer o1, Integer o2) { + public int compare(Integer idOne, Integer idTwo) { + boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne); + boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo); // put sinks at the back - if (streamGraph.getSinkIDs().contains(o1)) { + if (isIdOneSinkId == isIdTwoSinkId) { + return idOne.compareTo(idTwo); + } else if (isIdOneSinkId) { return 1; - } else if (streamGraph.getSinkIDs().contains(o2)) { - return -1; } else { - return o1 - o2; + return -1; } } }); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 8149d244498b8..51e0f913f9ae3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -413,6 +414,29 @@ public Integer getKey(Integer value) throws Exception { StreamPartitioner streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner(); } + /** + * Tests that the json generated by JSONGenerator shall meet with 2 requirements: + * 1. sink nodes are at the back + * 2. if both two nodes are sink nodes or neither of them is sink node, then sort by its id. + */ + @Test + public void itestStreamGraphJsonGeneration() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream source = env.fromElements(1, 2, 3); + for(int i = 0; i < 32; i++) { + if (i % 2 == 0) { + source.addSink(new SinkFunction() { + @Override + public void invoke(Integer value) throws Exception {} + }); + } else { + source.map(x -> x + 1); + } + } + // IllegalArgumentException will be thrown without FLINK-9216 + env.getStreamGraph().getStreamingPlanAsJSON(); + } + private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator implements TwoInputStreamOperator, OutputTypeConfigurable {