[CCR] Read changes from Lucene instead of translog (#30120)
This commit adds an API for reading a snapshot of translog operations from Lucene,
then cuts CCR over from the existing translog to the new API.

Relates #30086
Relates #29530
martijnvg authored and dnhatn committed May 9, 2018
1 parent 5d99157 commit bb6586d
Showing 12 changed files with 833 additions and 183 deletions.
org/elasticsearch/index/engine/Engine.java
@@ -58,6 +58,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -609,6 +610,12 @@ public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the requested range
*/
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
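For context, a minimal sketch of how a consumer such as CCR might drive this new API; `engine`, `mapperService`, and `process(...)` are illustrative names, not part of this change:

```java
import java.io.IOException;

import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

// Hypothetical caller: read every operation in [fromSeqNo, toSeqNo] from Lucene.
static void readChanges(Engine engine, MapperService mapperService,
                        long fromSeqNo, long toSeqNo) throws IOException {
    // requiredFullRange=true: the snapshot fails if any seq# in the range is missing.
    try (Translog.Snapshot snapshot =
             engine.newLuceneChangesSnapshot("ccr", mapperService, fromSeqNo, toSeqNo, true)) {
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            process(op); // hypothetical handler, e.g. ship the op to a following shard
        }
    }
}

static void process(Translog.Operation op) {
    // placeholder for whatever the consumer does with each operation
}
```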
org/elasticsearch/index/engine/InternalEngine.java
@@ -68,6 +68,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -148,6 +149,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
private final boolean softDeleteEnabled;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -224,6 +226,8 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
success = true;
} finally {
if (success == false) {
@@ -2345,6 +2349,23 @@ long getNumDocUpdates() {
return numDocUpdates.count();
}

@Override
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
ensureOpen();
if (lastRefreshedCheckpoint() < maxSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
searcher = null; // ownership of the searcher has been transferred to the snapshot
return snapshot;
} finally {
IOUtils.close(searcher); // closes the searcher only if creating the snapshot failed
}
}

@Override
public boolean isRecovering() {
return pendingTranslogRecovery.get();
@@ -2391,4 +2412,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends
return super.softUpdateDocuments(term, docs, softDeletes);
}
}

/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
}

@Override
public void beforeRefresh() {
pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // all changes until this point should be visible after the refresh
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
refreshedCheckpoint.set(pendingCheckpoint);
}
}
}
}
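The listener above leans on the ordering contract of Lucene's ReferenceManager.RefreshListener: beforeRefresh() runs before the new reader is opened, so everything indexed up to that point is visible once afterRefresh(true) fires. A self-contained sketch of the same publish-after-refresh pattern, with illustrative names that are not taken from this diff:

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.search.ReferenceManager;

// Sketch: publish a monotonically increasing "checkpoint" only once the
// refresh that makes the corresponding documents visible has completed.
final class PublishedCheckpointListener implements ReferenceManager.RefreshListener {
    private final AtomicLong published = new AtomicLong(-1);
    private final AtomicLong liveCheckpoint; // advanced by the indexing path
    private long pending;

    PublishedCheckpointListener(AtomicLong liveCheckpoint) {
        this.liveCheckpoint = liveCheckpoint;
    }

    @Override
    public void beforeRefresh() {
        // Everything indexed before this point is included in the refresh,
        // so the value read here is a safe lower bound afterwards.
        pending = liveCheckpoint.get();
    }

    @Override
    public void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            published.set(pending);
        }
    }

    long published() {
        return published.get();
    }
}
```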
org/elasticsearch/index/engine/LuceneChangesSnapshot.java (new file)
@@ -0,0 +1,280 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* A {@link Translog.Snapshot} from changes in a Lucene index
*/
final class LuceneChangesSnapshot implements Translog.Snapshot {
private final long fromSeqNo, toSeqNo;
private long lastSeenSeqNo;
private int skippedOperations;
private final boolean requiredFullRange;

private final IndexSearcher indexSearcher;
private final MapperService mapperService;
private int docIndex = 0;
private final TopDocs topDocs;

private final Closeable onClose;
private final CombinedDocValues[] docValues; // Cache of DocValues

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
*
* @param engineSearcher    the internal engine searcher, which will be taken over if the snapshot is opened successfully
* @param mapperService     the mapper service, used mainly to resolve the document's type and uid
* @param fromSeqNo         the minimum requested seq#, inclusive
* @param toSeqNo           the maximum requested seq#, inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of every operation between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
this.mapperService = mapperService;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.topDocs = searchOperations(indexSearcher);
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
this.docValues = new CombinedDocValues[leaves.size()];
for (LeafReaderContext leaf : leaves) {
this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
}
this.onClose = engineSearcher;
}

@Override
public void close() throws IOException {
onClose.close();
}

@Override
public int totalOperations() {
return Math.toIntExact(topDocs.totalHits);
}

@Override
public int overriddenOperations() {
return skippedOperations;
}

@Override
public Translog.Operation next() throws IOException {
Translog.Operation op = null;
for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
op = readDocAsOp(docId);
if (op != null) {
break;
}
}
if (requiredFullRange) {
rangeCheck(op);
}
if (op != null) {
lastSeenSeqNo = op.seqNo();
}
return op;
}

private void rangeCheck(Translog.Operation op) {
if (op == null) {
if (lastSeenSeqNo < toSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
}
} else {
final long expectedSeqNo = lastSeenSeqNo + 1;
if (op.seqNo() != expectedSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
}
}
}

private int nextDocId() {
if (docIndex < topDocs.scoreDocs.length) {
final int docId = topDocs.scoreDocs[docIndex].doc;
docIndex++;
return docId;
} else {
return DocIdSetIterator.NO_MORE_DOCS;
}
}

private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Sort sortedBySeqNoThenByTerm = new Sort(
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
);
return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
}

private Translog.Operation readDocAsOp(int docID) throws IOException {
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
final int segmentDocID = docID - leaf.docBase;
final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
// We don't have to read the nested child documents - those docs don't have primary terms.
if (primaryTerm == -1) {
skippedOperations++;
return null;
}
final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
// Only pick the first seen seq#
if (seqNo == lastSeenSeqNo) {
skippedOperations++;
return null;
}
final long version = docValues[leaf.ord].docVersion(segmentDocID);
final FieldsVisitor fields = new FieldsVisitor(true);
indexSearcher.doc(docID, fields);
fields.postProcess(mapperService);

final Translog.Operation op;
final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
if (isTombstone && fields.uid() == null) {
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields?
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]";
} else {
final String id = fields.uid().id();
final String type = fields.uid().type();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
if (isTombstone) {
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
} else {
final BytesReference source = fields.source();
// TODO: pass the latest timestamp from engine.
final long autoGeneratedIdTimestamp = -1;
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
source.toBytesRef().bytes, fields.routing(), autoGeneratedIdTimestamp);
}
}
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
"last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]";
return op;
}

private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException {
final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD);
if (ndv == null || ndv.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETE_FIELD + "] is not found");
}
return ndv.longValue() == 1;
}

private static final class CombinedDocValues {
private final LeafReader leafReader;
private NumericDocValues versionDV;
private NumericDocValues seqNoDV;
private NumericDocValues primaryTermDV;
private NumericDocValues tombstoneDV;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.leafReader = leafReader;
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
if (versionDV.docID() > segmentDocId) {
// doc values are forward-only iterators; re-pull from the reader if we have advanced past the target doc
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
}
if (versionDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
}

long docSeqNo(int segmentDocId) throws IOException {
if (seqNoDV.docID() > segmentDocId) {
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
}
if (seqNoDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
return seqNoDV.longValue();
}

long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
if (primaryTermDV.docID() > segmentDocId) {
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
}
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
}
return primaryTermDV.longValue();
}

boolean isTombstone(int segmentDocId) throws IOException {
if (tombstoneDV == null) {
return false;
}
if (tombstoneDV.docID() > segmentDocId) {
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
}
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}
}
}
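Finally, a test-style sketch of what requiredFullRange buys the caller (with `engine` and `mapperService` assumed as fixtures): operations must come back as consecutive seq#s, and a gap surfaces as an IllegalStateException from next() rather than a silently incomplete stream:

```java
import java.io.IOException;

import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

// Hypothetical check over a fully populated seq# range [0, 9].
void assertFullRange(InternalEngine engine, MapperService mapperService) throws IOException {
    try (Translog.Snapshot snapshot =
             engine.newLuceneChangesSnapshot("test", mapperService, 0, 9, true)) {
        for (long expected = 0; expected <= 9; expected++) {
            // next() throws IllegalStateException if the expected seq# is missing
            Translog.Operation op = snapshot.next();
            assert op != null && op.seqNo() == expected;
        }
    }
}
```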