Skip to content

Commit

Permalink
Fix testKeepTranslogAfterGlobalCheckpoint (#55868)
Browse files Browse the repository at this point in the history
If we advance the global checkpoint during commit and sync that
checkpoint after commit, then the assertions in the test won't hold
because the deletion policy did not see the latest global checkpoint
but only the value before committing.

Closes #55680
  • Loading branch information
dnhatn committed Apr 28, 2020
1 parent 11f994b commit 2c4ecf7
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4886,9 +4886,11 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception {

final EngineConfig engineConfig = config(indexSettings, store, translogPath,
NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get());
final AtomicLong lastSyncedGlobalCheckpointBeforeCommit = new AtomicLong(Translog.readGlobalCheckpoint(translogPath, translogUUID));
try (InternalEngine engine = new InternalEngine(engineConfig) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
lastSyncedGlobalCheckpointBeforeCommit.set(Translog.readGlobalCheckpoint(translogPath, translogUUID));
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
// (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService.
if (rarely()) {
Expand All @@ -4909,21 +4911,20 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
if (frequently()) {
engine.flush(randomBoolean(), true);
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Keep only one safe commit as the oldest commit.
final IndexCommit safeCommit = commits.get(0);
if (lastSyncedGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
if (lastSyncedGlobalCheckpointBeforeCommit.get() == UNASSIGNED_SEQ_NO) {
// If the global checkpoint is still unassigned, we keep an empty(eg. initial) commit as a safe commit.
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(SequenceNumbers.NO_OPS_PERFORMED));
} else {
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
lessThanOrEqualTo(lastSyncedGlobalCheckpoint));
lessThanOrEqualTo(lastSyncedGlobalCheckpointBeforeCommit.get()));
}
for (int i = 1; i < commits.size(); i++) {
assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThan(lastSyncedGlobalCheckpoint));
greaterThan(lastSyncedGlobalCheckpointBeforeCommit.get()));
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
Expand Down

0 comments on commit 2c4ecf7

Please sign in to comment.