Skip to content

Commit

Permalink
Backport of #9346
Browse files Browse the repository at this point in the history
Advance max_seq_no before add operation to Lucene.

This commit fixes the issue of potential mismatches between the max_seq_no in
the commit's user_data and the seq_no of some documents in the Lucene commit.
The mismatch could arise when processing an operation on a replica engine, as
we first added it to Lucene, then to the translog, to finally mark seq_no as
completed. If a flush occurred after the first step (adding to Lucene), but before the marking, then the
max_seq_no in the commit's user_data would be smaller than the seq_no of some
documents in the Lucene commit.

Port of elastic/elasticsearch#38879
  • Loading branch information
marregui committed Nov 18, 2019
1 parent a02c1f3 commit f5bb0b7
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
}
}
}
markSeqNoAsSeen(index.seqNo());
return plan;
}

Expand Down Expand Up @@ -1321,6 +1322,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
delete.seqNo(), delete.version());
}
}
markSeqNoAsSeen(delete.seqNo());
return plan;
}

Expand Down Expand Up @@ -2421,6 +2423,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
 * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
 * <p>
 * This is a thin delegation to {@code localCheckpointTracker.advanceMaxSeqNo(seqNo)}; it only raises the
 * tracked maximum and does not, by itself, mark the operation as completed.
 *
 * @param seqNo the sequence number of the operation that is about to be processed
 */
protected final void markSeqNoAsSeen(long seqNo) {
localCheckpointTracker.advanceMaxSeqNo(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.carrotsearch.hppc.LongObjectHashMap;
import org.elasticsearch.common.SuppressForbidden;

import java.util.concurrent.atomic.AtomicLong;

/**
* This class generates sequence numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
* previous sequence numbers have been processed (inclusive).
Expand All @@ -48,7 +50,7 @@ public class LocalCheckpointTracker {
/**
* The next available sequence number.
*/
private volatile long nextSeqNo;
final AtomicLong nextSeqNo = new AtomicLong();

/**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or
Expand All @@ -62,13 +64,14 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] "
+ "but was [" + localCheckpoint + "]");
+ "but was [" + localCheckpoint + "]");
}
if (maxSeqNo < 0 && maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo +
"]");
}
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
nextSeqNo.set(maxSeqNo + 1);
checkpoint = localCheckpoint;
}

Expand All @@ -78,7 +81,14 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
* @return the next assigned sequence number
*/
public synchronized long generateSeqNo() {
return nextSeqNo++;
return nextSeqNo.getAndIncrement();
}

/**
 * Records that {@code seqNo} has been seen. The next sequence number to hand out is raised to
 * {@code seqNo + 1} when that is larger than its current value; otherwise nothing changes.
 *
 * @param seqNo the sequence number that has been seen
 */
public synchronized void advanceMaxSeqNo(final long seqNo) {
    final long candidateNext = seqNo + 1;
    // keep whichever is larger: the current next seq_no or the one implied by the seen seq_no
    nextSeqNo.updateAndGet(currentNext -> Math.max(currentNext, candidateNext));
}

/**
Expand All @@ -87,10 +97,7 @@ public synchronized long generateSeqNo() {
* @param seqNo the sequence number to mark as completed
*/
public synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
}
advanceMaxSeqNo(seqNo);
if (seqNo <= checkpoint) {
// this is possible during recovery where we might replay an operation that was also replicated
return;
Expand Down Expand Up @@ -131,13 +138,13 @@ public long getCheckpoint() {
* @return the maximum sequence number
*/
public long getMaxSeqNo() {
return nextSeqNo - 1;
return nextSeqNo.get() - 1;
}


/**
* constructs a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*
* <p>
* This is needed to make sure the local checkpoint and max seq no are consistent
*/
public synchronized SeqNoStats getStats(final long globalCheckpoint) {
Expand All @@ -163,7 +170,7 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
*/
public boolean contains(final long seqNo) {
assert seqNo >= 0 : "invalid seq_no=" + seqNo;
if (seqNo >= nextSeqNo) {
if (seqNo >= nextSeqNo.get()) {
return false;
}
if (seqNo <= checkpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5157,6 +5157,45 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
}
}

/**
 * Indexes and deletes documents as a replica on one thread while another thread keeps rolling the
 * translog generation and the test thread flushes repeatedly, then verifies that every commit's
 * max_seq_no in the user data is not smaller than the seq_no of any document in that commit.
 */
@Test
public void testMaxSeqNoInCommitUserData() throws Exception {
    AtomicBoolean running = new AtomicBoolean(true);
    Thread rollTranslog = new Thread(() -> {
        while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
            engine.rollTranslogGeneration(); // make adding operations to translog slower
        }
    });
    rollTranslog.start();

    Thread indexing = new Thread(() -> {
        long seqNo = 0;
        while (running.get() && seqNo <= 1000) {
            try {
                String id = Long.toString(between(1, 50));
                if (randomBoolean()) {
                    ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
                    engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
                } else {
                    engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
                }
                seqNo++;
            } catch (IOException e) {
                throw new AssertionError(e);
            }
        }
    });
    indexing.start();

    try {
        int numCommits = between(5, 20);
        for (int i = 0; i < numCommits; i++) {
            engine.flush(false, true);
        }
    } finally {
        // always stop and join the background threads, even if a flush failed, so they do not
        // keep operating on the engine while the test is being torn down
        running.set(false);
        indexing.join();
        rollTranslog.join();
    }
    assertMaxSeqNoInCommitUserData(engine);
}

private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
Expand All @@ -48,6 +50,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -113,6 +116,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class EngineTestCase extends ESTestCase {
Expand Down Expand Up @@ -237,13 +241,21 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@After
public void tearDown() throws Exception {
super.tearDown();
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(engine);
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
IOUtils.close(
replicaEngine, storeReplica,
Expand Down Expand Up @@ -920,6 +932,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

/**
 * Checks every commit in the engine's store directory: the max_seq_no recorded in the commit's
 * user_data must be greater than or equal to the largest seq_no found among the commit's documents.
 *
 * @param engine the engine whose commits are inspected
 */
public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
    for (IndexCommit indexCommit : DirectoryReader.listCommits(engine.store.directory())) {
        try (DirectoryReader commitReader = DirectoryReader.open(indexCommit)) {
            // starts at NO_OPS_PERFORMED and is raised by each seq_no the scan reports
            final AtomicLong highestDocSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
            Lucene.scanSeqNosInReader(commitReader, 0, Long.MAX_VALUE,
                seqNo -> highestDocSeqNo.accumulateAndGet(seqNo, Math::max));
            final long maxSeqNoInUserData = Long.parseLong(indexCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
            assertThat(maxSeqNoInUserData, greaterThanOrEqualTo(highestDocSeqNo.get()));
        }
    }
}

public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
Expand Down

0 comments on commit f5bb0b7

Please sign in to comment.