From 87db681382b8ec2874a88edb46a911dc16482ecb Mon Sep 17 00:00:00 2001 From: marregui Date: Thu, 28 Nov 2019 16:01:36 +0100 Subject: [PATCH] Rebuild version map when opening internal engine With this change, we will rebuild the live version map and local checkpoint using documents (including soft-deleted) from the safe commit when opening an internal engine. This allows us to safely prune away _id of all soft-deleted documents as the version map is always in-sync with Lucene index. Relates #40741 (https://github.com/elastic/elasticsearch/pull/40741) Supersedes #42979 (https://github.com/elastic/elasticsearch/issues/42979) Port of https://github.com/elastic/elasticsearch/pull/43202 (cherry picked from commit bb23cb7b200dbc9c6425fcc8a5bd35e253e15fb3) # Conflicts: # es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java # es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java --- .../elasticsearch/common/lucene/Lucene.java | 39 ----- .../index/engine/CombinedDocValues.java | 93 ++++++++++ .../index/engine/InternalEngine.java | 110 ++++++++++-- .../index/engine/LuceneChangesSnapshot.java | 63 ------- .../index/fieldvisitor/IDVisitor.java | 87 ++++++++++ .../index/engine/InternalEngineTests.java | 161 ++++++++++++++++-- .../index/engine/EngineTestCase.java | 16 +- .../doc/lucene/IdCollectorExpression.java | 44 +---- 8 files changed, 442 insertions(+), 171 deletions(-) create mode 100644 es/es-server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java create mode 100644 es/es-server/src/main/java/org/elasticsearch/index/fieldvisitor/IDVisitor.java diff --git a/es/es-server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/es/es-server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 5be07abceec2..f3772abee8aa 100644 --- a/es/es-server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/es/es-server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -26,7 +26,6 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; @@ -42,7 +41,6 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; @@ -67,7 +65,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.io.IOException; import java.text.ParseException; @@ -76,7 +73,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.LongConsumer; public class Lucene { public static final String LATEST_DOC_VALUES_FORMAT = "Lucene80"; @@ -519,39 +515,4 @@ public CacheHelper getReaderCacheHelper() { public static NumericDocValuesField newSoftDeletesField() { return new NumericDocValuesField(SOFT_DELETES_FIELD, 1); } - - /** - * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) - * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. - * - * @param directoryReader the directory reader to scan - * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) - * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) - * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found - */ - public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, - LongConsumer onNewSeqNo) throws IOException { - final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); - final IndexSearcher searcher = new IndexSearcher(reader); - searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); - for (LeafReaderContext leaf : reader.leaves()) { - final Scorer scorer = weight.scorer(leaf); - if (scorer == null) { - continue; - } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) { - throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); - } - final long seqNo = seqNoDocValues.longValue(); - assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; - onNewSeqNo.accept(seqNo); - } - } - } } diff --git a/es/es-server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/es/es-server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java new file mode 100644 index 000000000000..456231814118 --- /dev/null +++ b/es/es-server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -0,0 +1,93 @@ +/* + * Licensed to Crate under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. Crate licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial + * agreement. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; + +import java.io.IOException; +import java.util.Objects; + +final class CombinedDocValues { + private final NumericDocValues versionDV; + private final NumericDocValues seqNoDV; + private final NumericDocValues primaryTermDV; + private final NumericDocValues tombstoneDV; + private final NumericDocValues recoverySource; + + CombinedDocValues(LeafReader leafReader) throws IOException { + this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); + this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); + this.primaryTermDV = Objects.requireNonNull( + leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); + this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); + } + + long docVersion(int segmentDocId) throws IOException { + assert versionDV.docID() < segmentDocId; + if (versionDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); + } + return versionDV.longValue(); + } + + long docSeqNo(int segmentDocId) throws IOException { + assert seqNoDV.docID() < segmentDocId; + if (seqNoDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); + } + return seqNoDV.longValue(); + } + + long docPrimaryTerm(int segmentDocId) throws IOException { + if (primaryTermDV == null) { + return -1L; + } + assert primaryTermDV.docID() < segmentDocId; + // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. + if (primaryTermDV.advanceExact(segmentDocId) == false) { + return -1; + } + return primaryTermDV.longValue(); + } + + boolean isTombstone(int segmentDocId) throws IOException { + if (tombstoneDV == null) { + return false; + } + assert tombstoneDV.docID() < segmentDocId; + return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; + } + + boolean hasRecoverySource(int segmentDocId) throws IOException { + if (recoverySource == null) { + return false; + } + assert recoverySource.docID() < segmentDocId; + return recoverySource.advanceExact(segmentDocId); + } +} diff --git a/es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bf5d38ba390e..edac8def634c 100644 --- a/es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -29,17 +30,23 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; @@ -63,12 +70,14 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.fieldvisitor.IDVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.SeqNoStats; @@ -84,7 +93,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,7 +107,11 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.LongSupplier; +<<<<<<< HEAD import java.util.function.Supplier; +======= +import java.util.stream.Collectors; +>>>>>>> bb23cb7b20... Rebuild version map when opening internal engine import java.util.stream.Stream; public class InternalEngine extends Engine { @@ -230,6 +242,15 @@ public InternalEngine(EngineConfig engineConfig) { this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); + if (softDeleteEnabled && localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { + try (Searcher searcher = + acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { + restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + } catch (IOException e) { + throw new EngineCreationFailureException(config().getShardId(), + "failed to restore version map and local checkpoint tracker", e); + } + } success = true; } finally { if (success == false) { @@ -673,21 +694,26 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } + private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { + Objects.requireNonNull(versionValue); + if (seqNo > versionValue.seqNo) { + return OpVsLuceneDocStatus.OP_NEWER; + } else if (seqNo == versionValue.seqNo) { + assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo + + " op_term=" + primaryTerm + " existing_term=" + versionValue.term; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } else { + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == versionValue.seqNo) { - assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo() - + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term; - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } + status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); } else { // load from index assert incrementIndexVersionLookup(); @@ -1903,8 +1929,9 @@ void clearDeletedTombstones() { } // for testing - final Collection getDeletedTombstones() { - return versionMap.getAllTombstones().values(); + final Map getVersionMap() { + return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override @@ -2502,10 +2529,6 @@ private boolean incrementIndexVersionLookup() { return true; } - int getVersionMapSize() { - return versionMap.getAllCurrent().size(); - } - boolean isSafeAccessRequired() { return versionMap.isSafeAccessRequired(); } @@ -2743,4 +2766,59 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates; return true; } + + /** + * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted) + * after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker + * are in sync with the Lucene commit. + */ + private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { + final IndexSearcher searcher = new IndexSearcher(directoryReader); + searcher.setQueryCache(null); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + for (LeafReaderContext leaf : directoryReader.leaves()) { + final Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + final CombinedDocValues dv = new CombinedDocValues(leaf.reader()); + final IDVisitor idFieldVisitor = new IDVisitor(IdFieldMapper.NAME); + final DocIdSetIterator iterator = scorer.iterator(); + int docId; + while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + final long primaryTerm = dv.docPrimaryTerm(docId); + if (primaryTerm == -1L) { + continue; // skip children docs which do not have primary term + } + final long seqNo = dv.docSeqNo(docId); + localCheckpointTracker.markSeqNoAsProcessed(seqNo); + localCheckpointTracker.markSeqNoAsPersisted(seqNo); + idFieldVisitor.reset(); + leaf.reader().document(docId, idFieldVisitor); + if (idFieldVisitor.getId() == null) { + assert dv.isTombstone(docId); + continue; + } + final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes(); + try (Releasable ignored = versionMap.acquireLock(uid)) { + final VersionValue curr = versionMap.getUnderLock(uid); + if (curr == null || + compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) { + if (dv.isTombstone(docId)) { + // use 0L for the start time so we can prune this delete tombstone quickly + // when the local checkpoint advances (i.e., after a recovery completed). + final long startTime = 0L; + versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime)); + } else { + versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm)); + } + } + } + } + } + // remove live entries in the version map + refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL); + } + } diff --git a/es/es-server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/es/es-server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index a967d845fbf9..bd0292048550 100644 --- a/es/es-server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/es/es-server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -40,14 +40,12 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -306,65 +304,4 @@ private static final class ParallelArray { leafReaderContexts = new LeafReaderContext[size]; } } - - private static final class CombinedDocValues { - private final NumericDocValues versionDV; - private final NumericDocValues seqNoDV; - private final NumericDocValues primaryTermDV; - private final NumericDocValues tombstoneDV; - private final NumericDocValues recoverySource; - - CombinedDocValues(LeafReader leafReader) throws IOException { - this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); - this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); - this.primaryTermDV = Objects.requireNonNull( - leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); - this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); - } - - long docVersion(int segmentDocId) throws IOException { - assert versionDV.docID() < segmentDocId; - if (versionDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); - } - return versionDV.longValue(); - } - - long docSeqNo(int segmentDocId) throws IOException { - assert seqNoDV.docID() < segmentDocId; - if (seqNoDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); - } - return seqNoDV.longValue(); - } - - long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } - assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. - if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; - } - return primaryTermDV.longValue(); - } - - boolean isTombstone(int segmentDocId) throws IOException { - if (tombstoneDV == null) { - return false; - } - assert tombstoneDV.docID() < segmentDocId; - return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; - } - - boolean hasRecoverySource(int segmentDocId) throws IOException { - if (recoverySource == null) { - return false; - } - assert recoverySource.docID() < segmentDocId; - return recoverySource.advanceExact(segmentDocId); - } - } } diff --git a/es/es-server/src/main/java/org/elasticsearch/index/fieldvisitor/IDVisitor.java b/es/es-server/src/main/java/org/elasticsearch/index/fieldvisitor/IDVisitor.java new file mode 100644 index 000000000000..902200efc062 --- /dev/null +++ b/es/es-server/src/main/java/org/elasticsearch/index/fieldvisitor/IDVisitor.java @@ -0,0 +1,87 @@ +/* + * Licensed to Crate under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. Crate licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial + * agreement. + */ + +package org.elasticsearch.index.fieldvisitor; + +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.StoredFieldVisitor; +import org.elasticsearch.index.mapper.Uid; + +import java.nio.charset.StandardCharsets; + +public class IDVisitor extends StoredFieldVisitor { + + private boolean canStop = false; + private String id; + private final String columnName; + + public IDVisitor(String columnName) { + this.columnName = columnName; + } + + public String getColumnName() { + return columnName; + } + + public void setCanStop(boolean canStop) { + this.canStop = canStop; + } + + + public boolean canStop() { + return canStop; + } + + public String getId() { + return id; + } + + @Override + public Status needsField(FieldInfo fieldInfo) { + if (canStop) { + return Status.STOP; + } + if (columnName.equals(fieldInfo.name)) { + canStop = true; + return Status.YES; + } + return Status.NO; + } + + @Override + public void binaryField(FieldInfo fieldInfo, byte[] value) { + assert columnName.equals(fieldInfo.name) : "binaryField must only be called for id"; + id = Uid.decodeId(value); + } + + @Override + public void stringField(FieldInfo fieldInfo, byte[] value) { + assert columnName.equals(fieldInfo.name) : "stringField must only be called for id"; + // Indices prior to CrateDB 3.0 have id stored as string + id = new String(value, StandardCharsets.UTF_8); + } + + public void reset() { + id = null; + canStop = false; + } +} diff --git a/es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 05ac4c6fab8d..f48445990ac5 100644 --- a/es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -195,7 +195,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { Engine.Index update = indexForDoc(doc); engine.index(update); assertTrue(engine.isSafeAccessRequired()); - assertEquals(1, engine.getVersionMapSize()); + assertThat(engine.getVersionMap().values(), hasSize(1)); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(0, searcher.reader().numDocs()); } @@ -227,7 +227,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { : appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine)); engine.index(operation); assertTrue("safe access should be required", engine.isSafeAccessRequired()); - assertEquals(1, engine.getVersionMapSize()); // now we add this to the map + assertThat(engine.getVersionMap().values(), hasSize(1)); // now we add this to the map engine.refresh("test"); if (randomBoolean()) { // randomly refresh here again engine.refresh("test"); @@ -4588,18 +4588,19 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); } } - List tombstones = new ArrayList<>(engine.getDeletedTombstones()); + + List tombstones = new ArrayList<>(tombstonesInVersionMap(engine).values()); engine.config().setEnableGcDeletes(true); // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno). clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Fill the seqno gap - should prune all tombstones. clock.set(between(0, 100)); if (randomBoolean()) { @@ -4611,7 +4612,7 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. engine.refresh("test"); - assertThat(engine.getDeletedTombstones(), empty()); + assertThat(tombstonesInVersionMap(engine).values(), empty()); } } @@ -5017,9 +5018,10 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { } @Test - public void testRebuildLocalCheckpointTracker() throws Exception { + public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); @@ -5053,22 +5055,34 @@ public void testRebuildLocalCheckpointTracker() throws Exception { engine.syncTranslog(); docs = getDocIds(engine, true); } +<<<<<<< HEAD trimUnsafeCommits(config); List safeCommit = null; +======= + List operationsInSafeCommit = null; +>>>>>>> bb23cb7b20... Rebuild version map when opening internal engine for (int i = commits.size() - 1; i >= 0; i--) { if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) { - safeCommit = commits.get(i); + operationsInSafeCommit = commits.get(i); break; } } - assertThat(safeCommit, notNullValue()); + assertThat(operationsInSafeCommit, notNullValue()); try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker(); - final Set seqNosInSafeCommit = safeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet()); + final Set seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet()); for (Engine.Operation op : operations) { +<<<<<<< HEAD assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); +======= + boolean hasProcessedSeqNo = tracker.hasProcessed(op.seqNo()); + boolean seqNoIsInSafeCommit = seqNosInSafeCommit.contains(op.seqNo()); + assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getProcessedCheckpoint() + " hasProcessedSeqNo=" + hasProcessedSeqNo + " seqNoIsInSafeCommit=" + seqNoIsInSafeCommit, + hasProcessedSeqNo, + equalTo(seqNoIsInSafeCommit)); +>>>>>>> bb23cb7b20... Rebuild version map when opening internal engine } engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, true), equalTo(docs)); @@ -5154,6 +5168,133 @@ public void testMaxSeqNoInCommitUserData() throws Exception { assertMaxSeqNoInCommitUserData(engine); } +<<<<<<< HEAD +======= + @Test + public void testNoOpOnClosingEngine() throws Exception { + engine.close(); + Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + assertTrue(indexSettings.isSoftDeleteEnabled()); + try (Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { + engine.close(); + expectThrows(AlreadyClosedException.class, () -> engine.noOp( + new Engine.NoOp(2, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), "reason"))); + } + } + + @Test + public void testRecoverFromLocalTranslog() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path translogPath = createTempDir(); + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get); + final List docs; + try (InternalEngine engine = createEngine(config)) { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + if (randomBoolean()) { + engine.syncTranslog(); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); + } + if (randomInt(100) < 10) { + engine.refresh("test"); + } + if (randomInt(100) < 5) { + engine.flush(); + } + if (randomInt(100) < 5) { + engine.forceMerge(randomBoolean(), 1, false, false, false); + } + } + docs = getDocIds(engine, true); + } + try (InternalEngine engine = new InternalEngine(config)) { + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + } + } + } + + private Map tombstonesInVersionMap(InternalEngine engine) { + return engine.getVersionMap().entrySet().stream() + .filter(e -> e.getValue() instanceof DeleteVersionValue) + .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue())); + } + + @Test + public void testNoOpFailure() throws IOException { + engine.close(); + final Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + try (Store store = createStore(); + Engine engine = createEngine((dir, iwc) -> new IndexWriter(dir, iwc) { + + @Override + public long addDocument(Iterable doc) { + throw new IllegalArgumentException("fatal"); + } + + }, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { + final Engine.NoOp op = new Engine.NoOp(0, 0, PRIMARY, System.currentTimeMillis(), "test"); + final IllegalArgumentException e = expectThrows(IllegalArgumentException. class, () -> engine.noOp(op)); + assertThat(e.getMessage(), equalTo("fatal")); + assertTrue(engine.isClosed.get()); + assertThat(engine.failedEngine.get(), not(nullValue())); + assertThat(engine.failedEngine.get(), instanceOf(IllegalArgumentException.class)); + assertThat(engine.failedEngine.get().getMessage(), equalTo("fatal")); + } + } + + @Test + public void testDeleteDocumentFailuresShouldFailEngine() throws IOException { + engine.close(); + + final Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + final AtomicReference iw = new AtomicReference<>(); + try (Store store = createStore(); + InternalEngine engine = createEngine( + (dir, iwc) -> { + iw.set(new ThrowingIndexWriter(dir, iwc)); + return iw.get(); + }, + null, + null, + config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { + + engine.index(new Engine.Index( + newUid("0"), InternalEngineTests.createParsedDoc("0", null), UNASSIGNED_SEQ_NO, primaryTerm.get(), + Versions.MATCH_DELETED, VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0)); + + Engine.Delete op = new Engine.Delete( + "_doc", "0", newUid("0"), UNASSIGNED_SEQ_NO, + primaryTerm.get(), Versions.MATCH_ANY, VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0); + + iw.get().setThrowFailure(() -> new IllegalArgumentException("fatal")); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> engine.delete(op)); + assertThat(e.getMessage(), equalTo("fatal")); + assertThat(engine.isClosed.get(), is(true)); + assertThat(engine.failedEngine.get(), not(nullValue())); + assertThat(engine.failedEngine.get(), instanceOf(IllegalArgumentException.class)); + assertThat(engine.failedEngine.get().getMessage(), equalTo("fatal")); + } + } + +>>>>>>> bb23cb7b20... Rebuild version map when opening internal engine private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/es/es-testing/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/es/es-testing/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index d48b37f9c5af..c7e85df972f0 100644 --- a/es/es-testing/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/es/es-testing/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -1014,10 +1015,8 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio List commits = DirectoryReader.listCommits(engine.store.directory()); for (IndexCommit commit : commits) { try (DirectoryReader reader = DirectoryReader.open(commit)) { - AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), - greaterThanOrEqualTo(maxSeqNoFromDocs.get())); + greaterThanOrEqualTo(maxSeqNosInReader(reader))); } } } @@ -1064,4 +1063,15 @@ public long getAsLong() { return get(); } } + + static long maxSeqNosInReader(DirectoryReader reader) throws IOException { + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + for (LeafReaderContext leaf : reader.leaves()) { + final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + while (seqNoDocValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNoDocValues.longValue()); + } + } + return maxSeqNo; + } } diff --git a/sql/src/main/java/io/crate/expression/reference/doc/lucene/IdCollectorExpression.java b/sql/src/main/java/io/crate/expression/reference/doc/lucene/IdCollectorExpression.java index 05718053e499..be3faeac34fc 100644 --- a/sql/src/main/java/io/crate/expression/reference/doc/lucene/IdCollectorExpression.java +++ b/sql/src/main/java/io/crate/expression/reference/doc/lucene/IdCollectorExpression.java @@ -22,19 +22,15 @@ package io.crate.expression.reference.doc.lucene; import io.crate.metadata.doc.DocSysColumns; -import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.StoredFieldVisitor; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.fieldvisitor.IDVisitor; import java.io.IOException; -import java.nio.charset.StandardCharsets; public final class IdCollectorExpression extends LuceneCollectorExpression { - public static final String COLUMN_NAME = DocSysColumns.ID.name(); - private final IDVisitor visitor = new IDVisitor(); + private final IDVisitor visitor = new IDVisitor(DocSysColumns.ID.name()); private LeafReader reader; public IdCollectorExpression() { @@ -42,49 +38,17 @@ public IdCollectorExpression() { @Override public void setNextDocId(int docId) throws IOException { - visitor.canStop = false; + visitor.setCanStop(false); reader.document(docId, visitor); } @Override public String value() { - return visitor.id; + return visitor.getId(); } @Override public void setNextReader(LeafReaderContext context) { reader = context.reader(); } - - - private static class IDVisitor extends StoredFieldVisitor { - - private boolean canStop = false; - private String id; - - @Override - public Status needsField(FieldInfo fieldInfo) { - if (canStop) { - return Status.STOP; - } - if (COLUMN_NAME.equals(fieldInfo.name)) { - canStop = true; - return Status.YES; - } - return Status.NO; - } - - @Override - public void binaryField(FieldInfo fieldInfo, byte[] value) { - assert COLUMN_NAME.equals(fieldInfo.name) : "binaryField must only be called for id"; - id = Uid.decodeId(value); - } - - @Override - public void stringField(FieldInfo fieldInfo, byte[] value) { - assert COLUMN_NAME.equals(fieldInfo.name) : "stringField must only be called for id"; - // Indices prior to CrateDB 3.0 have id stored as string - id = new String(value, StandardCharsets.UTF_8); - } - } }