Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
Expand Down Expand Up @@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
/** Timestamp to be updated by this worker periodically to indicate it's up and running. */
private volatile long heartbeatTs;

/** Atomic field updater to change heartbeat. */
private static final AtomicLongFieldUpdater<GridWorker> HEARTBEAT_UPDATER =
AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs");

/** Mutex for finish awaiting. */
private final Object mux = new Object();

Expand Down Expand Up @@ -273,7 +278,16 @@ public boolean isDone() {

/** {@inheritDoc} */
@Override public void updateHeartbeat() {
heartbeatTs = U.currentTimeMillis();
long curTs = U.currentTimeMillis();
long hbTs = heartbeatTs;

// Avoid heartbeat update while in the blocking section.
while (hbTs < curTs) {
if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs))
return;

hbTs = heartbeatTs;
}
}

/** {@inheritDoc} */
Expand All @@ -283,7 +297,7 @@ public boolean isDone() {

/** {@inheritDoc} */
@Override public void blockingSectionEnd() {
updateHeartbeat();
heartbeatTs = U.currentTimeMillis();
}

/** Can be called from {@link #runner()} thread to perform idleness handling. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.testframework.GridTestUtils;
Expand Down Expand Up @@ -126,6 +127,57 @@ public void testBlockingWorker() throws Exception {
e -> LatchingGridWorker.class.getName().equals(e.getClassName())));
}

/**
* @throws Exception If failed.
*/
@Test
public void testBlockingSection() throws Exception {
IgniteEx ignite = startGrid(0);

CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch blockingSectionLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(1);

GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
@Override protected void body() {
blockingSectionBegin();

try {
startLatch.countDown();

blockingSectionLatch.await();
}
catch (Exception ignore) {
// No-op.
}
finally {
blockingSectionEnd();

endLatch.countDown();
}
}
};

runWorker(worker);

ignite.context().workersRegistry().register(worker);

startLatch.await();

// Check that concurrent heartbeat update doesn't affect the blocking section.
worker.updateHeartbeat();

Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT);

blockingSectionLatch.countDown();

endLatch.await();

assertNull(failureError.get());

assertTrue(worker.heartbeatTs() <= U.currentTimeMillis());
}

/**
* Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker}
* doesn't lead to infinite loop.
Expand Down