Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequence numbers commit data for Lucene uses Iterable interface #20793

Merged
merged 10 commits into from
Oct 12, 2016
115 changes: 73 additions & 42 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
Expand All @@ -59,7 +58,6 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand All @@ -76,6 +74,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -86,8 +85,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;

public class InternalEngine extends Engine {

/**
Expand Down Expand Up @@ -121,6 +118,7 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -285,7 +283,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
Expand All @@ -310,7 +308,7 @@ public Translog getTranslog() {
private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = writer.getCommitData();
final Map<String, String> commitUserData = commitDataAsMap(writer);
if (commitUserData.containsKey("translog_id")) {
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
Expand All @@ -325,33 +323,26 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

// package private for testing
SeqNoStats loadSeqNoStatsFromCommit() throws IOException {
return loadSeqNoStatsFromCommit(indexWriter);
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
final long maxSeqNo;
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
maxSeqNo = (long) stats.getMaxValue();
} else {
maxSeqNo = NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
maxSeqNo = Long.parseLong(entry.getValue());
}
}

final Map<String, String> commitUserData = writer.getCommitData();

final long localCheckpoint;
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
} else {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
}

final long globalCheckpoint;
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

Expand Down Expand Up @@ -1323,24 +1314,52 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final Map<String, String> commitData = new HashMap<>(5);

commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());

commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));
writer.setLiveCommitData(new Iterable<Map.Entry<String, String>>() {
// save the max seq no the first time its computed, so subsequent iterations don't recompute,
// potentially getting a different value
private String computedMaxSeqNoEntry = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we need this caching because we load the user data from the index writer. Maybe we can use lastCommittedSegmentInfos instead, which can be read earlier when opening the engine?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I agree with using lastCommittedSegmentInfos from the engine as a solution, but my concern here was that basically we have to document and ensure that no one uses IndexWriter#getLiveCommitData or depends on it for accurate information, otherwise the max_seq_no could be different than what is actually stored in the commit. That's why I added the caching part, which does increase complexity. Do you prefer I remove it and just document that we should never call IndexWriter#getLiveCommitData inside ES code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK to not use IndexWriter#getLiveCommitData - as it may not return what's in the last commit. I suspect this is why it was renamed to say live in the name. Is there anything specific you are concerned about?


if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
/**
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values.
* The maximum sequence number is different - we never want the maximum sequence number to be
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
* of re-using a sequence number for two different documents when restoring from this commit
* point and subsequently writing new documents to the index. Since we only know which Lucene
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
if (computedMaxSeqNoEntry == null) {
// evaluated once at the time of the first invocation of this method
computedMaxSeqNoEntry = Long.toString(seqNoService().getMaxSeqNo());
}
commitData.put(MAX_SEQ_NO, computedMaxSeqNoEntry);
return commitData.entrySet().iterator();
}
});

writer.commit();

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
logger.trace("committed writer with commit data [{}]", commitDataAsMap(writer));
}

indexWriter.setCommitData(commitData);
writer.commit();
} catch (Exception ex) {
try {
failEngine("lucene commit failed", ex);
Expand Down Expand Up @@ -1395,7 +1414,8 @@ public MergeStats getMergeStats() {
/**
 * Returns the {@link SequenceNumbersService} used by this engine to track sequence numbers
 * and local/global checkpoints.
 */
public SequenceNumbersService seqNoService() {
return seqNoService;
}
@Override

@Override
public DocsStats getDocStats() {
final int numDocs = indexWriter.numDocs();
final int maxDoc = indexWriter.maxDoc();
Expand Down Expand Up @@ -1441,4 +1461,15 @@ boolean indexWriterHasDeletions() {
/**
 * Returns {@code true} while translog recovery for this engine is still pending,
 * i.e. recovery has not yet completed.
 */
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}

/**
 * Snapshots the live commit user data of the given {@link IndexWriter} into a plain
 * {@link Map}. Note that the live commit data may differ from what was written in the
 * last commit; this simply materializes whatever the writer currently reports.
 */
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
    final Map<String, String> snapshot = new HashMap<>(6);
    final Iterator<Map.Entry<String, String>> it = indexWriter.getLiveCommitData().iterator();
    while (it.hasNext()) {
        final Map.Entry<String, String> entry = it.next();
        snapshot.put(entry.getKey(), entry.getValue());
    }
    return snapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public long generateSeqNo() {
return localCheckpointService.generateSeqNo();
}

/**
 * Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
 *
 * @return the maximum sequence number issued or observed by this service
 */
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
}

/**
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)}
* more details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand All @@ -138,6 +139,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -583,6 +585,10 @@ public SequenceNumbersService seqNoService() {
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));

maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
localCheckpoint.set(
Expand All @@ -608,6 +614,8 @@ public SequenceNumbersService seqNoService() {
assertThat(
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally {
IOUtils.close(engine);
}
Expand Down Expand Up @@ -1618,13 +1626,14 @@ public void testIndexWriterInfoStream() throws IllegalAccessException {
}

public void testSeqNoAndCheckpoints() throws IOException {
// nocommit: does not test deletes
final int opCount = randomIntBetween(1, 256);
long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final String[] ids = new String[]{"1", "2", "3"};
final Set<String> indexedIds = new HashSet<>();
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
InternalEngine initialEngine = null;

try {
Expand All @@ -1633,26 +1642,61 @@ public void testSeqNoAndCheckpoints() throws IOException {
.seqNoService()
.updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet());
for (int op = 0; op < opCount; op++) {
final String id = randomFrom(ids);
ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, 0, -1, false);
try {
initialEngine.index(index);
final String id;
boolean versionConflict = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

// mostly index, sometimes delete
if (rarely() && indexedIds.isEmpty() == false) {
// we have some docs indexed, so delete one of them
id = randomFrom(indexedIds);
final Engine.Delete delete = new Engine.Delete(
"test", id, newUid("test#" + id), SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0, false);
try {
initialEngine.delete(delete);
indexedIds.remove(id);
} catch (VersionConflictEngineException e) {
versionConflict = true;
}
} else {
// index a document
id = randomFrom(ids);
ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, 0, -1, false);
try {
initialEngine.index(index);
indexedIds.add(id);
} catch (VersionConflictEngineException e) {
versionConflict = true;
}
}
if (versionConflict == false) {
primarySeqNo++;
} catch (VersionConflictEngineException e) {

}

replicaLocalCheckpoint =
rarely() ? replicaLocalCheckpoint : randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo));
initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint());
initialEngine.seqNoService().updateLocalCheckpointForShard("replica", replicaLocalCheckpoint);

// make sure the max seq no in the latest commit hasn't advanced due to more documents having been added;
// the first time the commit data iterable gets an iterator, the max seq no from that point in time should
// remain from any subsequent call to IndexWriter#getLiveCommitData unless the commit data is overwritten by a
// subsequent call to IndexWriter#setLiveCommitData.
assertThat(
initialEngine.seqNoService().getMaxSeqNo(),
// its possible we haven't indexed any documents yet, or its possible that right after a commit, a version conflict
// exception happened so the max seq no was not updated, so here we check greater than or equal to
initialEngine.seqNoService().getMaxSeqNo() != SequenceNumbersService.NO_OPS_PERFORMED || versionConflict ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this extra check? what were you aiming at testing ? feels like it's for the time we had caching?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't asserting anything relevant about the saved commit data any longer, so I'm removing it

greaterThanOrEqualTo(initialEngine.loadSeqNoStatsFromCommit().getMaxSeqNo()) :
greaterThan(initialEngine.loadSeqNoStatsFromCommit().getMaxSeqNo())
);

if (rarely()) {
localCheckpoint = primarySeqNo;
maxSeqNo = primarySeqNo;
globalCheckpoint = replicaLocalCheckpoint;
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
initialEngine.flush(true, true);
Expand All @@ -1661,6 +1705,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();

assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
Expand All @@ -1671,6 +1716,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(maxSeqNo));

} finally {
IOUtils.close(initialEngine);
Expand All @@ -1681,12 +1729,17 @@ public void testSeqNoAndCheckpoints() throws IOException {
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine.recoverFromTranslog();

assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so check against primarySeqNo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I follow this comment? can you elaborate?

equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1));
Expand Down