From 0cf80f4f2d87091860214d9b1d0725fad2833d1d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 May 2023 13:44:10 -0600 Subject: [PATCH] Move global checkpoint sync to write threadpool (#96364) This commit moves the global checkpoint sync action to the write thread pool. Additionally, it moves the sync pathway to the same pathway as the location sync so that location syncs and global checkpoint syncs will worksteal against each other instead of generating independent syncs. --- .../action/RejectionActionIT.java | 3 +- .../index/shard/IndexShardIT.java | 5 +- .../elasticsearch/index/engine/Engine.java | 5 + .../index/engine/InternalEngine.java | 26 ++++- .../index/engine/ReadOnlyEngine.java | 5 + .../seqno/GlobalCheckpointSyncAction.java | 26 ++--- .../elasticsearch/index/shard/IndexShard.java | 11 +++ .../index/translog/Translog.java | 32 ++----- .../index/translog/TranslogWriter.java | 15 ++- .../GlobalCheckpointSyncActionTests.java | 15 ++- .../index/shard/IndexShardTests.java | 63 ++++++++++++ .../index/translog/TranslogTests.java | 95 ++++++++++--------- 12 files changed, 211 insertions(+), 90 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java index 178175b8b5554..6678cc5ac2701 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java @@ -37,7 +37,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .put("thread_pool.search.size", 1) .put("thread_pool.search.queue_size", 1) .put("thread_pool.write.size", 1) - .put("thread_pool.write.queue_size", 1) + // Needs to be 2 since we have concurrent indexing and global checkpoint syncs + .put("thread_pool.write.queue_size", 2) .put("thread_pool.get.size", 1) .put("thread_pool.get.queue_size", 1) .build(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 82600e30adb98..20a9bc9781a37 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -160,7 +160,10 @@ public void testDurableFlagHasEffect() { Translog.Location lastWriteLocation = tlog.getLastWriteLocation(); try { // the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one - return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0)); + return tlog.ensureSynced( + new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0), + SequenceNumbers.UNASSIGNED_SEQ_NO + ); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8901b1ded7d38..5ea2d08b8a709 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -758,6 +758,11 @@ public enum SearcherScope { */ public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener); + /** + * Ensures that the global checkpoint has been persisted to the underlying storage. + */ + public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer listener); + public abstract void syncTranslog() throws IOException; /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8cc6ef53f6d9d..24c2242708ead 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -179,7 +179,7 @@ public class InternalEngine extends Engine { private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final FlushListeners flushListener; - private final AsyncIOProcessor translogSyncProcessor; + private final AsyncIOProcessor> translogSyncProcessor; private final CompletionStatsCache completionStatsCache; @@ -617,12 +617,23 @@ public boolean isTranslogSyncNeeded() { return getTranslog().syncNeeded(); } - private AsyncIOProcessor createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) { + private AsyncIOProcessor> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) { return new AsyncIOProcessor<>(logger, 1024, threadContext) { @Override - protected void write(List>> candidates) throws IOException { + protected void write(List, Consumer>> candidates) throws IOException { try { - final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1)); + Translog.Location location = Translog.Location.EMPTY; + long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + for (Tuple, Consumer> syncMarkers : candidates) { + Tuple marker = syncMarkers.v1(); + long globalCheckpointToSync = marker.v1(); + if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) { + processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync); + } + location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2(); + } + + final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint); if (synced) { revisitIndexDeletionPolicyOnTranslogSynced(); } @@ -639,7 +650,12 @@ protected void write(List>> candida @Override public void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener) { - translogSyncProcessor.put(location, listener); + translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener); + } + + @Override + public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer listener) { + translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b75d0906debea..71c9e2ed294cc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -338,6 +338,11 @@ public void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener) { + listener.accept(null); + } + @Override public void syncTranslog() {} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index d466d0988abfb..782a49ab7228a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -62,7 +62,9 @@ public GlobalCheckpointSyncAction( actionFilters, Request::new, Request::new, - ThreadPool.Names.MANAGEMENT + ThreadPool.Names.WRITE, + false, + true ); } @@ -77,24 +79,26 @@ protected void shardOperationOnPrimary( IndexShard indexShard, ActionListener> listener ) { - ActionListener.completeWith(listener, () -> { - maybeSyncTranslog(indexShard); - return new PrimaryResult<>(request, new ReplicationResponse()); - }); + maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse()))); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - ActionListener.completeWith(listener, () -> { - maybeSyncTranslog(replica); - return new ReplicaResult(); - }); + maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult())); } - private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException { + private static void maybeSyncTranslog(IndexShard indexShard, ActionListener listener) { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) { - indexShard.sync(); + indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> { + if (e == null) { + listener.onResponse(null); + } else { + listener.onFailure(e); + } + }); + } else { + listener.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4ab3cd521526e..fd53b07e04013 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3615,6 +3615,17 @@ public final void sync(Translog.Location location, Consumer syncListe getEngine().asyncEnsureTranslogSynced(location, syncListener); } + /** + * This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync + * if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it + * will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this + * method with a valid processed global checkpoint that is available to sync. + */ + public void syncGlobalCheckpoint(long globalCheckpoint, Consumer syncListener) { + verifyNotClosed(); + getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener); + } + public void sync() throws IOException { verifyNotClosed(); getEngine().syncTranslog(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 011b68b38a9c1..8759f1cf4b9d1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -51,7 +51,6 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; @@ -839,15 +838,18 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { } /** - * Ensures that the given location has be synced / written to the underlying storage. + * Ensures that the given location and global checkpoint has be synced / written to the underlying storage. * * @return Returns true iff this call caused an actual sync operation otherwise false */ - public boolean ensureSynced(Location location) throws IOException { + public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException { try (ReleasableLock lock = readLock.acquire()) { - if (location.generation == current.getGeneration()) { // if we have a new one it's already synced + // if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's + // already synced + long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint; + if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) { ensureOpen(); - return current.syncUpTo(location.translogLocation + location.size); + return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint); } } catch (final Exception ex) { closeOnTragicEvent(ex); @@ -856,24 +858,6 @@ public boolean ensureSynced(Location location) throws IOException { return false; } - /** - * Ensures that all locations in the given stream have been synced / written to the underlying storage. - * This method allows for internal optimization to minimize the amount of fsync operations if multiple - * locations must be synced. - * - * @return Returns true iff this call caused an actual sync operation otherwise false - */ - public boolean ensureSynced(Stream locations) throws IOException { - final Optional max = locations.max(Location::compareTo); - // we only need to sync the max location since it will sync all other - // locations implicitly - if (max.isPresent()) { - return ensureSynced(max.get()); - } else { - return false; - } - } - /** * Closes the translog if the current translog writer experienced a tragic exception. * @@ -929,6 +913,8 @@ public TranslogDeletionPolicy getDeletionPolicy() { public static class Location implements Comparable { + public static Location EMPTY = new Location(0, 0, 0); + public final long generation; public final long translogLocation; public final int size; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index c390ace777d3b..c3715e17efae8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -346,7 +346,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) { * raising the exception. */ public void sync() throws IOException { - syncUpTo(Long.MAX_VALUE); + syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO); } /** @@ -462,10 +462,17 @@ private long getWrittenOffset() throws IOException { * * @return true if this call caused an actual sync operation */ - final boolean syncUpTo(long offset) throws IOException { - if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { + final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException { + if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) { + assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong() + : "globalCheckpointToPersist [" + + globalCheckpointToPersist + + "] greater than global checkpoint [" + + globalCheckpointSupplier.getAsLong() + + "]"; synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait - if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { + if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) + && syncNeeded()) { // double checked locking - we don't want to fsync unless we have to and now that we have // the lock we should check again since if this code is busy we might have fsynced enough already final Checkpoint checkpointToSync; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 69b2ff769d710..934bd54720ed2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -27,8 +27,13 @@ import org.elasticsearch.transport.TransportService; import java.util.Collections; +import java.util.function.Consumer; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -100,6 +105,11 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + doAnswer(invocation -> { + Consumer argument = invocation.getArgument(1); + argument.accept(null); + return null; + }).when(indexShard).syncGlobalCheckpoint(anyLong(), any()); final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, @@ -123,9 +133,10 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { verify(indexShard, never()).sync(); + verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any()); } else { - verify(indexShard).sync(); + verify(indexShard, never()).sync(); + verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any()); } } - } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a090507fd3290..2317b8a010e3f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1534,6 +1534,69 @@ public void run() { closeShards(shard); } + public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = indexSettings(Version.CURRENT, 1, 2).build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + IndexShard shard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + ignoredShardId -> {}, + RetentionLeaseSyncer.EMPTY + ); + recoverShardFromStore(shard); + + final int maxSeqNo = randomIntBetween(0, 128); + for (int i = 0; i <= maxSeqNo; i++) { + EngineTestCase.generateNewSeqNo(shard.getEngine()); + } + final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo; + shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint); + shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint()); + + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + Thread[] thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < 10000; i++) { + semaphore.acquire(); + shard.syncGlobalCheckpoint( + randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()), + (ex) -> semaphore.release() + ); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); + assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint()); + + closeShards(shard); + } + public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 790fed4621683..85db41f745543 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1024,7 +1024,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep fail("duplicate op [" + op + "], old entry at " + location); } if (id % writers.length == threadId) { - translog.ensureSynced(location); + translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); } if (id % flushEveryOps == 0) { synchronized (flushMutex) { @@ -1162,67 +1162,72 @@ protected void doRun() throws Exception { logger.info("--> test done. total ops written [{}]", writtenOps.size()); } - public void testSyncUpTo() throws IOException { - int translogOperations = randomIntBetween(10, 100); - int count = 0; - for (int op = 0; op < translogOperations; op++) { - int seqNo = ++count; - final Translog.Location location = translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get())); - if (randomBoolean()) { - assertTrue("at least one operation pending", translog.syncNeeded()); - assertTrue("this operation has not been synced", translog.ensureSynced(location)); - // we are the last location so everything should be synced - assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); - seqNo = ++count; - translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get())); - assertTrue("one pending operation", translog.syncNeeded()); - assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now - assertTrue("we only synced a previous operation yet", translog.syncNeeded()); - } - if (rarely()) { - translog.rollGeneration(); - assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now - assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); - } - - if (randomBoolean()) { - translog.sync(); - assertFalse("translog has been synced already", translog.ensureSynced(location)); - } - } - } + public void testSyncUpToLocationAndCheckpoint() throws IOException { + assertFalse( + "translog empty location and not ops performed will not require sync", + translog.ensureSynced(Location.EMPTY, SequenceNumbers.UNASSIGNED_SEQ_NO) + ); - public void testSyncUpToStream() throws IOException { - int iters = randomIntBetween(5, 10); + int iters = randomIntBetween(25, 50); + Location alreadySynced = Location.EMPTY; + long alreadySyncedCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; for (int i = 0; i < iters; i++) { int translogOperations = randomIntBetween(10, 100); int count = 0; - ArrayList locations = new ArrayList<>(); + + Location location = null; + final ArrayList locations = new ArrayList<>(); + final ArrayList locationsInCurrentGeneration = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { translog.rollGeneration(); + locationsInCurrentGeneration.clear(); } - final Translog.Location location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count))); + location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count))); + globalCheckpoint.incrementAndGet(); locations.add(location); + locationsInCurrentGeneration.add(location); } - Collections.shuffle(locations, random()); + + assertFalse("should have been synced on previous iteration", translog.ensureSynced(alreadySynced, alreadySyncedCheckpoint)); + if (randomBoolean()) { assertTrue("at least one operation pending", translog.syncNeeded()); - assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); - // we are the last location so everything should be synced + if (randomBoolean()) { + Location randomLocationToSync = locationsInCurrentGeneration.get(randomInt(locationsInCurrentGeneration.size() - 1)); + assertTrue( + "this operation has not been synced", + translog.ensureSynced(randomLocationToSync, SequenceNumbers.UNASSIGNED_SEQ_NO) + ); + } else { + long globalCheckpointToSync = randomLongBetween(translog.getLastSyncedGlobalCheckpoint() + 1, globalCheckpoint.get()); + assertTrue( + "this global checkpoint has not been persisted", + translog.ensureSynced(Location.EMPTY, globalCheckpointToSync) + ); + } + // everything should be synced assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); } else if (rarely()) { translog.rollGeneration(); // not syncing now - assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); + assertFalse( + "location is from a previous translog - already synced", + translog.ensureSynced(location, globalCheckpoint.get()) + ); assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } else { translog.sync(); - assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); + assertFalse("translog has been synced already", translog.ensureSynced(location, globalCheckpoint.get())); } - for (Location location : locations) { - assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); + + Collections.shuffle(locations, random()); + for (Location l : locations) { + assertFalse("all of the locations should be synced: " + l, translog.ensureSynced(l, SequenceNumbers.UNASSIGNED_SEQ_NO)); } + + alreadySynced = location; + alreadySyncedCheckpoint = globalCheckpoint.get(); } } @@ -2550,7 +2555,7 @@ public void testTragicEventCanBeAnyException() throws IOException { try { Translog.Location location = translog.add(indexOp("2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString())); if (randomBoolean()) { - translog.ensureSynced(location); + translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); } else { translog.sync(); } @@ -3888,7 +3893,11 @@ public void testSyncConcurrently() throws Exception { long globalCheckpoint = lastGlobalCheckpoint.get(); final boolean synced; if (randomBoolean()) { - synced = translog.ensureSynced(location); + if (randomBoolean()) { + synced = translog.ensureSynced(location, globalCheckpoint); + } else { + synced = translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); + } } else { translog.sync(); synced = true;