From 664a3b3270bd427b8d449b8abf551d1da5e151fd Mon Sep 17 00:00:00 2001 From: kl0u Date: Mon, 16 Jan 2017 15:27:38 +0100 Subject: [PATCH] [FLINK-5450] Fix restore from legacy log message --- .../operators/windowing/WindowOperator.java | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 628d66319ccde..5ed5a4e295f9a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -804,9 +804,6 @@ public void restoreState(FSDataInputStream in) throws Exception { public void registerRestoredLegacyStateState() throws Exception { - LOG.info("{} (taskIdx={}) re-registering state from an older Flink version.", - getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask()); - switch (legacyWindowOperatorType) { case NONE: reregisterStateFromLegacyWindowOperator(); @@ -983,14 +980,22 @@ public void reregisterStateFromLegacyWindowOperator() { // if we restore from an older version, // we have to re-register the recovered state. - if (restoredFromLegacyEventTimeTimers != null) { + if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) { + + LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); + for (Timer timer : restoredFromLegacyEventTimeTimers) { setCurrentKey(timer.key); internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp); } } - if (restoredFromLegacyProcessingTimeTimers != null) { + if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) { + + LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); + for (Timer timer : restoredFromLegacyProcessingTimeTimers) { setCurrentKey(timer.key); internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp); @@ -1003,16 +1008,20 @@ public void reregisterStateFromLegacyWindowOperator() { } public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception { - if (restoredFromLegacyAlignedOpRecords != null) { + if (restoredFromLegacyAlignedOpRecords != null && !restoredFromLegacyAlignedOpRecords.isEmpty()) { + + LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType); + while (!restoredFromLegacyAlignedOpRecords.isEmpty()) { StreamRecord record = restoredFromLegacyAlignedOpRecords.poll(); setCurrentKey(keySelector.getKey(record.getValue())); processElement(record); } - - // gc friendliness - restoredFromLegacyAlignedOpRecords = null; } + + // gc friendliness + restoredFromLegacyAlignedOpRecords = null; } // ------------------------------------------------------------------------