Skip to content

Commit

Permalink
Disable stored fields access optimization in recovery (#69385)
Browse files Browse the repository at this point in the history
We can't enable the sequential access optimization for stored fields of
changes snapshots used in peer recoveries because they are accessed by
multiple threads.

Relates to #68961
  • Loading branch information
dnhatn committed Feb 23, 2021
1 parent d7e018d commit db58272
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false);
} else {
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
Expand All @@ -570,8 +570,8 @@ public int estimateNumberOfHistoryOperations(String reason, HistorySource histor
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
try (Translog.Snapshot snapshot =
newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false)) {
return snapshot.totalOperations();
}
} else {
Expand Down Expand Up @@ -2684,16 +2684,15 @@ private void ensureSoftDeletesEnabled() {
}
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
boolean requiredFullRange, boolean singleConsumer) throws IOException {
ensureSoftDeletesEnabled();
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange);
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer);
searcher = null;
return snapshot;
} catch (Exception e) {
Expand All @@ -2708,6 +2707,12 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
}
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange, true);
}

@Override
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private long lastSeenSeqNo;
private int skippedOperations;
private final boolean requiredFullRange;
private final boolean singleConsumer;

private final IndexSearcher indexSearcher;
private final MapperService mapperService;
Expand All @@ -65,6 +66,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private int storedFieldsReaderOrd = -1;
private StoredFieldsReader storedFieldsReader = null;

private final Thread creationThread; // for assertion

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
*
Expand All @@ -74,9 +77,10 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param fromSeqNo the min requesting seq# - inclusive
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean singleConsumer) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
Expand All @@ -91,11 +95,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
};
this.mapperService = mapperService;
final long requestingSize = (toSeqNo - fromSeqNo) == Long.MAX_VALUE ? Long.MAX_VALUE : (toSeqNo - fromSeqNo + 1L);
this.creationThread = Thread.currentThread();
this.searchBatchSize = requestingSize < searchBatchSize ? Math.toIntExact(requestingSize) : searchBatchSize;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
this.singleConsumer = singleConsumer;
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(this.searchBatchSize);
Expand All @@ -107,21 +113,25 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {

@Override
public void close() throws IOException {
assert assertAccessingThread();
onClose.close();
}

@Override
public int totalOperations() {
assert assertAccessingThread();
return totalHits;
}

@Override
public int skippedOperations() {
assert assertAccessingThread();
return skippedOperations;
}

@Override
public Translog.Operation next() throws IOException {
assert assertAccessingThread();
Translog.Operation op = null;
for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
op = readDocAsOp(idx);
Expand All @@ -138,6 +148,12 @@ public Translog.Operation next() throws IOException {
return op;
}

private boolean assertAccessingThread() {
assert singleConsumer == false || creationThread == Thread.currentThread() :
"created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
return true;
}

private void rangeCheck(Translog.Operation op) {
if (op == null) {
if (lastSeenSeqNo < toSeqNo) {
Expand Down Expand Up @@ -174,7 +190,7 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
for (int i = 0; i < scoreDocs.length; i++) {
scoreDocs[i].shardIndex = i;
}
parallelArray.useSequentialStoredFieldsReader = scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
parallelArray.useSequentialStoredFieldsReader = singleConsumer && scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
if (parallelArray.useSequentialStoredFieldsReader == false) {
storedFieldsReaderOrd = -1;
storedFieldsReader = null;
Expand Down Expand Up @@ -262,6 +278,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
}
}
if (storedFieldsReader != null) {
assert singleConsumer : "Sequential access optimization must not be enabled for multiple consumers";
assert parallelArray.useSequentialStoredFieldsReader;
assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord;
storedFieldsReader.visitDocument(segmentDocID, fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public void testBasics() throws Exception {
long fromSeqNo = randomNonNegativeLong();
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
// Empty engine
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false, randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.size(0));
}
int numOps = between(1, 100);
Expand All @@ -83,17 +83,17 @@ public void testBasics() throws Exception {
toSeqNo = randomLongBetween(fromSeqNo, numOps * 2);

Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false, randomBoolean())) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.size(0));
} finally {
IOUtils.close(searcher);
}

searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
Expand All @@ -105,16 +105,16 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false, randomBoolean())) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
} finally {
IOUtils.close(searcher);
}
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
Expand All @@ -124,8 +124,8 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
}
toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo);
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService,
between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true, randomBoolean())) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
} finally {
Expand All @@ -135,7 +135,8 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
// Get snapshot via engine will auto refresh
fromSeqNo = randomLongBetween(0, numOps - 1);
toSeqNo = randomLongBetween(fromSeqNo, numOps - 1);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean())) {
try (Translog.Snapshot snapshot =
engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, randomBoolean(), randomBoolean())) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}
}
Expand Down Expand Up @@ -166,7 +167,8 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
engine.refresh("test");
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
try (Translog.Snapshot snapshot =
new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false, randomBoolean())) {
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
Expand Down Expand Up @@ -219,7 +221,7 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {

public void testAccessStoredFieldsSequentially() throws Exception {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
int smallBatch = between(5, 9);
long seqNo = 0;
for (int i = 0; i < smallBatch; i++) {
Expand All @@ -236,28 +238,36 @@ public void testAccessStoredFieldsSequentially() throws Exception {
// disable optimization for a small batch
Translog.Operation op;
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
"test", createMapperService("test"), 0L, between(1, smallBatch), false)) {
"test", createMapperService("test"), 0L, between(1, smallBatch), false, randomBoolean())) {
while ((op = snapshot.next()) != null) {
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
}
assertFalse(snapshot.useSequentialStoredFieldsReader());
}
// disable optimization for non-sequential accesses
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
"test", createMapperService("test"), between(1, 3), between(20, 100), false)) {
"test", createMapperService("test"), between(1, 3), between(20, 100), false, randomBoolean())) {
while ((op = snapshot.next()) != null) {
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
}
assertFalse(snapshot.useSequentialStoredFieldsReader());
}
// enable optimization for sequential access of 10+ docs
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
"test", createMapperService("test"), 11, between(21, 100), false)) {
"test", createMapperService("test"), 11, between(21, 100), false, true)) {
while ((op = snapshot.next()) != null) {
assertTrue(op.toString(), snapshot.useSequentialStoredFieldsReader());
}
assertTrue(snapshot.useSequentialStoredFieldsReader());
}
// disable optimization if snapshot is accessed by multiple consumers
try (LuceneChangesSnapshot snapshot = (LuceneChangesSnapshot) engine.newChangesSnapshot(
"test", createMapperService("test"), 11, between(21, 100), false, false)) {
while ((op = snapshot.next()) != null) {
assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
}
assertFalse(snapshot.useSequentialStoredFieldsReader());
}
}
}

Expand All @@ -284,7 +294,8 @@ void pullOperations(InternalEngine follower) throws IOException {
long fromSeqNo = followerCheckpoint + 1;
long batchSize = randomLongBetween(0, 100);
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot =
leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
translogHandler.run(follower, snapshot);
}
}
Expand Down Expand Up @@ -330,7 +341,7 @@ private List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOE
public void testOverFlow() throws Exception {
long fromSeqNo = randomLongBetween(0, 5);
long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE);
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true, randomBoolean())) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
Expand Down

0 comments on commit db58272

Please sign in to comment.