diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java index 5926b9ca3552d..615d5062a9439 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** Timestamp to be updated by this worker periodically to indicate it's up and running. */ private volatile long heartbeatTs; + /** Atomic field updater to change heartbeat. */ + private static final AtomicLongFieldUpdater HEARTBEAT_UPDATER = + AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs"); + /** Mutex for finish awaiting. */ private final Object mux = new Object(); @@ -273,7 +278,16 @@ public boolean isDone() { /** {@inheritDoc} */ @Override public void updateHeartbeat() { - heartbeatTs = U.currentTimeMillis(); + long curTs = U.currentTimeMillis(); + long hbTs = heartbeatTs; + + // Avoid heartbeat update while in the blocking section. + while (hbTs < curTs) { + if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs)) + return; + + hbTs = heartbeatTs; + } } /** {@inheritDoc} */ @@ -283,7 +297,7 @@ public boolean isDone() { /** {@inheritDoc} */ @Override public void blockingSectionEnd() { - updateHeartbeat(); + heartbeatTs = U.currentTimeMillis(); } /** Can be called from {@link #runner()} thread to perform idleness handling. */ diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java index ccfc50750d6b7..57495daec0a7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.testframework.GridTestUtils; @@ -126,6 +127,57 @@ public void testBlockingWorker() throws Exception { e -> LatchingGridWorker.class.getName().equals(e.getClassName()))); } + /** + * @throws Exception If failed. + */ + @Test + public void testBlockingSection() throws Exception { + IgniteEx ignite = startGrid(0); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch blockingSectionLatch = new CountDownLatch(1); + CountDownLatch endLatch = new CountDownLatch(1); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + @Override protected void body() { + blockingSectionBegin(); + + try { + startLatch.countDown(); + + blockingSectionLatch.await(); + } + catch (Exception ignore) { + // No-op. + } + finally { + blockingSectionEnd(); + + endLatch.countDown(); + } + } + }; + + runWorker(worker); + + ignite.context().workersRegistry().register(worker); + + startLatch.await(); + + // Check that concurrent heartbeat update doesn't affect the blocking section. + worker.updateHeartbeat(); + + Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); + + blockingSectionLatch.countDown(); + + endLatch.await(); + + assertNull(failureError.get()); + + assertTrue(worker.heartbeatTs() <= U.currentTimeMillis()); + } + /** * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker} * doesn't lead to infinite loop.