From 160d670e7a91f16ae554fddd2e3cb4d09ef7297f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 18 Dec 2018 14:45:05 +0100 Subject: [PATCH] Synchronize WriteReplicaResult callbacks --- .../replication/TransportWriteAction.java | 6 ++- .../TransportWriteActionTests.java | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 5d425b16d1617..134d130fddcbe 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -163,6 +163,7 @@ public synchronized void respond(ActionListener listener) { * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. */ protected void respondIfPossible(Exception ex) { + assert Thread.holdsLock(this); if (finishedAsyncActions && listener != null) { if (ex == null) { super.respond(listener); @@ -206,7 +207,7 @@ public WriteReplicaResult(ReplicaRequest request, @Nullable Location location, } @Override - public void respond(ActionListener listener) { + public synchronized void respond(ActionListener listener) { this.listener = listener; respondIfPossible(null); } @@ -215,6 +216,7 @@ public void respond(ActionListener listener) { * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}. */ protected void respondIfPossible(Exception ex) { + assert Thread.holdsLock(this); if (finishedAsyncActions && listener != null) { if (ex == null) { super.respond(listener); @@ -225,7 +227,7 @@ protected void respondIfPossible(Exception ex) { } @Override - public void onFailure(Exception ex) { + public synchronized void onFailure(Exception ex) { finishedAsyncActions = true; respondIfPossible(ex); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 3eb9811c50db2..6e1ec3c76797d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -65,6 +65,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.Locale; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -347,6 +350,51 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } } + public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException { + IndexShard replica = mock(IndexShard.class); + when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC); + TestRequest request = new TestRequest(); + request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); + TransportWriteAction.WriteReplicaResult replicaResult = new TransportWriteAction.WriteReplicaResult<>( + request, new Translog.Location(0, 0, 0), null, replica, logger); + CyclicBarrier barrier = new CyclicBarrier(2); + Runnable waitForBarrier = () -> { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + }; + CountDownLatch completionLatch = new CountDownLatch(1); + threadPool.generic().execute(() -> { + waitForBarrier.run(); + replicaResult.respond(new ActionListener() { + @Override + public void onResponse(TransportResponse.Empty empty) { + completionLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + completionLatch.countDown(); + } + }); + }); + if (randomBoolean()) { + threadPool.generic().execute(() -> { + waitForBarrier.run(); + replicaResult.onFailure(null); + }); + } else { + threadPool.generic().execute(() -> { + waitForBarrier.run(); + replicaResult.onSuccess(false); + }); + } + + assertTrue(completionLatch.await(30, TimeUnit.SECONDS)); + } + private class TestAction extends TransportWriteAction { private final boolean withDocumentFailureOnPrimary;