Skip to content

Commit

Permalink
Rebuild version map when opening internal engine
Browse files Browse the repository at this point in the history
With this change, we will rebuild the live version map and local checkpoint using
documents (including soft-deleted) from the safe commit when opening an internal
engine. This allows us to safely prune away _id of all soft-deleted documents as
the version map is always in-sync with Lucene index.

Relates #40741 (elastic/elasticsearch#40741)
Supersedes #42979 (elastic/elasticsearch#42979)

Port of elastic/elasticsearch#43202

(cherry picked from commit bb23cb7)

# Conflicts:
#	es/es-server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
#	es/es-server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  • Loading branch information
marregui authored and mergify-bot committed Dec 4, 2019
1 parent 2184d93 commit 87db681
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 171 deletions.
Expand Up @@ -26,7 +26,6 @@
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -42,7 +41,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
Expand All @@ -67,7 +65,6 @@
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;
import java.text.ParseException;
Expand All @@ -76,7 +73,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene80";
Expand Down Expand Up @@ -519,39 +515,4 @@ public CacheHelper getReaderCacheHelper() {
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}

/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
*
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
*/
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
onNewSeqNo.accept(seqNo);
}
}
}
}
@@ -0,0 +1,93 @@
/*
* Licensed to Crate under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership. Crate licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial
* agreement.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;

import java.io.IOException;
import java.util.Objects;

final class CombinedDocValues {
private final NumericDocValues versionDV;
private final NumericDocValues seqNoDV;
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
assert versionDV.docID() < segmentDocId;
if (versionDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
}

long docSeqNo(int segmentDocId) throws IOException {
assert seqNoDV.docID() < segmentDocId;
if (seqNoDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
return seqNoDV.longValue();
}

long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
assert primaryTermDV.docID() < segmentDocId;
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
}
return primaryTermDV.longValue();
}

boolean isTombstone(int segmentDocId) throws IOException {
if (tombstoneDV == null) {
return false;
}
assert tombstoneDV.docID() < segmentDocId;
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}

boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}
}
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -29,17 +30,23 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
Expand All @@ -63,12 +70,14 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.IDVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand All @@ -84,7 +93,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -99,7 +107,11 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
<<<<<<< HEAD
import java.util.function.Supplier;
=======
import java.util.stream.Collectors;
>>>>>>> bb23cb7b20... Rebuild version map when opening internal engine
import java.util.stream.Stream;

public class InternalEngine extends Engine {
Expand Down Expand Up @@ -230,6 +242,15 @@ public InternalEngine(EngineConfig engineConfig) {
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (softDeleteEnabled && localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher =
acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
} catch (IOException e) {
throw new EngineCreationFailureException(config().getShardId(),
"failed to restore version map and local checkpoint tracker", e);
}
}
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -673,21 +694,26 @@ enum OpVsLuceneDocStatus {
LUCENE_DOC_NOT_FOUND
}

private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) {
Objects.requireNonNull(versionValue);
if (seqNo > versionValue.seqNo) {
return OpVsLuceneDocStatus.OP_NEWER;
} else if (seqNo == versionValue.seqNo) {
assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo
+ " op_term=" + primaryTerm + " existing_term=" + versionValue.term;
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
}

private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == versionValue.seqNo) {
assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
+ " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
// load from index
assert incrementIndexVersionLookup();
Expand Down Expand Up @@ -1903,8 +1929,9 @@ void clearDeletedTombstones() {
}

// for testing
final Collection<DeleteVersionValue> getDeletedTombstones() {
return versionMap.getAllTombstones().values();
final Map<BytesRef, VersionValue> getVersionMap() {
return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
Expand Down Expand Up @@ -2502,10 +2529,6 @@ private boolean incrementIndexVersionLookup() {
return true;
}

int getVersionMapSize() {
return versionMap.getAllCurrent().size();
}

boolean isSafeAccessRequired() {
return versionMap.isSafeAccessRequired();
}
Expand Down Expand Up @@ -2743,4 +2766,59 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates;
return true;
}

/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
* are in sync with the Lucene commit.
*/
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
final IDVisitor idFieldVisitor = new IDVisitor(IdFieldMapper.NAME);
final DocIdSetIterator iterator = scorer.iterator();
int docId;
while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
final long primaryTerm = dv.docPrimaryTerm(docId);
if (primaryTerm == -1L) {
continue; // skip children docs which do not have primary term
}
final long seqNo = dv.docSeqNo(docId);
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
localCheckpointTracker.markSeqNoAsPersisted(seqNo);
idFieldVisitor.reset();
leaf.reader().document(docId, idFieldVisitor);
if (idFieldVisitor.getId() == null) {
assert dv.isTombstone(docId);
continue;
}
final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes();
try (Releasable ignored = versionMap.acquireLock(uid)) {
final VersionValue curr = versionMap.getUnderLock(uid);
if (curr == null ||
compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) {
if (dv.isTombstone(docId)) {
// use 0L for the start time so we can prune this delete tombstone quickly
// when the local checkpoint advances (i.e., after a recovery completed).
final long startTime = 0L;
versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime));
} else {
versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm));
}
}
}
}
}
// remove live entries in the version map
refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL);
}

}

0 comments on commit 87db681

Please sign in to comment.