Skip to content

Commit

Permalink
Fix scroll contexts counter in SearchService (#71354)
Browse files Browse the repository at this point in the history
The scroll context counter can be negative even become Integer.MAX_VALUE
after handling many search requests.  This bug causes two issues:

- Disable the limit of open scroll contexts when the counter is negative
- Prevent opening new scroll contexts when the counter is greater than 
  the limit of open scroll contexts

Relates #53449
Closes #56202
  • Loading branch information
dnhatn committed Apr 7, 2021
1 parent 5d0321c commit f14e5f4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 14 deletions.
16 changes: 14 additions & 2 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -574,6 +575,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}

final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
final Releasable decreaseScrollContexts;
if (request.scroll() != null) {
if (maxOpenScrollContext == Integer.MAX_VALUE && openScrollContexts.get() > 500) {
/**
Expand All @@ -593,14 +595,17 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}
decreaseScrollContexts = openScrollContexts::decrementAndGet;
} else {
decreaseScrollContexts = () -> {};
}
SearchContext context = null;
try {
context = createContext(request);
context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT);
context.addReleasable(decreaseScrollContexts, Lifetime.CONTEXT);
} finally {
if (context == null) {
openScrollContexts.decrementAndGet();
decreaseScrollContexts.close();
}
}
onNewContext(context);
Expand Down Expand Up @@ -1023,6 +1028,13 @@ public int getActiveContexts() {
return this.activeContexts.size();
}

/**
* Returns the number of scroll contexts opened on the node
*/
public int getOpenScrollContexts() {
return openScrollContexts.get();
}

public ResponseCollectorService getResponseCollectorService() {
return this.responseCollectorService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,5 +796,6 @@ public void testCreateSearchContext() throws IOException {
assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget());
assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget());
assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget());
assertThat(service.getOpenScrollContexts(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -121,6 +122,9 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), equalTo(0));
assertThat(searchService.getOpenScrollContexts(), equalTo(0));
super.tearDown();
assertAcked(client().admin().indices().prepareDelete("*").get());
MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2355,6 +2355,7 @@ public void ensureEstimatedStats() {
public void assertAfterTest() throws IOException {
super.assertAfterTest();
assertRequestsFinished();
assertSearchContextsReleased();
for (NodeAndClient nodeAndClient : nodes.values()) {
NodeEnvironment env = nodeAndClient.node().getNodeEnvironment();
Set<ShardId> shardIds = env.lockedShards();
Expand Down Expand Up @@ -2388,4 +2389,18 @@ private void assertRequestsFinished() {
}
}
}

private void assertSearchContextsReleased() {
for (NodeAndClient nodeAndClient : nodes.values()) {
SearchService searchService = getInstance(SearchService.class, nodeAndClient.name);
try {
assertBusy(() -> {
assertThat(searchService.getActiveContexts(), equalTo(0));
assertThat(searchService.getOpenScrollContexts(), equalTo(0));
});
} catch (Exception e) {
throw new AssertionError("Failed to verify search contexts", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,22 @@ public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedExcep
// now scroll
SearchResponse searchResponse = client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
.setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get();
do {
assertHitCount(searchResponse, 3);
assertEquals(1, searchResponse.getHits().getHits().length);
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1));
for (int i = 0; i < 2; i++) {
shard = indexService.getShard(i);
engine = IndexShardTestCase.getEngine(shard);
assertFalse(((FrozenEngine) engine).isReaderOpen());
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
try {
do {
assertHitCount(searchResponse, 3);
assertEquals(1, searchResponse.getHits().getHits().length);
SearchService searchService = getInstanceFromNode(SearchService.class);
assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1));
for (int i = 0; i < 2; i++) {
shard = indexService.getShard(i);
engine = IndexShardTestCase.getEngine(shard);
assertFalse(((FrozenEngine) engine).isReaderOpen());
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
} finally {
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
}
}

public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException, ExecutionException {
Expand Down

0 comments on commit f14e5f4

Please sign in to comment.