From e277034a210dcbdc23cff0d737de40f036f6f95a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 9 Feb 2020 22:03:57 -0500 Subject: [PATCH] Fix leaking searcher when shards are removed or relocated (#52099) We might leak a searcher if the target shard is removed (i.e., its index is deleted) or relocated while we are creating a SearchContext from a SearchRewriteContext. Relates #51708 Closes #52021 I labelled this non-issue for an unreleased bug introduced in #51708. --- .../elasticsearch/search/SearchService.java | 17 +++++----- .../elasticsearch/recovery/RelocationIT.java | 22 +++++++++++-- .../search/SearchServiceTests.java | 31 +++++++++++++++++++ 3 files changed, 60 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index bb83a56599532..d3afcd5c4677a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -694,22 +694,23 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time } private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) { - final ShardSearchRequest request = rewriteContext.request; - final Engine.Searcher searcher = rewriteContext.searcher; - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.getShard(request.shardId().getId()); - SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), - indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); boolean success = false; try { + final ShardSearchRequest request = rewriteContext.request; + final Engine.Searcher searcher = rewriteContext.searcher; + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = 
indexService.getShard(request.shardId().getId()); + SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), + indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); success = true; return searchContext; } finally { if (success == false) { - // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise - // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). + // we handle the case where `IndicesService#indexServiceSafe` or `IndexService#getShard`, or the DefaultSearchContext + // constructor throws an exception since we would otherwise leak a searcher and this can have severe implications + // (unable to obtain shard lock exceptions). 
IOUtils.closeWhileHandlingException(rewriteContext.searcher); } } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 2a29582fdefd3..a04f92d823611 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexEventListener; @@ -86,6 +87,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -456,7 +458,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO } } - public void testIndexAndRelocateConcurrently() throws Exception { + public void testIndexSearchAndRelocateConcurrently() throws Exception { int halfNodes = randomIntBetween(1, 3); Settings[] nodeSettings = Stream.concat( Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), @@ -473,8 +475,21 @@ public void testIndexAndRelocateConcurrently() throws Exception { .put("index.routing.allocation.exclude.color", "blue") .put(indexSettings()) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)); + if (randomBoolean()) { + settings.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), randomIntBetween(1, 10) + "s"); + } assertAcked(prepareCreate("test", settings)); assertAllShardsOnNodes("test", redNodes); + AtomicBoolean stopped = new AtomicBoolean(false); + Thread[] searchThreads = 
randomBoolean() ? new Thread[0] : new Thread[randomIntBetween(1, 4)]; + for (int i = 0; i < searchThreads.length; i++) { + searchThreads[i] = new Thread(() -> { + while (stopped.get() == false) { + assertNoFailures(client().prepareSearch("test").setRequestCache(false).get()); + } + }); + searchThreads[i].start(); + } int numDocs = randomIntBetween(100, 150); ArrayList ids = new ArrayList<>(); logger.info(" --> indexing [{}] docs", numDocs); @@ -512,7 +527,10 @@ public void testIndexAndRelocateConcurrently() throws Exception { assertNoFailures(afterRelocation); assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()])); } - + stopped.set(true); + for (Thread searchThread : searchThreads) { + searchThread.join(); + } } public void testRelocateWhileWaitingForRefresh() { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index c9dc231f0997b..2e61d63c81117 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -898,4 +898,35 @@ public void onFailure(Exception e) { latch.await(); } } + + public void testDeleteIndexWhileSearch() throws Exception { + createIndex("test"); + int numDocs = randomIntBetween(1, 20); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setSource("f", "v").get(); + } + client().admin().indices().prepareRefresh("test").get(); + AtomicBoolean stopped = new AtomicBoolean(false); + Thread[] searchers = new Thread[randomIntBetween(1, 4)]; + CountDownLatch latch = new CountDownLatch(searchers.length); + for (int i = 0; i < searchers.length; i++) { + searchers[i] = new Thread(() -> { + latch.countDown(); + while (stopped.get() == false) { + try { + client().prepareSearch("test").setRequestCache(false).get(); + } catch (Exception ignored) { + return; + } + } + }); + searchers[i].start(); + } + latch.await(); + 
client().admin().indices().prepareDelete("test").get(); + stopped.set(true); + for (Thread searcher : searchers) { + searcher.join(); + } + } }