diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 657539c43d669..287c2eb50f1bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -32,7 +32,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; -import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; @@ -58,8 +57,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** Lock to control execution after stop. */ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); - /** Workers count. */ - private final LongAdder workersCnt = new LongAdder(); + /** Stopping flag. */ + private boolean stopping; /** * @param ctx Kernal context. @@ -81,41 +80,37 @@ public GridClosureProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @SuppressWarnings("BusyWait") @Override public void onKernalStop(boolean cancel) { - busyLock.writeLock(); + boolean interrupted = false; - boolean interrupted = Thread.interrupted(); - - while (workersCnt.sum() != 0) { + // Busy wait is intentional. + while (true) { try { - Thread.sleep(200); + if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) + break; + else + Thread.sleep(200); } - catch (InterruptedException ignored) { + catch (InterruptedException ignore) { + // Preserve interrupt status & ignore. + // Note that interrupted flag is cleared. interrupted = true; } } - if (interrupted) - Thread.currentThread().interrupt(); + try { + if (interrupted) + Thread.currentThread().interrupt(); + + stopping = true; + } + finally { + busyLock.writeUnlock(); + } if (log.isDebugEnabled()) log.debug("Stopped closure processor."); } - /** - * @throws IllegalStateException If grid is stopped. - */ - private void enterBusy() throws IllegalStateException { - if (!busyLock.tryReadLock()) - throw new IllegalStateException("Closure processor cannot be used on stopped grid: " + ctx.gridName()); - } - - /** - * Unlocks busy lock. - */ - private void leaveBusy() { - busyLock.readUnlock(); - } - /** * @param mode Distribution mode. * @param jobs Closures to execute. @@ -142,9 +137,14 @@ public ComputeTaskInternalFuture runAsync(GridClosureCallMode mode, assert mode != null; assert !F.isEmpty(jobs) : jobs; - enterBusy(); + busyLock.readLock(); try { + if (stopping) { + return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, + new IgniteCheckedException("Closure processor cannot be used on stopped grid: " + ctx.gridName())); + } + if (F.isEmpty(nodes)) return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException()); @@ -153,7 +153,7 @@ public ComputeTaskInternalFuture runAsync(GridClosureCallMode mode, return ctx.task().execute(new T1(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -183,7 +183,7 @@ public ComputeTaskInternalFuture runAsync(GridClosureCallMode mode, assert mode != null; assert job != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -194,7 +194,7 @@ public ComputeTaskInternalFuture runAsync(GridClosureCallMode mode, return ctx.task().execute(new T2(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -319,7 +319,7 @@ public ComputeTaskInternalFuture forkjoinAsync(GridClosureCallMode assert rdc != null; assert !F.isEmpty(jobs); - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -330,7 +330,7 @@ public ComputeTaskInternalFuture forkjoinAsync(GridClosureCallMode return ctx.task().execute(new T3<>(mode, jobs, rdc), null); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -364,7 +364,7 @@ public ComputeTaskInternalFuture> callAsync(GridClosureCallMod assert mode != null; assert !F.isEmpty(jobs); - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -375,7 +375,7 @@ public ComputeTaskInternalFuture> callAsync(GridClosureCallMod return ctx.task().execute(new T6<>(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -401,7 +401,7 @@ public ComputeTaskInternalFuture callAsync(GridClosureCallMode mode, */ public ComputeTaskInternalFuture affinityCall(@Nullable String cacheName, Object affKey, Callable job, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -420,7 +420,7 @@ public ComputeTaskInternalFuture affinityCall(@Nullable String cacheName, return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -433,7 +433,7 @@ public ComputeTaskInternalFuture affinityCall(@Nullable String cacheName, */ public ComputeTaskInternalFuture affinityRun(@Nullable String cacheName, Object affKey, Runnable job, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -452,7 +452,7 @@ public ComputeTaskInternalFuture affinityRun(@Nullable String cacheName, Obje return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -468,7 +468,7 @@ public IgniteInternalFuture callAsyncNoFailover(GridClosureCallMode mode, @Nullable Collection nodes, boolean sys) { assert mode != null; - enterBusy(); + busyLock.readLock(); try { if (job == null) @@ -483,7 +483,7 @@ public IgniteInternalFuture callAsyncNoFailover(GridClosureCallMode mode, return ctx.task().execute(new T7<>(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -500,7 +500,7 @@ public IgniteInternalFuture> callAsyncNoFailover(GridClosureCa boolean sys) { assert mode != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(jobs)) @@ -515,7 +515,7 @@ public IgniteInternalFuture> callAsyncNoFailover(GridClosureCa return ctx.task().execute(new T6<>(mode, jobs), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -535,7 +535,7 @@ public ComputeTaskInternalFuture callAsync(GridClosureCallMode mode, assert mode != null; assert job != null; - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -546,7 +546,7 @@ public ComputeTaskInternalFuture callAsync(GridClosureCallMode mode, return ctx.task().execute(new T7<>(mode, job), null, sys); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -558,7 +558,7 @@ public ComputeTaskInternalFuture callAsync(GridClosureCallMode mode, */ public ComputeTaskInternalFuture callAsync(IgniteClosure job, @Nullable T arg, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -569,7 +569,7 @@ public ComputeTaskInternalFuture callAsync(IgniteClosure job, @N return ctx.task().execute(new T8<>(job, arg), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -581,7 +581,7 @@ public ComputeTaskInternalFuture callAsync(IgniteClosure job, @N */ public IgniteInternalFuture> broadcast(IgniteClosure job, @Nullable T arg, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -592,7 +592,7 @@ public IgniteInternalFuture> broadcast(IgniteClosure return ctx.task().execute(new T11<>(job, arg, nodes), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -604,7 +604,7 @@ public IgniteInternalFuture> broadcast(IgniteClosure */ public IgniteInternalFuture> broadcastNoFailover(IgniteClosure job, @Nullable T arg, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -616,7 +616,7 @@ public IgniteInternalFuture> broadcastNoFailover(IgniteClos return ctx.task().execute(new T11<>(job, arg, nodes), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -630,7 +630,7 @@ public ComputeTaskInternalFuture> callAsync(IgniteClosure args, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -641,7 +641,7 @@ public ComputeTaskInternalFuture> callAsync(IgniteClosure(job, args), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -654,7 +654,7 @@ public ComputeTaskInternalFuture> callAsync(IgniteClosure ComputeTaskInternalFuture callAsync(IgniteClosure job, Collection args, IgniteReducer rdc, @Nullable Collection nodes) { - enterBusy(); + busyLock.readLock(); try { if (F.isEmpty(nodes)) @@ -665,7 +665,7 @@ public ComputeTaskInternalFuture callAsync(IgniteClosure return ctx.task().execute(new T10<>(job, args, rdc), null, false); } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -733,7 +733,7 @@ private IgniteInternalFuture runLocal(@Nullable final Runnable c, GridClosure if (c == null) return new GridFinishedFuture(ctx); - enterBusy(); + busyLock.readLock(); try { // Inject only if needed. @@ -744,8 +744,6 @@ private IgniteInternalFuture runLocal(@Nullable final Runnable c, GridClosure final GridWorkerFuture fut = new GridWorkerFuture(ctx); - workersCnt.increment(); - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { @Override protected void body() { try { @@ -762,9 +760,6 @@ private IgniteInternalFuture runLocal(@Nullable final Runnable c, GridClosure fut.onDone(U.cast(e)); } - finally { - workersCnt.decrement(); - } } }; @@ -783,7 +778,7 @@ private IgniteInternalFuture runLocal(@Nullable final Runnable c, GridClosure return fut; } finally { - leaveBusy(); + busyLock.readUnlock(); } } @@ -870,7 +865,7 @@ private IgniteInternalFuture callLocal(@Nullable final Callable c, Gri if (c == null) return new GridFinishedFuture<>(ctx); - enterBusy(); + busyLock.readLock(); try { // Inject only if needed. @@ -881,8 +876,6 @@ private IgniteInternalFuture callLocal(@Nullable final Callable c, Gri final GridWorkerFuture fut = new GridWorkerFuture<>(ctx); - workersCnt.increment(); - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { @Override protected void body() { try { @@ -897,9 +890,6 @@ private IgniteInternalFuture callLocal(@Nullable final Callable c, Gri fut.onDone(U.cast(e)); } - finally { - workersCnt.decrement(); - } } }; @@ -918,7 +908,7 @@ private IgniteInternalFuture callLocal(@Nullable final Callable c, Gri return fut; } finally { - leaveBusy(); + busyLock.readUnlock(); } }