Skip to content

Commit

Permalink
Fix InternalEngineTests#assertOpsOnPrimary (#37746) (#38990)
Browse files Browse the repository at this point in the history
The assertion `assertOpsOnPrimary` does not store seq_no and primary
term of successful deletes to the `lastOpSeqNo` and `lastOpTerm`. This
leads to failures of the subsequence CAS deletes or indexes with seq_no
and term. Moreover, this assertion trips a translog assertion because it
bumps the primary term of some operations but not the primary term of
the engine.

Relates #36467
Closes #37684
  • Loading branch information
talevy committed Feb 16, 2019
1 parent 7b42081 commit f9a06ff
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
Expand Up @@ -1811,10 +1811,13 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
}
});
refreshThread.start();
latch.await();
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
running.set(false);
refreshThread.join();
try {
latch.await();
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
} finally {
running.set(false);
refreshThread.join();
}
}

private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
Expand All @@ -1824,7 +1827,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
long lastOpVersion = currentOpVersion;
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
final AtomicLong currentTerm = new AtomicLong(1);
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
Expand All @@ -1837,6 +1840,12 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
delete.startTime(), seqNo, term);
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
for (Engine.Operation op : ops) {
final boolean versionConflict = rarely();
final boolean versionedOp = versionConflict || randomBoolean();
Expand All @@ -1848,7 +1857,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
lastOpSeqNo;
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
if (rarely()) {
currentTerm.incrementAndGet();
currentTerm.set(currentTerm.get() + 1L);
engine.rollTranslogGeneration();
}
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
logger.info("performing [{}]{}{}",
Expand Down Expand Up @@ -1879,7 +1889,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
result = engine.index(indexWithVersion.apply(correctVersion, index));
}
} else {
result = engine.index(index);
result = engine.index(indexWithCurrentTerm.apply(index));
}
assertThat(result.isCreated(), equalTo(docDeleted));
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
Expand Down Expand Up @@ -1913,16 +1923,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
} else if (versionedOp) {
result = engine.delete(delWithVersion.apply(correctVersion, delete));
} else {
result = engine.delete(delete);
result = engine.delete(deleteWithCurrentTerm.apply(delete));
}
assertThat(result.isFound(), equalTo(docDeleted == false));
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
assertThat(result.getFailure(), nullValue());
docDeleted = true;
lastOpVersion = result.getVersion();
lastOpSeqNo = UNASSIGNED_SEQ_NO;
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
lastOpSeqNo = result.getSeqNo();
lastOpTerm = result.getTerm();
opsPerformed++;
}
}
Expand Down Expand Up @@ -1950,6 +1960,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
engine.clearDeletedTombstones();
if (docDeleted) {
lastOpVersion = Versions.NOT_FOUND;
lastOpSeqNo = UNASSIGNED_SEQ_NO;
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
}
}
}
Expand Down
Expand Up @@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase {
protected Path primaryTranslogDir;
protected Path replicaTranslogDir;
// A default primary term is used by engine instances created in this test.
protected AtomicLong primaryTerm = new AtomicLong();
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);

protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
Expand Down Expand Up @@ -606,7 +606,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
breakerService,
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
globalCheckpointSupplier, primaryTerm, tombstoneDocSupplier());
return config;
}

Expand Down Expand Up @@ -1033,4 +1033,25 @@ public static Translog getTranslog(Engine engine) {
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getTranslog();
}

public static final class PrimaryTermSupplier implements LongSupplier {
private final AtomicLong term;

PrimaryTermSupplier(long initialTerm) {
this.term = new AtomicLong(initialTerm);
}

public long get() {
return term.get();
}

public void set(long newTerm) {
this.term.set(newTerm);
}

@Override
public long getAsLong() {
return get();
}
}
}

0 comments on commit f9a06ff

Please sign in to comment.