Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Cancellable DirectoryReader #52822

Merged
merged 32 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e2ebfb4
Implement Cancellable DirectoryReader
matriv Feb 25, 2020
c890142
fix compilation
matriv Feb 26, 2020
e38cfa0
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 26, 2020
73b0e6d
Address comments
matriv Feb 26, 2020
ffdf6d2
Fix behaviour by properly handling cancellable.run()
matriv Feb 26, 2020
add7dd4
split query timeout and cancellation to be able to unset query timeout
matriv Feb 27, 2020
d74edb2
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 27, 2020
331411b
Custom implementation of ExitableDirReader to overcome casting
matriv Feb 27, 2020
d10c51a
revert changes
matriv Feb 27, 2020
e5fdf47
revert
matriv Feb 27, 2020
248ee51
revert unrelated changes
matriv Feb 27, 2020
bc85193
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Feb 27, 2020
ce2d557
Fix issues after splitting into cancellation and timeout
matriv Feb 28, 2020
0012e3a
Add unit test
matriv Feb 28, 2020
4c3183f
re-enable MultiReader Tests, fix cancellation tests
matriv Feb 28, 2020
8b38977
fix checkstyle - enhance comments
matriv Feb 28, 2020
9ebd847
Fix NPE
matriv Feb 28, 2020
0bf64f0
address comments
matriv Feb 29, 2020
3936a05
fix revert
matriv Feb 29, 2020
9bf0fe3
fix revert
matriv Feb 29, 2020
ce51935
move impl to QueryPhase
matriv Feb 29, 2020
9695114
unit test wrapping and exit
matriv Mar 1, 2020
d562cf1
address comments
matriv Mar 3, 2020
6243ced
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 3, 2020
19bdbdf
improve tests
matriv Mar 3, 2020
b446dfd
extract wrapper classes to another file, use more elegant approach fo…
matriv Mar 4, 2020
183da17
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 4, 2020
a912fa3
rename method, add more tests
matriv Mar 4, 2020
23c3adc
replace QueryTimeout with our own iface
matriv Mar 4, 2020
087f2ad
document iface
matriv Mar 4, 2020
df0da4c
Merge remote-tracking branch 'upstream/master' into impl-cancellable
matriv Mar 4, 2020
eb158e5
fix comment
matriv Mar 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ final class DefaultSearchContext extends SearchContext {
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
FetchPhase fetchPhase) {
FetchPhase fetchPhase) throws IOException {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,8 @@ private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteCon
searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase);
success = true;
return searchContext;
} catch (IOException e) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this leniency looks dangerous?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I simply propagate the exception? or any other suggestion?

} finally {
if (success == false) {
// we handle the case where `IndicesService#indexServiceSafe`or `IndexService#getShard`, or the DefaultSearchContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@

package org.elasticsearch.search.internal;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.ExitableDirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.QueryTimeout;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.CollectionStatistics;
Expand Down Expand Up @@ -64,6 +73,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
* Context-aware extension of {@link IndexSearcher}.
Expand All @@ -79,8 +89,18 @@ public class ContextIndexSearcher extends IndexSearcher {
private QueryProfiler profiler;
private Runnable checkCancelled;

public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
super(reader);
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, true);
}

public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean shouldWrap) throws IOException {
super(shouldWrap? new CancellableIndexReader((DirectoryReader) reader, new Cancellable()) : reader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CancellableIndexReader shouldn't have any overhead, so it might be simpler to wrap all the time here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This boolean was added together with a second constructor because of the AggregatorTestCase and hundreds of tests that derive from that. If we wrap the IndexReader we get:

java.lang.AssertionError: The top-reader used to create Weight is not the same as the current reader's top-reader (org.apache.lucene.index.CompositeReaderContext@382edaaa

which I tried to fix by changing the AggregatorTestCase to receive an IndexReader and not an IndexSearcher as an argument, and all the tests to use the IndexSearcher derived from the created context. But even with this there were a few more tests failing that I didn't manage to fix, so after discussion with @jimczi we decided to make this workaround for the moment and address the issue in a separate PR afterwards.

I can add a TODO though to not miss it.

if (shouldWrap) {
((CancellableIndexReader) getIndexReader()).setCheckCancelled(() -> checkCancelled);
}
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
Expand Down Expand Up @@ -320,4 +340,99 @@ public DirectoryReader getDirectoryReader() {
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
return (DirectoryReader) reader;
}

/**
* Wraps an {@link IndexReader} with a cancellation Runnable task.
*/
private static class CancellableIndexReader extends FilterDirectoryReader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call it CancellableDirectoryReader if it extends DirectoryReader.


private final Cancellable checkCancelled;

/**
 * Wraps every leaf of the given {@link DirectoryReader} in a {@link CancellableLeafReader},
 * sharing a single {@link Cancellable} across all leaves so one cancellation hook
 * covers the whole reader.
 *
 * @param in             the reader whose leaves are wrapped
 * @param checkCancelled shared cancellation hook, also kept on this instance so the
 *                       actual Runnable can be installed after construction
 * @throws IOException if the underlying {@link FilterDirectoryReader} constructor fails
 */
private CancellableIndexReader(DirectoryReader in, Cancellable checkCancelled) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
// Each leaf receives the same shared Cancellable instance, so a single
// setCheckCancelled call applies to every leaf at once.
return new CancellableLeafReader(reader, checkCancelled);
}
});
this.checkCancelled = checkCancelled;
}

// Installs the supplier of the cancellation Runnable consulted by Cancellable.shouldExit().
// A Supplier is used (rather than the Runnable itself) so the hook can be set — or later
// return null to disable checks — after this reader has been constructed.
private void setCheckCancelled(Supplier<Runnable> checkCancelled) {
this.checkCancelled.setCancellable(checkCancelled);
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
return in;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's throw an UnsupportedOperationException? (this is only used when asking a DirectoryReader to take into account some new changes in a directory, which should never happen with this impl)

}

// Delegate to the wrapped reader so cache entries remain keyed on the underlying
// reader rather than on this (cancellation-only) wrapper.
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
}

/**
 * {@link LeafReader} wrapper that inherits the exitable behaviour of
 * {@link ExitableDirectoryReader.ExitableFilterAtomicReader} (the {@link Cancellable}
 * is passed up as its {@code QueryTimeout}), but overrides the doc-values getters to
 * delegate straight to the wrapped reader — presumably so doc-values access skips
 * per-access cancellation checks; TODO confirm this is intentional.
 */
private static class CancellableLeafReader extends ExitableDirectoryReader.ExitableFilterAtomicReader {

private CancellableLeafReader(LeafReader leafReader, Cancellable checkCancelled) {
// The shared Cancellable doubles as the parent's QueryTimeout.
super(leafReader, checkCancelled);
}

// The overrides below bypass the parent's wrapping and hand back the delegate's
// doc values unchanged.

@Override
public NumericDocValues getNumericDocValues(String field) throws IOException {
return in.getNumericDocValues(field);
}

@Override
public BinaryDocValues getBinaryDocValues(String field) throws IOException {
return in.getBinaryDocValues(field);
}

@Override
public SortedDocValues getSortedDocValues(String field) throws IOException {
return in.getSortedDocValues(field);
}

@Override
public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException {
return in.getSortedNumericDocValues(field);
}

@Override
public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
return in.getSortedSetDocValues(field);
}
}

/**
* Implementation of {@link QueryTimeout} with a Runnable task.
*/
private static class Cancellable implements QueryTimeout {

private Supplier<Runnable> cancellable;

// Sets (or replaces) the supplier of the cancellation Runnable. A supplier that
// returns null disables the checks performed in shouldExit()/isTimeoutEnabled().
public void setCancellable(Supplier<Runnable> cancellable) {
this.cancellable = cancellable;
}

// Invoked by the exitable-reader machinery. Deliberately always returns false:
// rather than signalling a timeout, the installed Runnable is executed, and is
// presumably expected to throw if the search task has been cancelled —
// NOTE(review): confirm the Runnable's throw-on-cancel contract.
@Override
public boolean shouldExit() {
assert cancellable != null : "checkCancelled must be set immediately after the construction of CancellableIndexReader";
if (cancellable.get() == null) {
// No cancellation task currently installed — nothing to check.
return false;
}
cancellable.get().run();
return false;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation feels a bit awkward, I'd rather like to fork ExitableDirectoryReader entirely to not inherit from its QueryTimeout abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that in the 1st approach but this means we have to copy the whole ExitablePointValues to wrap the point values and therefore the ExitableIntersectVisitor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind copying it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to copy


// Checks are only considered enabled while a cancellation Runnable is installed;
// a null Runnable (e.g. cancellation temporarily unset) turns them off.
@Override
public boolean isTimeoutEnabled() {
assert cancellable != null : "checkCancelled must be set immediately after the construction of CancellableIndexReader";
return cancellable.get() != null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void testReplay() throws Exception {
TopDocs topDocs = indexSearcher.search(termQuery, numDocs);

SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null);
indexSearcher = searchContext.searcher();
when(searchContext.query()).thenReturn(rewrittenQuery);
BestBucketsDeferringCollector collector = new BestBucketsDeferringCollector(searchContext, false) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testRandom() throws Exception {
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
try {

int value = randomInt(maxTerm - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private void testCase(Query query, int precision, GeoBoundingBox geoBoundingBox,
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

aggregationBuilder.precision(precision);
if (geoBoundingBox != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ private void executeTestCase(boolean reduced,
}

try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("_name");
if (configure != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ private void executeTestCase(int numDocs,
}

try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher =
newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
MissingAggregationBuilder builder = new MissingAggregationBuilder("_name", null);
builder.field(fieldName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ private void verifyAvgOfDoubles(double[] values, double expected, double delta)
);
}

@AwaitsFix(bugUrl = "Replace with integration test")
public void testSingleValuedFieldPartiallyUnmapped() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
Expand Down Expand Up @@ -537,7 +538,7 @@ public void testOrderByEmptyAggregation() throws IOException {
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
Expand Down Expand Up @@ -587,7 +588,7 @@ private void testCase(AvgAggregationBuilder aggregationBuilder, Query query,
indexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

AvgAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
Expand All @@ -611,14 +612,8 @@ public void testCacheAggregation() throws IOException {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you revert this change ? It should work without this modification so I'd like to keep this for a different pr since the issue is not related to the exitable directory reader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

indexWriter.close();

Directory unmappedDirectory = newDirectory();
RandomIndexWriter unmappedIndexWriter = new RandomIndexWriter(random(), unmappedDirectory);
unmappedIndexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexReader unamappedIndexReader = DirectoryReader.open(unmappedDirectory);
MultiReader multiReader = new MultiReader(indexReader, unamappedIndexReader);
IndexSearcher indexSearcher = newSearcher(multiReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
fieldType.setName("value");
Expand All @@ -639,9 +634,8 @@ public void testCacheAggregation() throws IOException {
// Test that an aggregation not using a script does get cached
assertTrue(aggregator.context().getQueryShardContext().isCacheable());

multiReader.close();
indexReader.close();
directory.close();
unmappedDirectory.close();
}

/**
Expand All @@ -657,14 +651,8 @@ public void testScriptCaching() throws IOException {
}
indexWriter.close();

Directory unmappedDirectory = newDirectory();
RandomIndexWriter unmappedIndexWriter = new RandomIndexWriter(random(), unmappedDirectory);
unmappedIndexWriter.close();

IndexReader indexReader = DirectoryReader.open(directory);
IndexReader unamappedIndexReader = DirectoryReader.open(unmappedDirectory);
MultiReader multiReader = new MultiReader(indexReader, unamappedIndexReader);
IndexSearcher indexSearcher = newSearcher(multiReader, true, true);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
fieldType.setName("value");
Expand Down Expand Up @@ -705,8 +693,7 @@ public void testScriptCaching() throws IOException {
// Test that an aggregation using a nondeterministic script does not get cached
assertFalse(aggregator.context().getQueryShardContext().isCacheable());

multiReader.close();
indexReader.close();
directory.close();
unmappedDirectory.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
Expand Down Expand Up @@ -62,13 +61,19 @@ public void testEmpty() throws IOException {
.method(PercentilesMethod.HDR);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE);
fieldType.setName("field");
try (IndexReader reader = new MultiReader()) {

Directory directory = newDirectory();
RandomIndexWriter unmappedIndexWriter = new RandomIndexWriter(random(), directory);
try (IndexReader reader = unmappedIndexWriter.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
PercentileRanks ranks = search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
Percentile rank = ranks.iterator().next();
assertEquals(Double.NaN, rank.getPercent(), 0d);
assertEquals(0.5, rank.getValue(), 0d);
assertFalse(AggregationInspectionHelper.hasValue((InternalHDRPercentileRanks)ranks));
} finally {
unmappedIndexWriter.close();
directory.close();
}
}

Expand Down Expand Up @@ -106,13 +111,13 @@ public void testSimple() throws IOException {
}
}

public void testNullValues() throws IOException {
// A null values array passed to the builder must be rejected eagerly with a
// descriptive IllegalArgumentException.
public void testNullValues() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new PercentileRanksAggregationBuilder("my_agg", null).field("field").method(PercentilesMethod.HDR));
assertThat(e.getMessage(), Matchers.equalTo("[values] must not be null: [my_agg]"));
}

public void testEmptyValues() throws IOException {
public void testEmptyValues() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new PercentileRanksAggregationBuilder("my_agg", new double[0]).field("field").method(PercentilesMethod.HDR));

Expand Down
Loading