Skip to content

Commit

Permalink
Fix concurrent requests race over scroll context limit (#53449)
Browse files Browse the repository at this point in the history
Concurrent search scroll requests can lead to more scroll contexts than the limit.
  • Loading branch information
dnhatn committed Mar 13, 2020
1 parent e1094ac commit 01af65d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
17 changes: 12 additions & 5 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,15 +585,24 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
" not be allowed in the next major version by default. You can change the [" +
MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting to use a greater default value or lower the number of" +
" scrolls that you need to run in parallel.");
} else if (openScrollContexts.get() >= maxOpenScrollContext) {
}
if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
openScrollContexts.decrementAndGet();
throw new ElasticsearchException(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}
}

SearchContext context = createContext(request);
SearchContext context = null;
try {
context = createContext(request);
context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT);
} finally {
if (context == null) {
openScrollContexts.decrementAndGet();
}
}
onNewContext(context);
boolean success = false;
try {
Expand All @@ -611,7 +620,6 @@ private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
Expand Down Expand Up @@ -724,7 +732,6 @@ private void onFreeContext(SearchContext context) {
assert activeContexts.containsKey(context.id()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,52 @@ public void testMaxOpenScrollContexts() throws RuntimeException, IOException {
}
}

public void testOpenScrollContextsConcurrently() throws Exception {
final int maxScrollContexts = randomIntBetween(50, 200);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SearchService.MAX_OPEN_SCROLL_CONTEXT.getKey(), maxScrollContexts)));
try {
createIndex("index");
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexShard indexShard = indicesService.indexServiceSafe(resolveIndex("index")).getShard(0);

final SearchService searchService = getInstanceFromNode(SearchService.class);
Thread[] threads = new Thread[randomIntBetween(2, 8)];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
latch.countDown();
try {
latch.await();
for (; ; ) {
try {
searchService.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()));
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), equalTo(
"Trying to create too many scroll contexts. Must be less than or equal to: " +
"[" + maxScrollContexts + "]. " +
"This limit can be set by changing the [search.max_open_scroll_context] setting."));
return;
}
}
} catch (Exception e) {
throw new AssertionError(e);
}
});
threads[i].setName("elasticsearch[node_s_0][search]");
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
assertThat(searchService.getActiveContexts(), equalTo(maxScrollContexts));
searchService.freeAllScrollContexts();
} finally {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(SearchService.MAX_OPEN_SCROLL_CONTEXT.getKey())));
}
}

public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
@Override
public List<QuerySpec<?>> getQueries() {
Expand Down

0 comments on commit 01af65d

Please sign in to comment.