Skip to content

Commit

Permalink
Return docs when using nested mappings in archive indices (#90585)
Browse files Browse the repository at this point in the history
Archive indices < 6.1 do not work together with nested fields. While the documentation makes sure not to claim support for nested fields, see https://www.elastic.co/guide/en/elasticsearch/reference/current/archive-indices.html#archive-indices-supported-field-types, it leads to a situation where the import still works yet none of the documents at all are returned by any query (not even match_all). This is because indices before 6.1 did not have primary terms, but the default search context in ES 8 adds a FieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME) filter to the query:

https://github.com/elastic/elasticsearch/blob/f56126089ca4db89b631901ad7cce0a8e10e2fe5/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java#L284

This PR fixes the issue by adding basic support for nested fields so that it at least allows queries that are not leveraging
the nested documents to work (i.e. allow extracting the documents e.g. to be reindexed).

Closes #90523
  • Loading branch information
ywelsch committed Oct 20, 2022
1 parent fd6c20f commit ddbd7a8
Show file tree
Hide file tree
Showing 30 changed files with 212 additions and 61 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/90585.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 90585
summary: Return docs when using nested mappings in archive indices
area: Search
type: enhancement
issues:
- 90523
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ Query percolateQuery(
}
Query filter = null;
if (excludeNestedDocuments) {
filter = Queries.newNonNestedFilter();
filter = Queries.newNonNestedFilter(indexVersion);
}
return new PercolateQuery(name, queryStore, documents, candidateQuery, searcher, filter, verifiedMatchesQuery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.elasticsearch.Version;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.search.fetch.FetchContext;
Expand Down Expand Up @@ -51,7 +52,9 @@ public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOE
List<PercolateQuery> percolateQueries = locatePercolatorQuery(fetchContext.query());
boolean singlePercolateQuery = percolateQueries.size() == 1;
for (PercolateQuery pq : percolateQueries) {
percolateContexts.add(new PercolateContext(pq, singlePercolateQuery));
percolateContexts.add(
new PercolateContext(pq, singlePercolateQuery, fetchContext.getSearchExecutionContext().indexVersionCreated())
);
}
if (percolateContexts.isEmpty()) {
return null;
Expand All @@ -75,7 +78,7 @@ public void process(HitContext hitContext) throws IOException {
// This is not a document with a percolator field.
continue;
}
query = pc.filterNestedDocs(query);
query = pc.filterNestedDocs(query, fetchContext.getSearchExecutionContext().indexVersionCreated());
IndexSearcher percolatorIndexSearcher = pc.percolateQuery.getPercolatorIndexSearcher();
int memoryIndexMaxDoc = percolatorIndexSearcher.getIndexReader().maxDoc();
TopDocs topDocs = percolatorIndexSearcher.search(query, memoryIndexMaxDoc, new Sort(SortField.FIELD_DOC));
Expand All @@ -98,11 +101,11 @@ static class PercolateContext {
final boolean singlePercolateQuery;
final int[] rootDocsBySlot;

PercolateContext(PercolateQuery pq, boolean singlePercolateQuery) throws IOException {
PercolateContext(PercolateQuery pq, boolean singlePercolateQuery, Version indexVersionCreated) throws IOException {
this.percolateQuery = pq;
this.singlePercolateQuery = singlePercolateQuery;
IndexSearcher percolatorIndexSearcher = percolateQuery.getPercolatorIndexSearcher();
Query nonNestedFilter = percolatorIndexSearcher.rewrite(Queries.newNonNestedFilter());
Query nonNestedFilter = percolatorIndexSearcher.rewrite(Queries.newNonNestedFilter(indexVersionCreated));
Weight weight = percolatorIndexSearcher.createWeight(nonNestedFilter, ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer s = weight.scorer(percolatorIndexSearcher.getIndexReader().leaves().get(0));
int memoryIndexMaxDoc = percolatorIndexSearcher.getIndexReader().maxDoc();
Expand All @@ -119,11 +122,11 @@ String fieldName() {
return singlePercolateQuery ? FIELD_NAME_PREFIX : FIELD_NAME_PREFIX + "_" + percolateQuery.getName();
}

Query filterNestedDocs(Query in) {
Query filterNestedDocs(Query in, Version indexVersionCreated) {
if (rootDocsBySlot != null) {
// Ensures that we filter out nested documents
return new BooleanQuery.Builder().add(in, BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.FILTER)
.add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.FILTER)
.build();
}
return in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.Version;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase.HitContext;
Expand Down Expand Up @@ -70,6 +72,9 @@ public void testHitsExecute() throws Exception {

FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);
SearchExecutionContext sec = mock(SearchExecutionContext.class);
when(sc.getSearchExecutionContext()).thenReturn(sec);
when(sec.indexVersionCreated()).thenReturn(Version.CURRENT);

FetchSubPhaseProcessor processor = phase.getProcessor(sc);
assertNotNull(processor);
Expand Down Expand Up @@ -98,6 +103,9 @@ public void testHitsExecute() throws Exception {

FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);
SearchExecutionContext sec = mock(SearchExecutionContext.class);
when(sc.getSearchExecutionContext()).thenReturn(sec);
when(sec.indexVersionCreated()).thenReturn(Version.CURRENT);

FetchSubPhaseProcessor processor = phase.getProcessor(sc);
assertNotNull(processor);
Expand Down Expand Up @@ -125,6 +133,9 @@ public void testHitsExecute() throws Exception {

FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);
SearchExecutionContext sec = mock(SearchExecutionContext.class);
when(sc.getSearchExecutionContext()).thenReturn(sec);
when(sec.indexVersionCreated()).thenReturn(Version.CURRENT);

FetchSubPhaseProcessor processor = phase.getProcessor(sc);
assertNotNull(processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,7 +1842,7 @@ private long getLocalCheckpointOfSafeCommit(IndexCommit safeIndexCommit) throws
final Query query = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, commitLocalCheckpoint + 1, Long.MAX_VALUE),
BooleanClause.Occur.MUST
).add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST).build();
).add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST).build();
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@

package org.elasticsearch.common.lucene.search;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.FieldExistsQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.NestedPathFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.util.Collection;
Expand Down Expand Up @@ -48,15 +53,30 @@ public static Query newLenientFieldQuery(String field, RuntimeException e) {
return Queries.newMatchNoDocsQuery("failed [" + field + "] query, caused by " + message);
}

public static Query newNestedFilter() {
return not(newNonNestedFilter());
private static final Version NESTED_DOCS_IDENTIFIED_VIA_PRIMARY_TERMS_VERSION = Version.fromString("6.1.0");

/**
* Creates a new nested docs query
* @param indexVersionCreated the index version created since newer indices can identify a parent field more efficiently
*/
public static Query newNestedFilter(Version indexVersionCreated) {
if (indexVersionCreated.onOrAfter(NESTED_DOCS_IDENTIFIED_VIA_PRIMARY_TERMS_VERSION)) {
return not(newNonNestedFilter(indexVersionCreated));
} else {
return new PrefixQuery(new Term(NestedPathFieldMapper.NAME_PRE_V8, new BytesRef("__")));
}
}

/**
* Creates a new non-nested docs query
* @param indexVersionCreated the index version created since newer indices can identify a parent field more efficiently
*/
public static Query newNonNestedFilter() {
return new FieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME);
public static Query newNonNestedFilter(Version indexVersionCreated) {
if (indexVersionCreated.onOrAfter(NESTED_DOCS_IDENTIFIED_VIA_PRIMARY_TERMS_VERSION)) {
return new FieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME);
} else {
return not(newNestedFilter(indexVersionCreated));
}
}

public static BooleanQuery filtered(@Nullable Query query, @Nullable Query filter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
MappingLookup lookup = mapperService.mappingLookup();
NestedLookup nestedLookup = lookup.nestedLookup();
if (nestedLookup != NestedLookup.EMPTY) {
warmUp.add(Queries.newNonNestedFilter());
warmUp.add(Queries.newNonNestedFilter(mapperService.getIndexSettings().getIndexVersionCreated()));
warmUp.addAll(nestedLookup.getNestedParentFilters().values());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ public InternalEngine(EngineConfig engineConfig) {
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
restoreVersionMapAndCheckpointTracker(
Lucene.wrapAllDocsLive(searcher.getDirectoryReader()),
engineConfig.getIndexSettings().getIndexVersionCreated()
);
} catch (IOException e) {
throw new EngineCreationFailureException(
config().getShardId(),
Expand Down Expand Up @@ -2769,7 +2772,12 @@ public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOEx
ensureOpen();
refreshIfNeeded(source, toSeqNo);
try (Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL)) {
return LuceneChangesSnapshot.countOperations(searcher, fromSeqNo, toSeqNo);
return LuceneChangesSnapshot.countOperations(
searcher,
fromSeqNo,
toSeqNo,
config().getIndexSettings().getIndexVersionCreated()
);
} catch (Exception e) {
try {
maybeFailEngine("count changes", e);
Expand Down Expand Up @@ -2800,7 +2808,8 @@ public Translog.Snapshot newChangesSnapshot(
toSeqNo,
requiredFullRange,
singleConsumer,
accessStats
accessStats,
config().getIndexSettings().getIndexVersionCreated()
);
searcher = null;
return snapshot;
Expand Down Expand Up @@ -2967,14 +2976,14 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
* are in sync with the Lucene commit.
*/
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader, Version indexVersionCreated) throws IOException {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE),
BooleanClause.Occur.MUST
)
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
.add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents
.build();
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
Expand Down Expand Up @@ -62,6 +63,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private final ParallelArray parallelArray;
private final Closeable onClose;

private final Version indexVersionCreated;

private int storedFieldsReaderOrd = -1;
private StoredFieldsReader storedFieldsReader = null;

Expand All @@ -77,6 +80,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
* @param indexVersionCreated the version on which this index was created
*/
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
Expand All @@ -85,7 +89,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
boolean accessStats,
Version indexVersionCreated
) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
Expand All @@ -111,6 +116,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.indexSearcher.setQueryCache(null);
this.accessStats = accessStats;
this.parallelArray = new ParallelArray(this.searchBatchSize);
this.indexVersionCreated = indexVersionCreated;
final TopDocs topDocs = searchOperations(null, accessStats);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
Expand Down Expand Up @@ -272,21 +278,22 @@ private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) th
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
}

private static Query rangeQuery(long fromSeqNo, long toSeqNo) {
private static Query rangeQuery(long fromSeqNo, long toSeqNo, Version indexVersionCreated) {
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
.add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents
.build();
}

static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo) throws IOException {
static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, Version indexVersionCreated)
throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo));
return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated));
}

private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException {
final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated);
assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only";
final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG);
final TopFieldCollector collector = TopFieldCollector.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected Query doToQuery(SearchExecutionContext context) throws IOException {
Query innerQuery;
NestedObjectMapper objectMapper = context.nestedScope().getObjectMapper();
if (objectMapper == null) {
parentFilter = context.bitsetFilter(Queries.newNonNestedFilter());
parentFilter = context.bitsetFilter(Queries.newNonNestedFilter(context.indexVersionCreated()));
} else {
parentFilter = context.bitsetFilter(objectMapper.nestedTypeFilter());
}
Expand Down Expand Up @@ -391,7 +391,7 @@ public TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException {

Query rawParentFilter;
if (parentObjectMapper == null) {
rawParentFilter = Queries.newNonNestedFilter();
rawParentFilter = Queries.newNonNestedFilter(context.getSearchExecutionContext().indexVersionCreated());
} else {
rawParentFilter = parentObjectMapper.nestedTypeFilter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,6 @@ public MappingLookup.CacheKey mappingCacheKey() {
}

public NestedDocuments getNestedDocuments() {
return new NestedDocuments(mappingLookup, bitsetFilterCache::getBitSetProducer);
return new NestedDocuments(mappingLookup, bitsetFilterCache::getBitSetProducer, indexVersionCreated());
}
}

0 comments on commit ddbd7a8

Please sign in to comment.