Skip to content

Commit

Permalink
Fix ReopenWhileClosingIT Deadlocks (#92662) (#92665)
Browse files Browse the repository at this point in the history
Instead of blocking either a transport or management thread here (the latter will lead to
a dead-lock if there's only a single management thread available, we should just queue
up those sends and run them async via listener).
This test is currently only working on `main` because of #90193, this makes it work
with a single management thread as well so it works on 7.x again.

closes #92629
  • Loading branch information
original-brownbear committed Jan 4, 2023
1 parent 7221c6a commit 4085ed8
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.Glob;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand All @@ -25,6 +27,7 @@
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -134,8 +137,7 @@ private Releasable interceptVerifyShardBeforeCloseActions(final String indexPatt
TransportService.class,
internalCluster().getMasterName()
);

final CountDownLatch release = new CountDownLatch(1);
final ListenableFuture<Void> release = new ListenableFuture<>();
for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) {
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, node.getName()),
Expand All @@ -146,21 +148,23 @@ private Releasable interceptVerifyShardBeforeCloseActions(final String indexPatt
if (Glob.globMatch(indexPattern, index)) {
logger.info("request {} intercepted for index {}", requestId, index);
onIntercept.run();
try {
release.await();
release.addListener(ActionListener.wrap(() -> {
logger.info("request {} released for index {}", requestId, index);
} catch (final InterruptedException e) {
throw new AssertionError(e);
}
try {
connection.sendRequest(requestId, action, request, options);
} catch (IOException e) {
throw new AssertionError(e);
}
}));
return;
}
}

}
connection.sendRequest(requestId, action, request, options);
}
);
}
return Releasables.releaseOnce(release::countDown);
return Releasables.releaseOnce(() -> release.onResponse(null));
}

private static void assertIndexIsBlocked(final String... indices) {
Expand Down

0 comments on commit 4085ed8

Please sign in to comment.