Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize WriteReplicaResult callbacks #36770

Merged
merged 2 commits into from Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -163,6 +163,7 @@ public synchronized void respond(ActionListener<Response> listener) {
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
Expand Down Expand Up @@ -206,7 +207,7 @@ public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
}

@Override
public void respond(ActionListener<TransportResponse.Empty> listener) {
public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
this.listener = listener;
respondIfPossible(null);
}
Expand All @@ -215,6 +216,7 @@ public void respond(ActionListener<TransportResponse.Empty> listener) {
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
Expand All @@ -225,7 +227,7 @@ protected void respondIfPossible(Exception ex) {
}

@Override
public void onFailure(Exception ex) {
public synchronized void onFailure(Exception ex) {
finishedAsyncActions = true;
respondIfPossible(ex);
}
Expand Down
Expand Up @@ -65,6 +65,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -347,6 +350,51 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
}
}

public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
IndexShard replica = mock(IndexShard.class);
when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
request, new Translog.Location(0, 0, 0), null, replica, logger);
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable waitForBarrier = () -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
};
CountDownLatch completionLatch = new CountDownLatch(1);
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.respond(new ActionListener<TransportResponse.Empty>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
completionLatch.countDown();
}

@Override
public void onFailure(Exception e) {
completionLatch.countDown();
}
});
});
if (randomBoolean()) {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onFailure(null);
});
} else {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onSuccess(false);
});
}

assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
}

private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {

private final boolean withDocumentFailureOnPrimary;
Expand Down