From a0ac8a1b3d1810cb578769ae6753487318a91b58 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 15 Jul 2021 14:45:40 +0300 Subject: [PATCH 1/3] IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers --- .../checkpoint/CheckpointContextImpl.java | 13 +++++- .../internal/util/worker/GridWorker.java | 18 +++++++- .../failure/SystemWorkersBlockingTest.java | 44 +++++++++++++++++++ 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java index f019955e07767..ce6cddbe390d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java @@ -101,7 +101,18 @@ public class CheckpointContextImpl implements CheckpointListener.Context { try { GridFutureAdapter res = new GridFutureAdapter<>(); - res.listen(fut -> heartbeatUpdater.updateHeartbeat()); + res.listen(fut -> { + // In some conditions checkpointer can proceed to waitCheckpointEvent() concurrently before + // execution of this listener. To avoid false-positive failure handler trigger we should update + // heartbeat only if we are currently not in blocking section. + if (heartbeatUpdater.heartbeatTs() < U.currentTimeMillis()) { + synchronized (heartbeatUpdater) { + // Double check under the lock. + if (heartbeatUpdater.heartbeatTs() < U.currentTimeMillis()) + heartbeatUpdater.updateHeartbeat(); + } + } + }); asyncRunner.execute(U.wrapIgniteFuture(cmd, res)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java index 5926b9ca3552d..615d5062a9439 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** Timestamp to be updated by this worker periodically to indicate it's up and running. */ private volatile long heartbeatTs; + /** Atomic field updater to change heartbeat. */ + private static final AtomicLongFieldUpdater HEARTBEAT_UPDATER = + AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs"); + /** Mutex for finish awaiting. */ private final Object mux = new Object(); @@ -273,7 +278,16 @@ public boolean isDone() { /** {@inheritDoc} */ @Override public void updateHeartbeat() { - heartbeatTs = U.currentTimeMillis(); + long curTs = U.currentTimeMillis(); + long hbTs = heartbeatTs; + + // Avoid heartbeat update while in the blocking section. + while (hbTs < curTs) { + if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs)) + return; + + hbTs = heartbeatTs; + } } /** {@inheritDoc} */ @@ -283,7 +297,7 @@ public boolean isDone() { /** {@inheritDoc} */ @Override public void blockingSectionEnd() { - updateHeartbeat(); + heartbeatTs = U.currentTimeMillis(); } /** Can be called from {@link #runner()} thread to perform idleness handling. */ diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java index ccfc50750d6b7..7aff722c056c9 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -126,6 +126,50 @@ public void testBlockingWorker() throws Exception { e -> LatchingGridWorker.class.getName().equals(e.getClassName()))); } + /** + * @throws Exception If failed. + */ + @Test + public void testBlockingSection() throws Exception { + IgniteEx ignite = startGrid(0); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch blockingSectionLatch = new CountDownLatch(1); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + @Override protected void body() { + blockingSectionBegin(); + + try { + startLatch.countDown(); + + blockingSectionLatch.await(); + } + catch (Exception ignore) { + // No-op. + } + finally { + blockingSectionEnd(); + } + } + }; + + runWorker(worker); + + ignite.context().workersRegistry().register(worker); + + startLatch.await(); + + // Check that concurrent heartbeat update doesn't affect the blocking section. + worker.updateHeartbeat(); + + Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); + + blockingSectionLatch.countDown(); + + assertNull(failureError.get()); + } + /** * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker} * doesn't lead to infinite loop. From 0b77ae97884fb879db8db82cf365f4de556d5e4e Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 15 Jul 2021 14:46:14 +0300 Subject: [PATCH 2/3] IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers --- .../checkpoint/CheckpointContextImpl.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java index ce6cddbe390d6..f019955e07767 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java @@ -101,18 +101,7 @@ public class CheckpointContextImpl implements CheckpointListener.Context { try { GridFutureAdapter res = new GridFutureAdapter<>(); - res.listen(fut -> { - // In some conditions checkpointer can proceed to waitCheckpointEvent() concurrently before - // execution of this listener. To avoid false-positive failure handler trigger we should update - // heartbeat only if we are currently not in blocking section. - if (heartbeatUpdater.heartbeatTs() < U.currentTimeMillis()) { - synchronized (heartbeatUpdater) { - // Double check under the lock. - if (heartbeatUpdater.heartbeatTs() < U.currentTimeMillis()) - heartbeatUpdater.updateHeartbeat(); - } - } - }); + res.listen(fut -> heartbeatUpdater.updateHeartbeat()); asyncRunner.execute(U.wrapIgniteFuture(cmd, res)); From 5bb82d3ecd6ae1f2be17fb25a93a7c645f589ef3 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 15 Jul 2021 15:16:03 +0300 Subject: [PATCH 3/3] IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers --- .../apache/ignite/failure/SystemWorkersBlockingTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java index 7aff722c056c9..57495daec0a7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.testframework.GridTestUtils; @@ -135,6 +136,7 @@ public void testBlockingSection() throws Exception { CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch blockingSectionLatch = new CountDownLatch(1); + CountDownLatch endLatch = new CountDownLatch(1); GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { @Override protected void body() { @@ -150,6 +152,8 @@ public void testBlockingSection() throws Exception { } finally { blockingSectionEnd(); + + endLatch.countDown(); } } }; @@ -167,7 +171,11 @@ public void testBlockingSection() throws Exception { blockingSectionLatch.countDown(); + endLatch.await(); + assertNull(failureError.get()); + + assertTrue(worker.heartbeatTs() <= U.currentTimeMillis()); } /**