Skip to content

Commit

Permalink
[hotfix][operator] Reorder if conditions in StreamInputProcessor
Browse files Browse the repository at this point in the history
Make the most commonly expected condition the first one.
  • Loading branch information
pnowojski committed May 27, 2019
1 parent 6570920 commit a1e1d81
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,16 @@ public boolean processInput() throws Exception {
}

private void processElement(StreamElement recordOrMark, int channel) throws Exception {
if (recordOrMark.isWatermark()) {
if (recordOrMark.isRecord()) {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
}
else if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel);
} else if (recordOrMark.isStreamStatus()) {
Expand All @@ -149,14 +158,9 @@ private void processElement(StreamElement recordOrMark, int channel) throws Exce
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
}}
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}

private void initializeNumRecordsIn() {
if (numRecordsIn == null) {
Expand Down

0 comments on commit a1e1d81

Please sign in to comment.