Skip to content

Commit

Permalink
Fix concurrent search and index delete (#42621)
Browse files Browse the repository at this point in the history
Changed order of listener invocation so that we notify before
registering search context and notify after unregistering same.

This ensures that count up/down like what we do in ShardSearchStats
works. Otherwise, we risk notifying onFreeScrollContext before notifying
onNewScrollContext (same for onFreeContext/onNewContext, but we
currently have no assertions failing in those).

Closes #28053
  • Loading branch information
henningandersen committed Jun 6, 2019
1 parent 2de919e commit ca5dbf9
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
41 changes: 31 additions & 10 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -548,19 +548,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
}

SearchContext context = createContext(request);
onNewContext(context);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
success = true;
return context;
} finally {
if (success == false) {
freeContext(context.id());
}
}
}

private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context.id());
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
if (success == false) {
try (SearchContext dummy = context) {
onFreeContext(context);
}
}
}
}
Expand Down Expand Up @@ -648,18 +664,23 @@ private void freeAllContextForIndex(Index index) {
public boolean freeContext(long id) {
try (SearchContext context = removeContext(id)) {
if (context != null) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
onFreeContext(context);
return true;
}
return false;
}
}

private void onFreeContext(SearchContext context) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
assert activeContexts.containsKey(context.id()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
}

public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scrollContext() != null) {
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search;

import com.carrotsearch.hppc.IntArrayList;

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -51,6 +50,7 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -226,6 +226,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

final Thread thread = new Thread() {
@Override
public void run() {
Expand Down Expand Up @@ -261,12 +262,16 @@ public void onFailure(Exception e) {
try {
final int rounds = scaledRandomIntBetween(100, 10000);
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));
for (int i = 0; i < rounds; i++) {
try {
try {
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
final boolean useScroll = randomBoolean();
service.executeQueryPhase(
new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1,
new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest,
indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
SearchPhaseResult searchPhaseResult = result.get();
Expand All @@ -276,6 +281,9 @@ public void onFailure(Exception e) {
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
throw ((RuntimeException)ex.getCause());
Expand All @@ -293,6 +301,13 @@ public void onFailure(Exception e) {
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
Expand Down

0 comments on commit ca5dbf9

Please sign in to comment.