diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 17756630517d2..921eeb319f1a0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2286,23 +2286,32 @@ private void bumpPrimaryTerm(long newPrimaryTerm, final Ch assert newPrimaryTerm > pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); - indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { - assert operationPrimaryTerm <= pendingPrimaryTerm; - termUpdated.await(); - // indexShardOperationPermits doesn't guarantee that async submissions are executed - // in the order submitted. We need to guard against another term bump - if (operationPrimaryTerm < newPrimaryTerm) { - operationPrimaryTerm = newPrimaryTerm; - onBlocked.run(); - } - }, - e -> { + indexShardOperationPermits.asyncBlockOperations(new ActionListener() { + @Override + public void onFailure(final Exception e) { try { failShard("exception during primary term transition", e); } catch (AlreadyClosedException ace) { // ignore, shard is already closed } - }); + } + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + assert operationPrimaryTerm <= pendingPrimaryTerm; + termUpdated.await(); + // indexShardOperationPermits doesn't guarantee that async submissions are executed + // in the order submitted. We need to guard against another term bump + if (operationPrimaryTerm < newPrimaryTerm) { + operationPrimaryTerm = newPrimaryTerm; + onBlocked.run(); + } + } catch (final Exception e) { + onFailure(e); + } + } + }, 30, TimeUnit.MINUTES); pendingPrimaryTerm = newPrimaryTerm; termUpdated.countDown(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index d4c3833b13a58..67c48c38791f0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -104,42 +103,54 @@ void blockOperations( final TimeUnit timeUnit, final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { delayOperations(); - try { - doBlockOperations(timeout, timeUnit, onBlocked); + try (Releasable ignored = acquireAll(timeout, timeUnit)) { + onBlocked.run(); } finally { releaseDelayedOperations(); } } /** - * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} - * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After - * operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking - * operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked. + * Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all + * permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed + * operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the + * {@code onFailure} handler will be invoked after delayed operations are released. * - * @param timeout the maximum time to wait for the in-flight operations block - * @param timeUnit the time unit of the {@code timeout} argument - * @param onBlocked the action to run once the block has been acquired - * @param onFailure the action to run if a failure occurs while blocking operations - * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument */ - void asyncBlockOperations( - final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked, final Consumer onFailure) { + public void asyncBlockOperations(final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit) { delayOperations(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + + final AtomicBoolean released = new AtomicBoolean(false); + @Override public void onFailure(final Exception e) { - onFailure.accept(e); + try { + releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible + } finally { + onAcquired.onFailure(e); + } } @Override protected void doRun() throws Exception { - doBlockOperations(timeout, timeUnit, onBlocked); + final Releasable releasable = acquireAll(timeout, timeUnit); + onAcquired.onResponse(() -> { + try { + releasable.close(); + } finally { + releaseDelayedOperationsIfNeeded(); + } + }); } - @Override - public void onAfter() { - releaseDelayedOperations(); + private void releaseDelayedOperationsIfNeeded() { + if (released.compareAndSet(false, true)) { + releaseDelayedOperations(); + } } }); } @@ -154,10 +165,7 @@ private void delayOperations() { } } - private void doBlockOperations( - final long timeout, - final TimeUnit timeUnit, - final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { + private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException { if (Assertions.ENABLED) { // since delayed is not volatile, we have to synchronize even here for visibility synchronized (this) { @@ -165,12 +173,13 @@ private void doBlockOperations( } } if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - assert semaphore.availablePermits() == 0; - try { - onBlocked.run(); - } finally { - semaphore.release(TOTAL_PERMITS); - } + final AtomicBoolean closed = new AtomicBoolean(); + return () -> { + if (closed.compareAndSet(false, true)) { + assert semaphore.availablePermits() == 0; + semaphore.release(TOTAL_PERMITS); + } + }; } else { throw new TimeoutException("timeout while blocking operations"); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 23337def2ae1b..a785c2c4d8224 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -199,8 +200,9 @@ public void testBlockIfClosed() { permits.close(); expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); - expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES, - () -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); })); + expectThrows(IndexShardClosedException.class, + () -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}), + randomInt(10), TimeUnit.MINUTES)); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { @@ -220,17 +222,11 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); - permits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - }, - e -> { - throw new RuntimeException(e); - }); + permits.asyncBlockOperations(wrap(() -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }), 30, TimeUnit.MINUTES); assertFalse(blocked.get()); assertFalse(future.isDone()); } @@ -292,7 +288,7 @@ public void onResponse(Releasable releasable) { future2.get(1, TimeUnit.HOURS).close(); } - protected Releasable blockAndWait() throws InterruptedException { + private Releasable blockAndWait() throws InterruptedException { CountDownLatch blockAcquired = new CountDownLatch(1); CountDownLatch releaseBlock = new CountDownLatch(1); CountDownLatch blockReleased = new CountDownLatch(1); @@ -334,17 +330,11 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx final CountDownLatch blockAcquired = new CountDownLatch(1); final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); - permits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - }, - e -> { - throw new RuntimeException(e); - }); + permits.asyncBlockOperations(wrap(() -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }), 30, TimeUnit.MINUTES); blockAcquired.await(); assertTrue(blocked.get()); @@ -392,16 +382,10 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); final AtomicBoolean onBlocked = new AtomicBoolean(); - permits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - onBlocked.set(true); - blockedLatch.countDown(); - }, e -> { - throw new RuntimeException(e); - }); - + permits.asyncBlockOperations(wrap(() -> { + onBlocked.set(true); + blockedLatch.countDown(); + }), 30, TimeUnit.MINUTES); assertFalse(onBlocked.get()); // if we submit another operation, it should be delayed @@ -486,15 +470,10 @@ public void onFailure(Exception e) { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - permits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - values.add(operations); - operationLatch.countDown(); - }, e -> { - throw new RuntimeException(e); - }); + permits.asyncBlockOperations(wrap(() -> { + values.add(operations); + operationLatch.countDown(); + }), 30, TimeUnit.MINUTES); }); blockingThread.start(); @@ -559,16 +538,20 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx public void testAsyncBlockOperationsOnFailure() throws InterruptedException { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - permits.asyncBlockOperations( - 10, - TimeUnit.MINUTES, - () -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try (Releasable ignored = releasable) { throw new RuntimeException("simulated"); - }, - e -> { - reference.set(e); - onFailureLatch.countDown(); - }); + } + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + onFailureLatch.countDown(); + } + }, 10, TimeUnit.MINUTES); onFailureLatch.await(); assertThat(reference.get(), instanceOf(RuntimeException.class)); assertThat(reference.get(), hasToString(containsString("simulated"))); @@ -596,14 +579,18 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException { { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - permits.asyncBlockOperations( - 1, - TimeUnit.MILLISECONDS, - () -> {}, - e -> { - reference.set(e); - onFailureLatch.countDown(); - }); + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + onFailureLatch.countDown(); + } + }, 1, TimeUnit.MILLISECONDS); onFailureLatch.await(); assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); } @@ -716,4 +703,22 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperations(), emptyIterable()); } + + private static ActionListener wrap(final CheckedRunnable onResponse) { + return new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + onResponse.run(); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + throw new AssertionError(e); + } + }; + } }