Skip to content

Commit

Permalink
[hotfix][operator] Reorder if conditions in StreamInputProcessor
Browse files Browse the repository at this point in the history
Make the most commonly expected condition the first one.
  • Loading branch information
pnowojski committed May 17, 2019
1 parent 010a23f commit aa5a82a
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,16 @@ public boolean processInput() throws Exception {
}

private void processElement(StreamElement recordOrMark) throws Exception {
if (recordOrMark.isWatermark()) {
if (recordOrMark.isRecord()) {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
}
else if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
} else if (recordOrMark.isStreamStatus()) {
Expand All @@ -154,14 +163,9 @@ private void processElement(StreamElement recordOrMark) throws Exception {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
}}
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}

private void initializeNumRecordsIn() {
if (numRecordsIn == null) {
Expand Down

0 comments on commit aa5a82a

Please sign in to comment.