diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
index eb86c41ec7c12..ea50e3db5196d 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
@@ -56,7 +56,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
     /**
      * Initialize lookup for the provided segment
      */
-    PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
+    PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField, boolean trackReaderKey) throws IOException {
         this.uidField = uidField;
         final Terms terms = reader.terms(uidField);
         if (terms == null) {
@@ -77,10 +77,14 @@ final class PerThreadIDVersionAndSeqNoLookup {
             throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field; _uid terms [" + terms + "]");
         }
         Object readerKey = null;
-        assert (readerKey = reader.getCoreCacheHelper().getKey()) != null;
+        assert trackReaderKey ? (readerKey = reader.getCoreCacheHelper().getKey()) != null : readerKey == null;
         this.readerKey = readerKey;
     }
 
+    PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
+        this(reader, uidField, true);
+    }
+
     /** Return null if id is not found.
      *  We pass the {@link LeafReaderContext} as an argument so that things
      *  still work with reader wrappers that hide some documents while still
@@ -89,7 +93,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
      */
     public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
         throws IOException {
-        assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
+        assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
         int docID = getDocID(id, context);
 
@@ -144,7 +148,7 @@ private static long readNumericDocValues(LeafReader reader, String field, int do
     /** Return null if id is not found.
      */
     DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
-        assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
+        assert readerKey == null || context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
         final int docID = getDocID(id, context);
         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
index 3aeb5c4d18cb7..a5e9fbbb96155 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
@@ -134,6 +134,19 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,
         return null;
     }
 
+    public static DocIdAndVersion loadDocIdAndVersionUncached(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
+        List<LeafReaderContext> leaves = reader.leaves();
+        for (int i = leaves.size() - 1; i >= 0; i--) {
+            final LeafReaderContext leaf = leaves.get(i);
+            PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), term.field(), false);
+            DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
+            if (result != null) {
+                return result;
+            }
+        }
+        return null;
+    }
+
     /**
      * Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
      * The result is either null or the live and latest version of the given uid.
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 2134ca38b9510..cc52ecaaa920d 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -568,10 +568,14 @@ public enum SyncedFlushResult {
         PENDING_OPERATIONS
     }
 
-    protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {
+    protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boolean uncachedLookup) throws EngineException {
         final DocIdAndVersion docIdAndVersion;
         try {
-            docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
+            if (uncachedLookup) {
+                docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersionUncached(searcher.getIndexReader(), get.uid(), true);
+            } else {
+                docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
+            }
         } catch (Exception e) {
             Releasables.closeWhileHandlingException(searcher);
             //TODO: A better exception goes here
@@ -596,7 +600,7 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) thr
         if (docIdAndVersion != null) {
             // don't release the searcher on this path, it is the
             // responsibility of the caller to call GetResult.release
-            return new GetResult(searcher, docIdAndVersion, false);
+            return new GetResult(searcher, docIdAndVersion);
         } else {
             Releasables.close(searcher);
             return GetResult.NOT_EXISTS;
@@ -1623,21 +1627,18 @@ public static class GetResult implements Releasable {
         private final long version;
         private final DocIdAndVersion docIdAndVersion;
         private final Engine.Searcher searcher;
-        private final boolean fromTranslog;
-        public static final GetResult NOT_EXISTS = new
GetResult(false, Versions.NOT_FOUND, null, null, false); + public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null); - private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) { + private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) { this.exists = exists; this.version = version; this.docIdAndVersion = docIdAndVersion; this.searcher = searcher; - this.fromTranslog = fromTranslog; - assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader; } - public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) { - this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog); + public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) { + this(true, docIdAndVersion.version, docIdAndVersion, searcher); } public boolean exists() { @@ -1648,14 +1649,6 @@ public long version() { return this.version; } - /** - * Returns {@code true} iff the get was performed from a translog operation. Notes that this returns {@code false} - * if the get was performed on an in-memory Lucene segment created from the corresponding translog operation. - */ - public boolean isFromTranslog() { - return fromTranslog; - } - public Engine.Searcher searcher() { return this.searcher; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a69e8eb3ec8dc..ab90fe74604b1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -31,6 +31,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; @@ -654,26 +655,31 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } } + private static final QueryCachingPolicy NEVER_CACHE_POLICY = new QueryCachingPolicy() { + @Override + public void onUse(Query query) { + + } + + @Override + public boolean shouldCache(Query query) { + return false; + } + }; + + public final AtomicLong translogGetCount = new AtomicLong(); // number of times realtime get was done on translog + public final AtomicLong translogInMemorySegmentsCount = new AtomicLong(); // number of times in-memory index needed to be created + private GetResult getFromTranslog(Get get, Translog.Index index, MappingLookup mappingLookup, DocumentParser documentParser, Function searcherWrapper) throws IOException { assert get.isReadFromTranslog(); - final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mappingLookup, documentParser, - config().getAnalyzer()); + translogGetCount.incrementAndGet(); + final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(shardId, index, mappingLookup, documentParser, + config().getAnalyzer(), translogInMemorySegmentsCount::incrementAndGet); final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId), - config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader); + 
config().getSimilarity(), null /*query cache disabled*/, NEVER_CACHE_POLICY, inMemoryReader); final Searcher wrappedSearcher = searcherWrapper.apply(searcher); - if (wrappedSearcher == searcher) { - searcher.close(); - assert inMemoryReader.assertMemorySegmentStatus(false); - final TranslogLeafReader translogLeafReader = new TranslogLeafReader(index); - return new GetResult(new Engine.Searcher("realtime_get", translogLeafReader, - IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), translogLeafReader), - new VersionsAndSeqNoResolver.DocIdAndVersion( - 0, index.version(), index.seqNo(), index.primaryTerm(), translogLeafReader, 0), true); - } else { - assert inMemoryReader.assertMemorySegmentStatus(true); - return getFromSearcher(get, wrappedSearcher); - } + return getFromSearcher(get, wrappedSearcher, true); } @Override @@ -722,10 +728,10 @@ public GetResult get(Get get, MappingLookup mappingLookup, DocumentParser docume assert versionValue.seqNo >= 0 : versionValue; refreshIfNeeded("realtime_get", versionValue.seqNo); } - return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper)); + return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false); } else { // we expose what has been externally expose in a point in time snapshot via an explicit refresh - return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper)); + return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false); } } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5a92b20ed66ff..15ffe2ad9dd1a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -258,7 +258,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm @Override public GetResult get(Get get, MappingLookup mappingLookup, DocumentParser documentParser, Function searcherWrapper) { - return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper)); + return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java new file mode 100644 index 0000000000000..00008ded01314 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java @@ -0,0 +1,579 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.BaseTermsEnum;
+import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.ImpactsEnum;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafMetaData;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
+import org.elasticsearch.index.mapper.DocumentParser;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.RoutingFieldMapper;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link DirectoryReader} that contains a single translog indexing operation.
+ * This can be used during a realtime get to access documents that haven't been refreshed yet.
+ * In the normal case, all information relevant to resolve the realtime get is mocked out
+ * to provide fast access to _id and _source. In cases where more values are requested
+ * (e.g. access to other stored fields), this reader will index the document
+ * into an in-memory Lucene segment that is created on demand.
+ */ +final class TranslogDirectoryReader extends DirectoryReader { + private final TranslogLeafReader leafReader; + + TranslogDirectoryReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, DocumentParser documentParser, + Analyzer analyzer, Runnable onSegmentCreated) throws IOException { + this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, analyzer, onSegmentCreated)); + } + + private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException { + super(leafReader.directory, new LeafReader[]{leafReader}, null); + this.leafReader = leafReader; + } + + private static UnsupportedOperationException unsupported() { + assert false : "unsupported operation"; + return new UnsupportedOperationException(); + } + + public TranslogLeafReader getLeafReader() { + return leafReader; + } + + @Override + protected DirectoryReader doOpenIfChanged() { + throw unsupported(); + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) { + throw unsupported(); + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) { + throw unsupported(); + } + + @Override + public long getVersion() { + throw unsupported(); + } + + @Override + public boolean isCurrent() { + throw unsupported(); + } + + @Override + public IndexCommit getIndexCommit() { + throw unsupported(); + } + + @Override + protected void doClose() throws IOException { + leafReader.close(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return leafReader.getReaderCacheHelper(); + } + + private static class TranslogLeafReader extends LeafReader { + + private static final FieldInfo FAKE_SOURCE_FIELD + = new FieldInfo(SourceFieldMapper.NAME, 1, false, false, false, IndexOptions.NONE, + DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false); + private static final FieldInfo FAKE_ROUTING_FIELD + = new FieldInfo(RoutingFieldMapper.NAME, 2, false, false, false, IndexOptions.NONE, + DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false); + private static final FieldInfo FAKE_ID_FIELD + = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.DOCS, + DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false); + private static Set TRANSLOG_FIELD_NAMES = + Sets.newHashSet(SourceFieldMapper.NAME, RoutingFieldMapper.NAME, IdFieldMapper.NAME); + + + private final ShardId shardId; + private final Translog.Index operation; + private final MappingLookup mappingLookup; + private final DocumentParser documentParser; + private final Analyzer analyzer; + private final Directory directory; + private final Runnable onSegmentCreated; + + private final AtomicReference delegate = new AtomicReference<>(); + private final BytesRef uid; + + TranslogLeafReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, DocumentParser documentParser, + Analyzer analyzer, Runnable onSegmentCreated) { + this.shardId = shardId; + this.operation = operation; + this.mappingLookup = mappingLookup; + this.documentParser = documentParser; + this.analyzer = analyzer; + this.onSegmentCreated = onSegmentCreated; + this.directory = new ByteBuffersDirectory(); + this.uid = Uid.encodeId(operation.id()); + } + + private LeafReader getDelegate() { + ensureOpen(); + LeafReader reader = delegate.get(); + if (reader == null) { + synchronized (this) { + ensureOpen(); + reader = delegate.get(); + if (reader == null) { + reader = createInMemoryLeafReader(); + final LeafReader existing = 
delegate.getAndSet(reader); + assert existing == null; + onSegmentCreated.run(); + } + } + } + return reader; + } + + private LeafReader createInMemoryLeafReader() { + assert Thread.holdsLock(this); + final ParsedDocument parsedDocs = documentParser.parseDocument(new SourceToParse(shardId.getIndexName(), operation.type(), + operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing(), + Collections.emptyMap()), mappingLookup); + + parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm()); + parsedDocs.version().setLongValue(operation.version()); + final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter writer = new IndexWriter(directory, writeConfig)) { + writer.addDocument(parsedDocs.rootDoc()); + final DirectoryReader reader = open(writer); + if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) { + reader.close(); + throw new IllegalStateException("Expected a single document segment; " + + "but [" + reader.leaves().size() + " segments with " + reader.leaves().get(0).reader().numDocs() + " documents"); + } + return reader.leaves().get(0).reader(); + } catch (IOException e) { + throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e); + } + } + + @Override + public CacheHelper getCoreCacheHelper() { + return getDelegate().getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + + @Override + public Terms terms(String field) throws IOException { + if (delegate.get() == null) { + // override this for VersionsAndSeqNoResolver + if (field.equals(IdFieldMapper.NAME)) { + return new FakeTerms(uid); + } + } + return getDelegate().terms(field); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + if (delegate.get() == null) { + // override this for VersionsAndSeqNoResolver + if (field.equals(VersionFieldMapper.NAME)) { + return new FakeNumericDocValues(operation.version()); + } + if (field.equals(SeqNoFieldMapper.NAME)) { + return new FakeNumericDocValues(operation.seqNo()); + } + if (field.equals(SeqNoFieldMapper.PRIMARY_TERM_NAME)) { + return new FakeNumericDocValues(operation.primaryTerm()); + } + } + return getDelegate().getNumericDocValues(field); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) throws IOException { + return getDelegate().getBinaryDocValues(field); + } + + @Override + public SortedDocValues getSortedDocValues(String field) throws IOException { + return getDelegate().getSortedDocValues(field); + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { + return getDelegate().getSortedNumericDocValues(field); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { + return getDelegate().getSortedSetDocValues(field); + } + + @Override + public NumericDocValues getNormValues(String field) throws IOException { + return getDelegate().getNormValues(field); + } + + @Override + public FieldInfos getFieldInfos() { + return getDelegate().getFieldInfos(); + } + + @Override + public Bits getLiveDocs() { + return null; + } + + @Override + public PointValues getPointValues(String field) throws IOException { + return getDelegate().getPointValues(field); + } + + @Override + public void checkIntegrity() throws IOException { + } + + @Override + public 
LeafMetaData getMetaData() { + return getDelegate().getMetaData(); + } + + @Override + public Fields getTermVectors(int docID) throws IOException { + return getDelegate().getTermVectors(docID); + } + + @Override + public int numDocs() { + return 1; + } + + @Override + public int maxDoc() { + return 1; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + assert docID == 0; + if (docID != 0) { + throw new IllegalArgumentException("no such doc ID " + docID); + } + if (delegate.get() == null) { + if (visitor instanceof FieldsVisitor) { + // override this for ShardGetService + if (TRANSLOG_FIELD_NAMES.containsAll(((FieldsVisitor) visitor).getFieldNames())) { + readStoredFieldsDirectly(visitor); + return; + } + } + } + + getDelegate().document(docID, visitor); + } + + private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOException { + if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) { + BytesReference sourceBytes = operation.source(); + assert BytesReference.toBytes(sourceBytes) == sourceBytes.toBytesRef().bytes; + SourceFieldMapper mapper = mappingLookup.getMapping().getMetadataMapperByClass(SourceFieldMapper.class); + if (mapper != null) { + try { + sourceBytes = mapper.applyFilters(sourceBytes, null); + } catch (IOException e) { + throw new IOException("Failed to reapply filters after reading from translog", e); + } + } + if (sourceBytes != null) { + visitor.binaryField(FAKE_SOURCE_FIELD, BytesReference.toBytes(sourceBytes)); + } + } + if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) { + visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8)); + } + if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) { + final byte[] id = new byte[uid.length]; + System.arraycopy(uid.bytes, uid.offset, id, 0, uid.length); + visitor.binaryField(FAKE_ID_FIELD, id); + } + } + + @Override + protected synchronized void doClose() throws IOException { + IOUtils.close(delegate.get(), directory); + } + } + + private static class FakeTerms extends Terms { + private final BytesRef uid; + + FakeTerms(BytesRef uid) { + this.uid = uid; + } + + @Override + public TermsEnum iterator() throws IOException { + return new FakeTermsEnum(uid); + } + + @Override + public long size() throws IOException { + return 1; + } + + @Override + public long getSumTotalTermFreq() throws IOException { + return 1; + } + + @Override + public long getSumDocFreq() throws IOException { + return 1; + } + + @Override + public int getDocCount() throws IOException { + return 1; + } + + @Override + public boolean hasFreqs() { + return false; + } + + @Override + public boolean hasOffsets() { + return false; + } + + @Override + public boolean hasPositions() { + return false; + } + + @Override + public boolean hasPayloads() { + return false; + } + } + + private static class FakeTermsEnum extends BaseTermsEnum { + private final BytesRef term; + private long position = -1; + + FakeTermsEnum(BytesRef term) { + this.term = term; + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + int cmp = text.compareTo(term); + if (cmp == 0) { + position = 0; + return SeekStatus.FOUND; + } else if (cmp < 0) { + position = 0; + return SeekStatus.NOT_FOUND; + } + position = Long.MAX_VALUE; + return SeekStatus.END; + } + + @Override + public void seekExact(long ord) throws IOException { + position = ord; + } + + @Override + public 
BytesRef term() throws IOException { + assert position == 0; + return term; + } + + @Override + public long ord() throws IOException { + return position; + } + + @Override + public int docFreq() throws IOException { + return 1; + } + + @Override + public long totalTermFreq() throws IOException { + return 1; + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return new FakePostingsEnum(term); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw unsupported(); + } + + @Override + public BytesRef next() throws IOException { + return ++position == 0 ? term : null; + } + } + + private static class FakePostingsEnum extends PostingsEnum { + private final BytesRef term; + + private int iter = -1; + + private FakePostingsEnum(BytesRef term) { + this.term = term; + } + + @Override + public int freq() { + return 1; + } + + @Override + public int nextPosition() { + return 0; + } + + @Override + public int startOffset() { + return 0; + } + + @Override + public int endOffset() { + return term.length; + } + + @Override + public BytesRef getPayload() { + return null; + } + + @Override + public int docID() { + return iter > 0 ? NO_MORE_DOCS : iter; + } + + @Override + public int nextDoc() { + return ++iter == 0 ? 0 : NO_MORE_DOCS; + } + + @Override + public int advance(int target) { + int doc; + while ((doc = nextDoc()) < target) { + } + return doc; + } + + @Override + public long cost() { + return 0; + } + } + + private static class FakeNumericDocValues extends NumericDocValues { + private final long value; + private final DocIdSetIterator disi = DocIdSetIterator.all(1); + + FakeNumericDocValues(long value) { + this.value = value; + } + + @Override + public long longValue() { + return value; + } + + @Override + public boolean advanceExact(int target) throws IOException { + return disi.advance(target) == target; + } + + @Override + public int docID() { + return disi.docID(); + } + + @Override + public int nextDoc() throws IOException { + return disi.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return disi.advance(target); + } + + @Override + public long cost() { + return disi.cost(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java deleted file mode 100644 index 548b25b29e4cd..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. 
- */ -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.BinaryDocValues; -import org.apache.lucene.index.DocValuesType; -import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.FieldInfos; -import org.apache.lucene.index.Fields; -import org.apache.lucene.index.IndexOptions; -import org.apache.lucene.index.LeafMetaData; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PointValues; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.index.StoredFieldVisitor; -import org.apache.lucene.index.Terms; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.RoutingFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.translog.Translog; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Set; - -/** - * Internal class that mocks a single doc read from the transaction log as a leaf reader. - */ -public final class TranslogLeafReader extends LeafReader { - - private final Translog.Index operation; - private static final FieldInfo FAKE_SOURCE_FIELD - = new FieldInfo(SourceFieldMapper.NAME, 1, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), - 0, 0, 0, false); - private static final FieldInfo FAKE_ROUTING_FIELD - = new FieldInfo(RoutingFieldMapper.NAME, 2, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), - 0, 0, 0, false); - private static final FieldInfo FAKE_ID_FIELD - = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), - 0, 0, 0, false); - public static Set ALL_FIELD_NAMES = Sets.newHashSet(FAKE_SOURCE_FIELD.name, FAKE_ROUTING_FIELD.name, FAKE_ID_FIELD.name); - - TranslogLeafReader(Translog.Index operation) { - this.operation = operation; - } - @Override - public CacheHelper getCoreCacheHelper() { - throw new UnsupportedOperationException(); - } - - @Override - public Terms terms(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public NumericDocValues getNumericDocValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public BinaryDocValues getBinaryDocValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public SortedDocValues getSortedDocValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public SortedNumericDocValues getSortedNumericDocValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public SortedSetDocValues getSortedSetDocValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public NumericDocValues getNormValues(String field) { - throw new UnsupportedOperationException(); - } - - @Override - public FieldInfos getFieldInfos() { - throw new UnsupportedOperationException(); - } - - @Override - public Bits getLiveDocs() { - throw new UnsupportedOperationException(); - } - - @Override - public PointValues getPointValues(String field) { - throw new UnsupportedOperationException(); 
- } - - @Override - public void checkIntegrity() { - - } - - @Override - public LeafMetaData getMetaData() { - throw new UnsupportedOperationException(); - } - - @Override - public Fields getTermVectors(int docID) { - throw new UnsupportedOperationException(); - } - - @Override - public int numDocs() { - return 1; - } - - @Override - public int maxDoc() { - return 1; - } - - @Override - public void document(int docID, StoredFieldVisitor visitor) throws IOException { - if (docID != 0) { - throw new IllegalArgumentException("no such doc ID " + docID); - } - if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) { - assert operation.source().toBytesRef().offset == 0; - assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length; - visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes); - } - if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) { - visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8)); - } - if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) { - BytesRef bytesRef = Uid.encodeId(operation.id()); - final byte[] id = new byte[bytesRef.length]; - System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length); - visitor.binaryField(FAKE_ID_FIELD, id); - } - } - - @Override - protected void doClose() { - - } - - @Override - public CacheHelper getReaderCacheHelper() { - throw new UnsupportedOperationException(); - } -} diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java index a4311826b3a06..811ed8aa7bb7f 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java @@ -9,6 +9,7 @@ import org.apache.lucene.index.FieldInfo; +import java.util.HashSet; import java.util.Set; /** @@ -26,6 +27,13 @@ public CustomFieldsVisitor(Set fields, boolean loadSource) { this.fields = fields; } + @Override + public Set getFieldNames() { + Set fields = new HashSet<>(super.getFieldNames()); + fields.addAll(this.fields); + return fields; + } + @Override public Status needsField(FieldInfo fieldInfo) { if (super.needsField(fieldInfo) == Status.YES) { diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java index 2005cd013d82e..fc19d1e80592e 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java @@ -77,6 +77,10 @@ public Status needsField(FieldInfo fieldInfo) { : Status.NO; } + public Set getFieldNames() { + return requiredFields; + } + public final void postProcess(Function fieldTypeLookup, @Nullable String type) { assert this.type == null || this.type.equals(type); this.type = type; diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 07fff81bd0fa7..9519c6546796a 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -8,17 +8,9 @@ package org.elasticsearch.index.get; -import org.apache.lucene.index.DocValuesType; -import 
org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.IndexOptions; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.IndexableFieldType; -import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Term; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; @@ -29,10 +21,11 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.TranslogLeafReader; import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.DocumentMapper; @@ -40,22 +33,18 @@ import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MappingLookup; -import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -169,7 +158,6 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm) .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)); - assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled"; if (get.exists() == false) { get.close(); } @@ -210,11 +198,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] Map metadataFields = null; BytesReference source = null; DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); - // force fetching source if we read from translog and need to recreate stored fields - boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null && - Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false); - FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields, - forceSourceForComputingTranslogStoredFields ? 
FetchSourceContext.FETCH_SOURCE : fetchSourceContext); + FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext); if (fieldVisitor != null) { try { docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor); @@ -223,54 +207,6 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] } source = fieldVisitor.source(); - // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields - if (get.isFromTranslog()) { - // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader, - // just make source consistent by reapplying source filters from mapping (possibly also nulling the source) - if (forceSourceForComputingTranslogStoredFields == false) { - try { - source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null); - } catch (IOException e) { - throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e); - } - } else { - // Slow path: recreate stored fields from original source - assert source != null : "original source in translog must exist"; - SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source, - XContentHelper.xContentType(source), fieldVisitor.routing(), Collections.emptyMap()); - MapperService mapperService = indexShard.mapperService(); - ParsedDocument doc = mapperService.documentParser().parseDocument(sourceToParse, mapperService.mappingLookup()); - assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc"; - // update special fields - doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); - doc.version().setLongValue(docIdAndVersion.version); - - // retrieve stored fields from parsed doc - fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext); - for (IndexableField indexableField : doc.rootDoc().getFields()) { - IndexableFieldType fieldType = indexableField.fieldType(); - if (fieldType.stored()) { - FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE, - DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false); - StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo); - if (status == StoredFieldVisitor.Status.YES) { - if (indexableField.numericValue() != null) { - fieldVisitor.objectField(fieldInfo, indexableField.numericValue()); - } else if (indexableField.binaryValue() != null) { - fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue()); - } else if (indexableField.stringValue() != null) { - fieldVisitor.objectField(fieldInfo, indexableField.stringValue()); - } - } else if (status == StoredFieldVisitor.Status.STOP) { - break; - } - } - } - // retrieve source (with possible transformations, e.g. source filters - source = fieldVisitor.source(); - } - } - // put stored fields into result objects if (fieldVisitor.fields().isEmpty() == false) { fieldVisitor.postProcess(mapperService::fieldType, mappingLookup.hasMappings() ? 
mappingLookup.getType() : null); @@ -310,15 +246,6 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] source = null; } - if (source != null && get.isFromTranslog()) { - // reapply source filters from mapping (possibly also nulling the source) - try { - source = docMapper.sourceMapper().applyFilters(source, null); - } catch (IOException e) { - throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e); - } - } - if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) { Map sourceAsMap; // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care? diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java index cafb088c6cf25..4ad684b4b8682 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java @@ -106,7 +106,7 @@ Map, MetadataFieldMapper> getMetadataMapper /** Get the metadata mapper with the given class. */ @SuppressWarnings("unchecked") - T getMetadataMapperByClass(Class clazz) { + public T getMetadataMapperByClass(Class clazz) { return (T) metadataMappersMap.get(clazz); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ea7244ff06d8a..a637cff03eb37 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1121,11 +1121,16 @@ public void testGetWithSearcherWrapper() throws Exception { MapperService mapperService = createMapperService(type); MappingLookup mappingLookup = mapperService.mappingLookup(); DocumentParser documentParser = mapperService.documentParser(); + LongSupplier translogGetCount = engine.translogGetCount::get; + long translogGetCountExpected = 0; + LongSupplier translogInMemorySegmentCount = engine.translogInMemorySegmentsCount::get; + long translogInMemorySegmentCountExpected = 0; try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, randomSearcherWrapper())) { // we do not track the translog location yet assertTrue(get.exists()); - assertFalse(get.isFromTranslog()); + assertEquals(translogGetCountExpected, translogGetCount.getAsLong()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } // refresh triggered, as we did not track translog location until the first realtime get. 
assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L)); @@ -1134,36 +1139,53 @@ public void testGetWithSearcherWrapper() throws Exception { try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, searcher -> searcher)) { assertTrue(get.exists()); - assertTrue(get.isFromTranslog()); + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } assertThat(engine.lastRefreshedCheckpoint(), equalTo(0L)); // no refresh; just read from translog + if (randomBoolean()) { engine.index(indexForDoc(createParsedDoc("1", null))); } try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new MatchAllDocsQuery())))) { assertTrue(get.exists()); - assertFalse(get.isFromTranslog()); + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new MatchNoDocsQuery())))) { assertFalse(get.exists()); - assertFalse(get.isFromTranslog()); + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new TermQuery(newUid("1")))))) { assertTrue(get.exists()); - assertFalse(get.isFromTranslog()); + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + // term query on _id field is properly faked + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, new TermQuery(newUid("2")))))) { assertFalse(get.exists()); - assertFalse(get.isFromTranslog()); + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + // term query on _id field is properly faked + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); } assertThat("no refresh, just read from translog or in-memory segment", engine.lastRefreshedCheckpoint(), equalTo(0L)); + + engine.index(indexForDoc(createParsedDoc("1", null))); + try (Engine.GetResult get = engine.get(new Engine.Get(true, true, type, "1", newUid("1")), mappingLookup, documentParser, + searcher -> SearcherHelper.wrapSearcher(searcher, reader -> new MatchingDirectoryReader(reader, + new TermQuery(new Term("other_field", Uid.encodeId("test"))))))) { + assertEquals(++translogGetCountExpected, translogGetCount.getAsLong()); + // term query on some other field needs in-memory index + assertEquals(++translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); + } } public void testSearchResultRelease() throws Exception { diff --git 
a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 81e9296c4b7d3..049e79c74faf8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.RoutingFieldMapper; @@ -21,6 +22,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.function.LongSupplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -39,11 +41,14 @@ public void testGetForUpdate() throws IOException { .primaryTerm(0, 1).build(); IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); recoverShardFromStore(primary); + LongSupplier translogInMemorySegmentCount = ((InternalEngine) primary.getEngine()).translogInMemorySegmentsCount::get; + long translogInMemorySegmentCountExpected = 0; Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed } @@ -54,6 +59,7 @@ public void testGetForUpdate() throws IOException { assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { assertEquals(searcher.getIndexReader().maxDoc(), 1); // we read from the translog } @@ -69,10 +75,12 @@ public void testGetForUpdate() throws IOException { assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); final long primaryTerm = primary.getOperationPrimaryTerm(); testGet1 = primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); expectThrows(VersionConflictEngineException.class, () -> 
primary.getService().getForUpdate("test", "1", test2.getSeqNo() + 1, primaryTerm)); @@ -109,6 +117,8 @@ private void runGetFromTranslogWithOptions(String docToIndex, String sourceOptio .primaryTerm(0, 1).build(); IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, EngineTestCase.randomReaderWrapper()); recoverShardFromStore(primary); + LongSupplier translogInMemorySegmentCount = ((InternalEngine) primary.getEngine()).translogInMemorySegmentsCount::get; + long translogInMemorySegmentCountExpected = 0; Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex); assertTrue(primary.getEngine().refreshNeeded()); GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); @@ -124,6 +134,7 @@ private void runGetFromTranslogWithOptions(String docToIndex, String sourceOptio assertEquals(new String(testGet1.source() == null ? new byte[0] : testGet1.source(), StandardCharsets.UTF_8), expectedResult); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { assertEquals(searcher.getIndexReader().maxDoc(), 1); // we read from the translog } @@ -140,6 +151,7 @@ private void runGetFromTranslogWithOptions(String docToIndex, String sourceOptio assertTrue(testGet2.getFields().containsKey(RoutingFieldMapper.NAME)); assertTrue(testGet2.getFields().containsKey("foo")); assertEquals(expectedFooVal, testGet2.getFields().get("foo").getValue()); + assertEquals(++translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { assertEquals(searcher.getIndexReader().maxDoc(), 2); // we read from the translog } @@ -154,6 +166,7 @@ private void runGetFromTranslogWithOptions(String docToIndex, String sourceOptio assertTrue(testGet2.getFields().containsKey(RoutingFieldMapper.NAME)); assertTrue(testGet2.getFields().containsKey("foo")); assertEquals(expectedFooVal, testGet2.getFields().get("foo").getValue()); + assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong()); closeShards(primary); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 43b3602890dbc..8f00af1165489 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1241,6 +1241,7 @@ public MatchingDirectoryReader(DirectoryReader in, Query query) throws IOExcepti public LeafReader wrap(LeafReader leaf) { try { final IndexSearcher searcher = new IndexSearcher(leaf); + searcher.setQueryCache(null); final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); final Scorer scorer = weight.scorer(leaf.getContext()); final DocIdSetIterator iterator = scorer != null ? 
scorer.iterator() : null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java index b0585e216242a..b3b60078c5efa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java @@ -217,7 +217,11 @@ public long ramBytesUsed() { public BitSet getBitSet(final Query query, final LeafReaderContext context) throws ExecutionException { final IndexReader.CacheHelper coreCacheHelper = context.reader().getCoreCacheHelper(); if (coreCacheHelper == null) { - throw new IllegalArgumentException("Reader " + context.reader() + " does not support caching"); + try { + return computeBitSet(query, context); + } catch (IOException e) { + throw new ExecutionException(e); + } } coreCacheHelper.addClosedListener(this); final IndexReader.CacheKey indexKey = coreCacheHelper.getKey(); @@ -233,27 +237,21 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro set.add(cacheKey); return set; }); - final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); - final IndexSearcher searcher = new IndexSearcher(topLevelContext); - searcher.setQueryCache(null); - final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f); - Scorer s = weight.scorer(context); - if (s == null) { + final BitSet result = computeBitSet(query, context); + if (result == null) { // A cache loader is not allowed to return null, return a marker object instead. 
return NULL_MARKER; - } else { - final BitSet bs = bitSetFromDocIterator(s.iterator(), context.reader().maxDoc()); - final long bitSetBytes = bs.ramBytesUsed(); - if (bitSetBytes > this.maxWeightBytes) { - logger.warn("built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;" + - " this object cannot be cached and will need to be rebuilt for each use;" + - " consider increasing the value of [{}]", - bitSetBytes, maxWeightBytes, CACHE_SIZE_SETTING.getKey()); - } else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) { - maybeLogCacheFullWarning(); - } - return bs; } + final long bitSetBytes = result.ramBytesUsed(); + if (bitSetBytes > this.maxWeightBytes) { + logger.warn("built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;" + + " this object cannot be cached and will need to be rebuilt for each use;" + + " consider increasing the value of [{}]", + bitSetBytes, maxWeightBytes, CACHE_SIZE_SETTING.getKey()); + } else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) { + maybeLogCacheFullWarning(); + } + return result; }); if (bitSet == NULL_MARKER) { return null; @@ -263,6 +261,20 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro } } + @Nullable + private BitSet computeBitSet(Query query, LeafReaderContext context) throws IOException { + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f); + final Scorer s = weight.scorer(context); + if (s == null) { + return null; + } else { + return bitSetFromDocIterator(s.iterator(), context.reader().maxDoc()); + } + } + private void maybeLogCacheFullWarning() { final long nextLogTime = cacheFullWarningTime.get(); final long now = System.currentTimeMillis(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java index 973cf28a24db3..579d927c61eec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java @@ -86,7 +86,7 @@ private static int computeNumDocs(LeafReader reader, BitSet roleQueryBits) { private static int getNumDocs(LeafReader reader, Query roleQuery, BitSet roleQueryBits) throws IOException, ExecutionException { IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper(); // this one takes deletes into account if (cacheHelper == null) { - throw new IllegalStateException("Reader " + reader + " does not support caching"); + return computeNumDocs(reader, roleQueryBits); } final boolean[] added = new boolean[] { false }; Cache perReaderCache = NUM_DOCS_CACHE.computeIfAbsent(cacheHelper.getKey(),
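
Aside (not part of the patch): a minimal Java sketch of the lazy-delegate pattern that the new TranslogLeafReader relies on — answer cheap metadata requests directly from the translog operation, and only build the expensive in-memory segment the first time something beyond the mocked fields is needed. All names below are hypothetical illustrations, not code from this change.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    final class LazyDelegate<T> {
        private final AtomicReference<T> delegate = new AtomicReference<>();
        private final Supplier<T> factory;   // expensive step, e.g. indexing the translog op into a RAM segment
        private final Runnable onCreated;    // bookkeeping hook, e.g. translogInMemorySegmentsCount::incrementAndGet

        LazyDelegate(Supplier<T> factory, Runnable onCreated) {
            this.factory = factory;
            this.onCreated = onCreated;
        }

        T get() {
            T value = delegate.get();
            if (value == null) {
                synchronized (this) {        // double-checked locking, as in TranslogLeafReader#getDelegate
                    value = delegate.get();
                    if (value == null) {
                        value = factory.get();
                        delegate.set(value);
                        onCreated.run();
                    }
                }
            }
            return value;
        }
    }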