From 2021f4790715ecb762dbb23438bf0b2b2755845e Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 25 Aug 2016 11:11:44 -0700 Subject: [PATCH 1/3] FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock --- .../checkpoint/CheckpointCoordinator.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 3619f482de52f..d9b8824f96628 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -908,25 +908,27 @@ public void startCheckpointScheduler() { } public void stopCheckpointScheduler() { - synchronized (lock) { - triggerRequestQueued = false; - periodicScheduling = false; + synchronized (triggerLock) { + synchronized (lock) { + triggerRequestQueued = false; + periodicScheduling = false; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); - currentPeriodicTrigger = null; - } + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } - for (PendingCheckpoint p : pendingCheckpoints.values()) { - try { - p.abortError(new Exception("Checkpoint Coordinator is suspending.")); - } catch (Throwable t) { - LOG.error("Error while disposing pending checkpoint", t); + for (PendingCheckpoint p : pendingCheckpoints.values()) { + try { + p.abortError(new Exception("Checkpoint Coordinator is suspending.")); + } catch (Throwable t) { + LOG.error("Error while disposing pending checkpoint", t); + } } - } - pendingCheckpoints.clear(); - numUnsuccessfulCheckpointsTriggers = 0; + pendingCheckpoints.clear(); + numUnsuccessfulCheckpointsTriggers = 0; + } } } From 8fb878e3802d49c3d13eff53de3385003962a46c Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 31 Aug 2016 06:47:27 -0700 Subject: [PATCH 2/3] Declare numUnsuccessfulCheckpointsTriggers as AtomicInteger --- .../checkpoint/CheckpointCoordinator.java | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index d9b8824f96628..ffc5d5f21edfa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -53,6 +53,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -135,7 +136,7 @@ public class CheckpointCoordinator { private JobStatusListener jobStatusListener; /** The number of consecutive failed trigger attempts */ - private int numUnsuccessfulCheckpointsTriggers; + private AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); private ScheduledTrigger currentPeriodicTrigger; @@ -401,7 +402,7 @@ CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties p checkpointID = checkpointIdCounter.getAndIncrement(); } catch (Throwable t) { - int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; + int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.addAndGet(1); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } @@ -494,7 +495,7 @@ else if (!props.isSavepoint()) { tasksToTrigger[i].sendMessageToCurrentExecution(message, id); } - numUnsuccessfulCheckpointsTriggers = 0; + numUnsuccessfulCheckpointsTriggers.set(0); return new CheckpointTriggerResult(checkpoint); } catch (Throwable t) { @@ -503,7 +504,7 @@ else if (!props.isSavepoint()) { pendingCheckpoints.remove(checkpointID); } - int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; + int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.addAndGet(1); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); if (!checkpoint.isDiscarded()) { @@ -908,27 +909,25 @@ public void startCheckpointScheduler() { } public void stopCheckpointScheduler() { - synchronized (triggerLock) { - synchronized (lock) { - triggerRequestQueued = false; - periodicScheduling = false; + synchronized (lock) { + triggerRequestQueued = false; + periodicScheduling = false; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); - currentPeriodicTrigger = null; - } + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } - for (PendingCheckpoint p : pendingCheckpoints.values()) { - try { - p.abortError(new Exception("Checkpoint Coordinator is suspending.")); - } catch (Throwable t) { - LOG.error("Error while disposing pending checkpoint", t); - } + for (PendingCheckpoint p : pendingCheckpoints.values()) { + try { + p.abortError(new Exception("Checkpoint Coordinator is suspending.")); + } catch (Throwable t) { + LOG.error("Error while disposing pending checkpoint", t); } - - pendingCheckpoints.clear(); - numUnsuccessfulCheckpointsTriggers = 0; } + + pendingCheckpoints.clear(); + numUnsuccessfulCheckpointsTriggers.set(0); } } From aa320e0ddd74a1191189ae6be0e748a511bd1643 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 1 Sep 2016 05:09:13 -0700 Subject: [PATCH 3/3] Address Stephan's review comments --- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index ffc5d5f21edfa..f819bffd0c0c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -136,7 +136,7 @@ public class CheckpointCoordinator { private JobStatusListener jobStatusListener; /** The number of consecutive failed trigger attempts */ - private AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); + private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); private ScheduledTrigger currentPeriodicTrigger; @@ -402,7 +402,7 @@ CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties p checkpointID = checkpointIdCounter.getAndIncrement(); } catch (Throwable t) { - int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.addAndGet(1); + int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } @@ -504,7 +504,7 @@ else if (!props.isSavepoint()) { pendingCheckpoints.remove(checkpointID); } - int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.addAndGet(1); + int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); if (!checkpoint.isDiscarded()) {