Implement Exitable DirectoryReader (#52822)
Implement an Exitable DirectoryReader that wraps the original
DirectoryReader so that when a search task is cancelled the
DirectoryReaders also stop their work quickly. This is useful for
expensive operations like wildcard/prefix queries, where the
DirectoryReaders can spend a lot of time and consume resources,
as previously their work wouldn't stop even though the original
search task was cancelled (e.g. because of a timeout or a dropped
client connection).
matriv committed Mar 5, 2020
1 parent dcbc9d3 commit 67acaf6
Showing 9 changed files with 520 additions and 86 deletions.
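
For orientation before the diff: a minimal, hedged sketch of the wrapping idea the commit message describes, using only Lucene's public FilterDirectoryReader/FilterLeafReader APIs. The names below (CancellableDirectoryReader, checkCancelled) are illustrative and are not the classes this commit adds; the real ExitableDirectoryReader covers more of the reader API, this sketch only checks for cancellation while iterating terms.

// Illustrative sketch only (not the class added by this commit): wrap a DirectoryReader so that
// a cancellation check runs while terms are being iterated, e.g. during wildcard/prefix expansion.
import java.io.IOException;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

public class CancellableDirectoryReader extends FilterDirectoryReader {

    private final Runnable checkCancelled; // expected to throw if the search task was cancelled

    public CancellableDirectoryReader(DirectoryReader in, Runnable checkCancelled) throws IOException {
        super(in, new CancellableSubReaderWrapper(checkCancelled));
        this.checkCancelled = checkCancelled;
    }

    @Override
    protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
        return new CancellableDirectoryReader(in, checkCancelled);
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return in.getReaderCacheHelper();
    }

    private static class CancellableSubReaderWrapper extends SubReaderWrapper {
        private final Runnable checkCancelled;

        CancellableSubReaderWrapper(Runnable checkCancelled) {
            this.checkCancelled = checkCancelled;
        }

        @Override
        public LeafReader wrap(LeafReader reader) {
            return new CancellableLeafReader(reader, checkCancelled);
        }
    }

    private static class CancellableLeafReader extends FilterLeafReader {
        private final Runnable checkCancelled;

        CancellableLeafReader(LeafReader in, Runnable checkCancelled) {
            super(in);
            this.checkCancelled = checkCancelled;
        }

        @Override
        public Terms terms(String field) throws IOException {
            Terms terms = super.terms(field);
            if (terms == null) {
                return null;
            }
            // Run the cancellation check on every step of the terms iteration, which is where
            // expensive multi-term queries spend most of their time.
            return new FilterTerms(terms) {
                @Override
                public TermsEnum iterator() throws IOException {
                    return new FilterTermsEnum(super.iterator()) {
                        @Override
                        public BytesRef next() throws IOException {
                            checkCancelled.run();
                            return super.next();
                        }
                    };
                }
            };
        }

        @Override
        public CacheHelper getCoreCacheHelper() {
            return in.getCoreCacheHelper();
        }

        @Override
        public CacheHelper getReaderCacheHelper() {
            return in.getReaderCacheHelper();
        }
    }
}
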
@@ -155,7 +155,7 @@ final class DefaultSearchContext extends SearchContext {
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
FetchPhase fetchPhase) {
FetchPhase fetchPhase) throws IOException {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
@@ -398,7 +398,7 @@ && canRewriteToMatchNone(rewritten.source())
}, listener::onFailure));
}

private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) {
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) throws IOException {
// creates a lightweight search context that we use to inform context listeners
// before closing
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
@@ -609,7 +609,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}
}

final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) {
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) throws IOException {
SearchContext context = createContext(rewriteContext);
onNewContext(context);
boolean success = false;
@@ -644,7 +644,7 @@ private void onNewContext(SearchContext context) {
}
}

final SearchContext createContext(SearchRewriteContext rewriteContext) {
final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
try {
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
@@ -695,7 +695,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
}

private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) throws IOException {
boolean success = false;
try {
final ShardSearchRequest request = rewriteContext.request;
@@ -62,7 +62,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
@@ -77,25 +79,46 @@ public class ContextIndexSearcher extends IndexSearcher {

private AggregatedDfs aggregatedDfs;
private QueryProfiler profiler;
private Runnable checkCancelled;
private MutableQueryTimeout cancellable;

public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
super(reader);
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout());
}

// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped.
// Some issues must be fixed:
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and
// the ShardSearcher sub-searchers.
// - tests that use a MultiReader
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable) throws IOException {
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout();
}

public void setProfiler(QueryProfiler profiler) {
this.profiler = profiler;
}

/**
* Set a {@link Runnable} that will be run on a regular basis while
* collecting documents.
* Add a {@link Runnable} that will be run on a regular basis while accessing documents in the
* DirectoryReader but also while collecting them and check for query cancellation or timeout.
*/
public Runnable addQueryCancellation(Runnable action) {
return this.cancellable.add(action);
}

/**
* Remove a {@link Runnable} that checks for query cancellation or timeout
* which is called while accessing documents in the DirectoryReader but also while collecting them.
*/
public void setCheckCancelled(Runnable checkCancelled) {
this.checkCancelled = checkCancelled;
public void removeQueryCancellation(Runnable action) {
this.cancellable.remove(action);
}

public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
@@ -139,12 +162,6 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
}
}

private void checkCancelled() {
if (checkCancelled != null) {
checkCancelled.run();
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
@@ -180,7 +197,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
* the provided <code>ctx</code>.
*/
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
checkCancelled();
cancellable.checkCancelled();
weight = wrapWeight(weight);
final LeafCollector leafCollector;
try {
@@ -208,7 +225,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
if (scorer != null) {
try {
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
checkCancelled == null ? () -> { } : checkCancelled);
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
@@ -218,7 +235,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
}

private Weight wrapWeight(Weight weight) {
if (checkCancelled != null) {
if (cancellable.isEnabled()) {
return new Weight(weight.getQuery()) {
@Override
public void extractTerms(Set<Term> terms) {
@@ -244,7 +261,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
BulkScorer in = weight.bulkScorer(context);
if (in != null) {
return new CancellableBulkScorer(in, checkCancelled);
return new CancellableBulkScorer(in, cancellable::checkCancelled);
} else {
return null;
}
@@ -320,4 +337,33 @@ public DirectoryReader getDirectoryReader() {
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
return (DirectoryReader) reader;
}

private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation {

private final Set<Runnable> runnables = new HashSet<>();

private Runnable add(Runnable action) {
Objects.requireNonNull(action, "cancellation runnable should not be null");
if (runnables.add(action) == false) {
throw new IllegalArgumentException("Cancellation runnable already added");
}
return action;
}

private void remove(Runnable action) {
runnables.remove(action);
}

@Override
public void checkCancelled() {
for (Runnable timeout : runnables) {
timeout.run();
}
}

@Override
public boolean isEnabled() {
return runnables.isEmpty() == false;
}
}
}
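
Finally, a hedged usage sketch of the two ContextIndexSearcher hooks added in the diff above (addQueryCancellation/removeQueryCancellation). Only those two methods come from this commit; the searcher, query, and the isCancelled supplier are placeholders standing in for the real task-cancellation plumbing.

// Illustrative only: run a query while a cancellation check is registered, and make sure
// the check is removed again even if the search throws.
import java.io.IOException;
import java.util.function.BooleanSupplier;

import org.apache.lucene.search.Query;
import org.apache.lucene.search.TotalHitCountCollector;
import org.elasticsearch.search.internal.ContextIndexSearcher;

class QueryCancellationUsageSketch {

    static int countWithCancellation(ContextIndexSearcher searcher, Query query,
                                     BooleanSupplier isCancelled) throws IOException {
        Runnable cancellationCheck = () -> {
            if (isCancelled.getAsBoolean()) {
                // Any RuntimeException thrown here aborts both the wrapped DirectoryReader's
                // term iteration and document collection.
                throw new RuntimeException("search task was cancelled");
            }
        };
        searcher.addQueryCancellation(cancellationCheck);
        try {
            TotalHitCountCollector collector = new TotalHitCountCollector();
            searcher.search(query, collector);
            return collector.getTotalHits();
        } finally {
            searcher.removeQueryCancellation(cancellationCheck);
        }
    }
}
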
