Fix leaking searcher when shards are removed or relocated (#52099)
We might leak a searcher if the target shard is removed (i.e., its index
is deleted) or relocated while we are creating a SearchContext from a
SearchRewriteContext.

Relates #51708
Closes #52021

I labelled this a non-issue since it fixes an unreleased bug introduced in #51708.
dnhatn committed Feb 10, 2020
1 parent 79f67e7 commit 80a9a08
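The bug is an ordering problem: the searcher was acquired first, but the shard lookups that follow could throw before control reached the try/finally that owns the searcher, so nothing closed it. A minimal sketch of that pre-fix shape in plain Java (the searcher, shard lookup, and context construction below are simplified stand-ins, not the actual Elasticsearch classes):

import java.io.Closeable;
import java.io.IOException;

class SearcherLeakSketch {
    // Stand-in for Engine.Searcher: a resource that must be released exactly once.
    static Closeable acquireSearcher() {
        return () -> System.out.println("searcher closed");
    }

    // Stand-in for IndicesService#indexServiceSafe / IndexService#getShard, which throw
    // if the index was deleted or the shard relocated away in the meantime.
    static void resolveShard() {
        throw new IllegalStateException("shard is no longer on this node");
    }

    // Pre-fix shape of createSearchContext: the lookup runs before the try/finally that
    // guards the searcher, so an exception here leaks it.
    static Object createContext() throws IOException {
        Closeable searcher = acquireSearcher();
        resolveShard();                       // throws, so nothing below ever runs
        boolean success = false;
        try {
            Object context = new Object();    // stand-in for new DefaultSearchContext(...)
            success = true;
            return context;
        } finally {
            if (success == false) {
                searcher.close();             // never reached when resolveShard() throws
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            createContext();
        } catch (IllegalStateException e) {
            System.out.println("lookup failed; note that 'searcher closed' was never printed");
        }
    }
}

Running this prints the lookup failure but never "searcher closed", which is the leak the commit fixes by moving the lookups inside the guarded block, as the diff below shows.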
Showing 3 changed files with 59 additions and 10 deletions.
17 changes: 9 additions & 8 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -694,23 +694,24 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
     }
 
     private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
-        final ShardSearchRequest request = rewriteContext.request;
-        final Engine.Searcher searcher = rewriteContext.searcher;
-        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
-        IndexShard indexShard = indexService.getShard(request.shardId().getId());
-        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
-            indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
         boolean success = false;
         try {
+            final ShardSearchRequest request = rewriteContext.request;
+            final Engine.Searcher searcher = rewriteContext.searcher;
+            IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
+            IndexShard indexShard = indexService.getShard(request.shardId().getId());
+            SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
+                indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
             DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
                 searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
                 fetchPhase, clusterService.state().nodes().getMinNodeVersion());
             success = true;
             return searchContext;
         } finally {
             if (success == false) {
-                // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
-                // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
+                // we handle the case where `IndicesService#indexServiceSafe` or `IndexService#getShard`, or the DefaultSearchContext
+                // constructor throws an exception since we would otherwise leak a searcher and this can have severe implications
+                // (unable to obtain shard lock exceptions).
                 IOUtils.closeWhileHandlingException(rewriteContext.searcher);
             }
         }
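Distilled from the hunk above: once the shard lookups move inside the try block, the success flag and the finally clause cover every step that can fail after the searcher exists. A simplified sketch of that idiom with stand-in types (not the real SearchService code, which closes the searcher via IOUtils.closeWhileHandlingException):

import java.io.Closeable;
import java.io.IOException;

class GuardedContextSketch {
    static Closeable acquireSearcher() {
        return () -> System.out.println("searcher closed");
    }

    static void resolveShard(boolean shardRemoved) {
        if (shardRemoved) {
            throw new IllegalStateException("shard is no longer on this node");
        }
    }

    // Post-fix shape: every step after the searcher is acquired runs inside the guard,
    // so the finally clause releases the searcher whenever construction does not finish.
    static Object createContext(Closeable searcher, boolean shardRemoved) throws IOException {
        boolean success = false;
        try {
            resolveShard(shardRemoved);       // the lookup is now covered by the finally below
            Object context = new Object();    // stand-in for new DefaultSearchContext(...)
            success = true;
            return context;
        } finally {
            if (success == false) {
                searcher.close();             // close-on-failure, as in the fixed hunk
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            createContext(acquireSearcher(), true);
        } catch (IllegalStateException e) {
            System.out.println("lookup failed, but the searcher was still released");
        }
    }
}

Here the lookup still fails, but "searcher closed" is printed because success was never set before the finally clause ran.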
21 changes: 19 additions & 2 deletions server/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -87,6 +87,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -457,7 +458,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
         }
     }
 
-    public void testIndexAndRelocateConcurrently() throws Exception {
+    public void testIndexSearchAndRelocateConcurrently() throws Exception {
         int halfNodes = randomIntBetween(1, 3);
         Settings[] nodeSettings = Stream.concat(
             Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
@@ -474,8 +475,21 @@ public void testIndexAndRelocateConcurrently() throws Exception {
             .put("index.routing.allocation.exclude.color", "blue")
             .put(indexSettings())
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1));
+        if (randomBoolean()) {
+            settings.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), randomIntBetween(1, 10) + "s");
+        }
         assertAcked(prepareCreate("test", settings));
         assertAllShardsOnNodes("test", redNodes);
+        AtomicBoolean stopped = new AtomicBoolean(false);
+        Thread[] searchThreads = randomBoolean() ? new Thread[0] : new Thread[randomIntBetween(1, 4)];
+        for (int i = 0; i < searchThreads.length; i++) {
+            searchThreads[i] = new Thread(() -> {
+                while (stopped.get() == false) {
+                    assertNoFailures(client().prepareSearch("test").setRequestCache(false).get());
+                }
+            });
+            searchThreads[i].start();
+        }
         int numDocs = randomIntBetween(100, 150);
         ArrayList<String> ids = new ArrayList<>();
         logger.info(" --> indexing [{}] docs", numDocs);
@@ -513,7 +527,10 @@
             assertNoFailures(afterRelocation);
             assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()]));
         }
-
+        stopped.set(true);
+        for (Thread searchThread : searchThreads) {
+            searchThread.join();
+        }
     }
 
     public void testRelocateWhileWaitingForRefresh() {
@@ -898,4 +898,35 @@ public void onFailure(Exception e) {
             latch.await();
         }
     }
+
+    public void testDeleteIndexWhileSearch() throws Exception {
+        createIndex("test");
+        int numDocs = randomIntBetween(1, 20);
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex("test", "_doc").setSource("f", "v").get();
+        }
+        client().admin().indices().prepareRefresh("test").get();
+        AtomicBoolean stopped = new AtomicBoolean(false);
+        Thread[] searchers = new Thread[randomIntBetween(1, 4)];
+        CountDownLatch latch = new CountDownLatch(searchers.length);
+        for (int i = 0; i < searchers.length; i++) {
+            searchers[i] = new Thread(() -> {
+                latch.countDown();
+                while (stopped.get() == false) {
+                    try {
+                        client().prepareSearch("test").setRequestCache(false).get();
+                    } catch (Exception ignored) {
+                        return;
+                    }
+                }
+            });
+            searchers[i].start();
+        }
+        latch.await();
+        client().admin().indices().prepareDelete("test").get();
+        stopped.set(true);
+        for (Thread searcher : searchers) {
+            searcher.join();
+        }
+    }
 }
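Both new tests rely on the same stress pattern: background threads keep searching while shards relocate or the index is deleted, and a stop flag plus join() ensures any failure in those threads surfaces before the test finishes. A self-contained sketch of that pattern in plain Java (the search call is replaced by a placeholder; no Elasticsearch test fixtures are involved):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class ConcurrentSearchSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stopped = new AtomicBoolean(false);
        Thread[] searchers = new Thread[3];
        CountDownLatch started = new CountDownLatch(searchers.length);
        for (int i = 0; i < searchers.length; i++) {
            searchers[i] = new Thread(() -> {
                started.countDown();
                while (stopped.get() == false) {
                    // In the real tests this is client().prepareSearch("test").setRequestCache(false).get();
                    // here we only simulate the repeated work.
                    Thread.onSpinWait();
                }
            });
            searchers[i].start();
        }
        started.await();        // make sure the searchers are actually running
        // ... relocate shards or delete the index here, concurrently with the searches ...
        stopped.set(true);      // then tell the searchers to finish
        for (Thread searcher : searchers) {
            searcher.join();    // so any failure in a search thread is observed before the test completes
        }
        System.out.println("all search threads stopped cleanly");
    }
}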
