From a1e1d815989eec88c66c653a3f02e04a83848bc9 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Fri, 17 May 2019 15:08:17 +0200 Subject: [PATCH] [hotfix][operator] Reorder if conditions in StreamInputProcessor Make the most commonly expected condition the first one. --- .../runtime/io/StreamInputProcessor.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index ab93bd65ce88d..913116627dc42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -137,7 +137,16 @@ public boolean processInput() throws Exception { } private void processElement(StreamElement recordOrMark, int channel) throws Exception { - if (recordOrMark.isWatermark()) { + if (recordOrMark.isRecord()) { + // now we can do the actual processing + StreamRecord record = recordOrMark.asRecord(); + synchronized (lock) { + numRecordsIn.inc(); + streamOperator.setKeyContextElement1(record); + streamOperator.processElement(record); + } + } + else if (recordOrMark.isWatermark()) { // handle watermark statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel); } else if (recordOrMark.isStreamStatus()) { @@ -149,14 +158,9 @@ private void processElement(StreamElement recordOrMark, int channel) throws Exce streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); } } else { - // now we can do the actual processing - StreamRecord record = recordOrMark.asRecord(); - synchronized (lock) { - numRecordsIn.inc(); - streamOperator.setKeyContextElement1(record); - streamOperator.processElement(record); - } - }} + throw new UnsupportedOperationException("Unknown type of StreamElement"); + } + } private void initializeNumRecordsIn() { if (numRecordsIn == null) {