Skip to content

Commit

Permalink
Always use DirectoryReader for realtime get from translog (#74722)
Browse files Browse the repository at this point in the history
Reading from the translog during a realtime get requires special handling in some higher-level components, e.g.
ShardGetService, where we're doing a bunch of tricks to extract other stored fields from the source. Another issue with
the current approach relates to #74227 where we introduce a new "field usage tracking" directory wrapper that's always
applied, and we want to make sure that we can still quickly do realtime gets from translog without creating an in-memory
index of the document, even when this directory wrapper exists.

This PR introduces a directory reader that contains a single translog indexing operation. This can be used during a
realtime get to access documents that haven't been refreshed yet. In the normal case, all information relevant to resolving
the realtime get is mocked out to provide fast access to _id and _source. In cases where more information is requested (e.g.
access to other stored fields), this reader will index the document into an in-memory Lucene segment that is
created on demand.

Relates #64504
  • Loading branch information
ywelsch committed Jul 1, 2021
1 parent 5837f9d commit fb67d36
Show file tree
Hide file tree
Showing 16 changed files with 725 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
/**
* Initialize lookup for the provided segment
*/
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField, boolean trackReaderKey) throws IOException {
this.uidField = uidField;
final Terms terms = reader.terms(uidField);
if (terms == null) {
Expand All @@ -77,10 +77,14 @@ final class PerThreadIDVersionAndSeqNoLookup {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field; _uid terms [" + terms + "]");
}
Object readerKey = null;
assert (readerKey = reader.getCoreCacheHelper().getKey()) != null;
assert trackReaderKey ? (readerKey = reader.getCoreCacheHelper().getKey()) != null : readerKey == null;
this.readerKey = readerKey;
}

PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
this(reader, uidField, true);
}

/** Return null if id is not found.
* We pass the {@link LeafReaderContext} as an argument so that things
* still work with reader wrappers that hide some documents while still
Expand All @@ -89,7 +93,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
*/
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, context);

Expand Down Expand Up @@ -144,7 +148,7 @@ private static long readNumericDocValues(LeafReader reader, String field, int do

/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
final int docID = getDocID(id, context);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,
return null;
}

public static DocIdAndVersion loadDocIdAndVersionUncached(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
List<LeafReaderContext> leaves = reader.leaves();
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), term.field(), false);
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
if (result != null) {
return result;
}
}
return null;
}

/**
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
* The result is either null or the live and latest version of the given uid.
Expand Down
29 changes: 11 additions & 18 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,14 @@ public enum SyncedFlushResult {
PENDING_OPERATIONS
}

protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {
protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boolean uncachedLookup) throws EngineException {
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
if (uncachedLookup) {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersionUncached(searcher.getIndexReader(), get.uid(), true);
} else {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
}
} catch (Exception e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here
Expand All @@ -596,7 +600,7 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) thr
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the
// responsibility of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion, false);
return new GetResult(searcher, docIdAndVersion);
} else {
Releasables.close(searcher);
return GetResult.NOT_EXISTS;
Expand Down Expand Up @@ -1623,21 +1627,18 @@ public static class GetResult implements Releasable {
private final long version;
private final DocIdAndVersion docIdAndVersion;
private final Engine.Searcher searcher;
private final boolean fromTranslog;

public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null, false);
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);

private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) {
this.exists = exists;
this.version = version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this.fromTranslog = fromTranslog;
assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader;
}

public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
}

public boolean exists() {
Expand All @@ -1648,14 +1649,6 @@ public long version() {
return this.version;
}

/**
* Returns {@code true} iff the get was performed from a translog operation. Notes that this returns {@code false}
* if the get was performed on an in-memory Lucene segment created from the corresponding translog operation.
*/
public boolean isFromTranslog() {
return fromTranslog;
}

public Engine.Searcher searcher() {
return this.searcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
Expand Down Expand Up @@ -654,26 +655,31 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}
}

private static final QueryCachingPolicy NEVER_CACHE_POLICY = new QueryCachingPolicy() {
@Override
public void onUse(Query query) {

}

@Override
public boolean shouldCache(Query query) {
return false;
}
};

public final AtomicLong translogGetCount = new AtomicLong(); // number of times realtime get was done on translog
public final AtomicLong translogInMemorySegmentsCount = new AtomicLong(); // number of times in-memory index needed to be created

private GetResult getFromTranslog(Get get, Translog.Index index, MappingLookup mappingLookup, DocumentParser documentParser,
Function<Searcher, Searcher> searcherWrapper) throws IOException {
assert get.isReadFromTranslog();
final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mappingLookup, documentParser,
config().getAnalyzer());
translogGetCount.incrementAndGet();
final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(shardId, index, mappingLookup, documentParser,
config().getAnalyzer(), translogInMemorySegmentsCount::incrementAndGet);
final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader);
config().getSimilarity(), null /*query cache disabled*/, NEVER_CACHE_POLICY, inMemoryReader);
final Searcher wrappedSearcher = searcherWrapper.apply(searcher);
if (wrappedSearcher == searcher) {
searcher.close();
assert inMemoryReader.assertMemorySegmentStatus(false);
final TranslogLeafReader translogLeafReader = new TranslogLeafReader(index);
return new GetResult(new Engine.Searcher("realtime_get", translogLeafReader,
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), translogLeafReader),
new VersionsAndSeqNoResolver.DocIdAndVersion(
0, index.version(), index.seqNo(), index.primaryTerm(), translogLeafReader, 0), true);
} else {
assert inMemoryReader.assertMemorySegmentStatus(true);
return getFromSearcher(get, wrappedSearcher);
}
return getFromSearcher(get, wrappedSearcher, true);
}

@Override
Expand Down Expand Up @@ -722,10 +728,10 @@ public GetResult get(Get get, MappingLookup mappingLookup, DocumentParser docume
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
@Override
public GetResult get(Get get, MappingLookup mappingLookup, DocumentParser documentParser,
Function<Searcher, Searcher> searcherWrapper) {
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
}

@Override
Expand Down
Loading

0 comments on commit fb67d36

Please sign in to comment.