-
Notifications
You must be signed in to change notification settings - Fork 24.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Cancellable DirectoryReader #52822
Changes from 29 commits
e2ebfb4
c890142
e38cfa0
73b0e6d
ffdf6d2
add7dd4
d74edb2
331411b
d10c51a
e5fdf47
248ee51
bc85193
ce2d557
0012e3a
4c3183f
8b38977
9ebd847
0bf64f0
3936a05
9bf0fe3
ce51935
9695114
d562cf1
6243ced
19bdbdf
b446dfd
183da17
a912fa3
23c3adc
087f2ad
df0da4c
eb158e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,7 +62,9 @@ | |
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
||
/** | ||
|
@@ -77,25 +79,46 @@ public class ContextIndexSearcher extends IndexSearcher { | |
|
||
private AggregatedDfs aggregatedDfs; | ||
private QueryProfiler profiler; | ||
private Runnable checkCancelled; | ||
private MutableQueryTimeout cancellable; | ||
|
||
public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) { | ||
super(reader); | ||
public ContextIndexSearcher(IndexReader reader, Similarity similarity, | ||
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException { | ||
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout()); | ||
} | ||
|
||
// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped. | ||
// Some issues must be fixed: | ||
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and | ||
// the ShardSearcher sub-searchers. | ||
// - tests that use a MultiReader | ||
public ContextIndexSearcher(IndexReader reader, Similarity similarity, | ||
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, | ||
MutableQueryTimeout cancellable) throws IOException { | ||
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader); | ||
setSimilarity(similarity); | ||
setQueryCache(queryCache); | ||
setQueryCachingPolicy(queryCachingPolicy); | ||
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout(); | ||
} | ||
|
||
public void setProfiler(QueryProfiler profiler) { | ||
this.profiler = profiler; | ||
} | ||
|
||
/** | ||
* Set a {@link Runnable} that will be run on a regular basis while | ||
* collecting documents. | ||
* Add a {@link Runnable} that will be run on a regular basis while fetching document from the | ||
* DirectoryReader but also while collecting them and check for query cancellation or timeout. | ||
*/ | ||
public Runnable addQueryCancellation(Runnable action) { | ||
jimczi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return this.cancellable.add(action); | ||
} | ||
|
||
/** | ||
* Remove a {@link Runnable} that checks for query cancellation or timeout | ||
* which is called while fetching documents from the DirectoryReader but also while collecting them. | ||
*/ | ||
public void setCheckCancelled(Runnable checkCancelled) { | ||
this.checkCancelled = checkCancelled; | ||
public void removeQueryCancellation(Runnable action) { | ||
this.cancellable.remove(action); | ||
} | ||
|
||
public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { | ||
|
@@ -139,12 +162,6 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws | |
} | ||
} | ||
|
||
private void checkCancelled() { | ||
if (checkCancelled != null) { | ||
checkCancelled.run(); | ||
} | ||
} | ||
|
||
@SuppressWarnings({"unchecked", "rawtypes"}) | ||
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager, | ||
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException { | ||
|
@@ -180,7 +197,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c | |
* the provided <code>ctx</code>. | ||
*/ | ||
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException { | ||
checkCancelled(); | ||
cancellable.checkCancelled(); | ||
weight = wrapWeight(weight); | ||
final LeafCollector leafCollector; | ||
try { | ||
|
@@ -208,7 +225,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto | |
if (scorer != null) { | ||
try { | ||
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector, | ||
checkCancelled == null ? () -> { } : checkCancelled); | ||
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {}); | ||
} catch (CollectionTerminatedException e) { | ||
// collection was terminated prematurely | ||
// continue with the following leaf | ||
|
@@ -218,7 +235,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto | |
} | ||
|
||
private Weight wrapWeight(Weight weight) { | ||
if (checkCancelled != null) { | ||
if (cancellable.isEnabled()) { | ||
return new Weight(weight.getQuery()) { | ||
@Override | ||
public void extractTerms(Set<Term> terms) { | ||
|
@@ -244,7 +261,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException { | |
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException { | ||
BulkScorer in = weight.bulkScorer(context); | ||
if (in != null) { | ||
return new CancellableBulkScorer(in, checkCancelled); | ||
return new CancellableBulkScorer(in, cancellable::checkCancelled); | ||
} else { | ||
return null; | ||
} | ||
|
@@ -320,4 +337,33 @@ public DirectoryReader getDirectoryReader() { | |
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass(); | ||
return (DirectoryReader) reader; | ||
} | ||
|
||
private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation { | ||
|
||
private final Set<Runnable> runnables = new HashSet<>(); | ||
|
||
private Runnable add(Runnable action) { | ||
Objects.requireNonNull(action, "cancellation runnable should not be null"); | ||
if (runnables.add(action) == false) { | ||
throw new IllegalArgumentException("Cancellation runnable already added"); | ||
} | ||
return action; | ||
} | ||
|
||
private void remove(Runnable action) { | ||
runnables.remove(action); | ||
} | ||
|
||
@Override | ||
public void checkCancelled() { | ||
for (Runnable timeout : runnables) { | ||
timeout.run(); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation feels a bit awkward, I'd rather like to fork ExitableDirectoryReader entirely to not inherit from its QueryTimeout abstraction. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried that in the 1st approach but this means we have to copy the whole There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't mind copying it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to copy |
||
|
||
@Override | ||
public boolean isEnabled() { | ||
return runnables.isEmpty() == false; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s/while accessing/while fetching document/ this is confusing since we don't check the cancellation when visiting the stored fields ?