Skip to content

Commit

Permalink
Fix concurrent search and index delete (elastic#42621)
Browse files Browse the repository at this point in the history
Changed order of listener invocation so that we notify before
registering search context and notify after unregistering same.

This ensures that count up/down like what we do in ShardSearchStats
works. Otherwise, we risk notifying onFreeScrollContext before notifying
onNewScrollContext (same for onFreeContext/onNewContext, but we
currently have no assertions failing in those).

Closes elastic#28053
  • Loading branch information
henningandersen committed Jun 7, 2019
1 parent 04a7a19 commit 4287010
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 14 deletions.
41 changes: 31 additions & 10 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -594,19 +594,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
}

SearchContext context = createContext(request);
onNewContext(context);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
success = true;
return context;
} finally {
if (success == false) {
freeContext(context.id());
}
}
}

private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context.id());
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
if (success == false) {
try (SearchContext dummy = context) {
onFreeContext(context);
}
}
}
}
Expand Down Expand Up @@ -693,13 +709,8 @@ private void freeAllContextForIndex(Index index) {
public boolean freeContext(long id) {
final SearchContext context = removeContext(id);
if (context != null) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
try {
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
onFreeContext(context);
} finally {
context.close();
}
Expand All @@ -708,6 +719,16 @@ public boolean freeContext(long id) {
return false;
}

private void onFreeContext(SearchContext context) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
assert activeContexts.containsKey(context.id()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
}

public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scrollContext() != null) {
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search;

import com.carrotsearch.hppc.IntArrayList;

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -229,6 +229,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

final Thread thread = new Thread() {
@Override
public void run() {
Expand Down Expand Up @@ -267,10 +268,17 @@ public void onFailure(Exception e) {
try {
try {
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
final boolean useScroll = randomBoolean();
ShardSearchLocalRequest shardRequest;
if (useScroll) {
shardRequest = new ShardScrollRequestTest(indexShard.shardId());
} else {
shardRequest = new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
true, null, null),
true, null, null);
}
service.executeQueryPhase(
shardRequest,
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
SearchPhaseResult searchPhaseResult = result.get();
IntArrayList intCursors = new IntArrayList(1);
Expand All @@ -279,6 +287,9 @@ public void onFailure(Exception e) {
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
throw ((RuntimeException)ex.getCause());
Expand All @@ -296,6 +307,13 @@ public void onFailure(Exception e) {
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
Expand Down

0 comments on commit 4287010

Please sign in to comment.