From 88929dcd989c34c3232f29a96af1b32ec1315911 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 1 Jun 2016 13:56:52 -0700 Subject: [PATCH 1/2] [FLINK-4000] Checkpoint dictionaries null after taskmgr failures --- .../api/functions/source/MessageAcknowledgingSourceBase.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index d3cbfb6f4fff5..3cb2301f8d54b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -181,7 +181,10 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception { // build a set which contains all processed ids. It may be used to check if we have // already processed an incoming message. for (Tuple2> checkpoint : pendingCheckpoints) { - idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + // FLINK-4000: On Job restart triggered by taskmgr failure, the checkpoint dictionaries may be null. + if(checkpoint != null && checkpoint.f1 != null) { + idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + } } } From 570b6e489cd4d9a20cbf1e3769df7aca463c75ce Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 1 Jun 2016 14:58:44 -0700 Subject: [PATCH 2/2] [FLINK-4000] Checkpoint dictionaries null after taskmgr failures --- .../functions/source/MessageAcknowledgingSourceBase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 3cb2301f8d54b..30960812ec228 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -181,10 +181,10 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception { // build a set which contains all processed ids. It may be used to check if we have // already processed an incoming message. for (Tuple2> checkpoint : pendingCheckpoints) { - // FLINK-4000: On Job restart triggered by taskmgr failure, the checkpoint dictionaries may be null. - if(checkpoint != null && checkpoint.f1 != null) { - idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + if(idsProcessedButNotAcknowledged == null) { + this.open(new Configuration()); // taskmgr failure job restart } + idsProcessedButNotAcknowledged.addAll(checkpoint.f1); } }