diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java index 178175b8b5554..6678cc5ac2701 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java @@ -37,7 +37,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .put("thread_pool.search.size", 1) .put("thread_pool.search.queue_size", 1) .put("thread_pool.write.size", 1) - .put("thread_pool.write.queue_size", 1) + // Needs to be 2 since we have concurrent indexing and global checkpoint syncs + .put("thread_pool.write.queue_size", 2) .put("thread_pool.get.size", 1) .put("thread_pool.get.queue_size", 1) .build(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 82600e30adb98..20a9bc9781a37 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -160,7 +160,10 @@ public void testDurableFlagHasEffect() { Translog.Location lastWriteLocation = tlog.getLastWriteLocation(); try { // the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one - return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0)); + return tlog.ensureSynced( + new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0), + SequenceNumbers.UNASSIGNED_SEQ_NO + ); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8901b1ded7d38..5ea2d08b8a709 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -758,6 +758,11 @@ public enum SearcherScope { */ public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener); + /** + * Ensures that the global checkpoint has been persisted to the underlying storage. + */ + public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer listener); + public abstract void syncTranslog() throws IOException; /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8cc6ef53f6d9d..24c2242708ead 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -179,7 +179,7 @@ public class InternalEngine extends Engine { private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final FlushListeners flushListener; - private final AsyncIOProcessor translogSyncProcessor; + private final AsyncIOProcessor> translogSyncProcessor; private final CompletionStatsCache completionStatsCache; @@ -617,12 +617,23 @@ public boolean isTranslogSyncNeeded() { return getTranslog().syncNeeded(); } - private AsyncIOProcessor createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) { + private AsyncIOProcessor> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) { return new AsyncIOProcessor<>(logger, 1024, threadContext) { @Override - protected void write(List>> candidates) throws IOException { + protected void write(List, Consumer>> candidates) throws IOException { try { - final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1)); + Translog.Location location = Translog.Location.EMPTY; + long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + for (Tuple, Consumer> syncMarkers : candidates) { + Tuple marker = syncMarkers.v1(); + long globalCheckpointToSync = marker.v1(); + if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) { + processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync); + } + location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2(); + } + + final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint); if (synced) { revisitIndexDeletionPolicyOnTranslogSynced(); } @@ -639,7 +650,12 @@ protected void write(List>> candida @Override public void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener) { - translogSyncProcessor.put(location, listener); + translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener); + } + + @Override + public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer listener) { + translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b75d0906debea..71c9e2ed294cc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -338,6 +338,11 @@ public void asyncEnsureTranslogSynced(Translog.Location location, Consumer listener) { + listener.accept(null); + } + @Override public void syncTranslog() {} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index d466d0988abfb..782a49ab7228a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -62,7 +62,9 @@ public GlobalCheckpointSyncAction( actionFilters, Request::new, Request::new, - ThreadPool.Names.MANAGEMENT + ThreadPool.Names.WRITE, + false, + true ); } @@ -77,24 +79,26 @@ protected void shardOperationOnPrimary( IndexShard indexShard, ActionListener> listener ) { - ActionListener.completeWith(listener, () -> { - maybeSyncTranslog(indexShard); - return new PrimaryResult<>(request, new ReplicationResponse()); - }); + maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse()))); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - ActionListener.completeWith(listener, () -> { - maybeSyncTranslog(replica); - return new ReplicaResult(); - }); + maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult())); } - private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException { + private static void maybeSyncTranslog(IndexShard indexShard, ActionListener listener) { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) { - indexShard.sync(); + indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> { + if (e == null) { + listener.onResponse(null); + } else { + listener.onFailure(e); + } + }); + } else { + listener.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4ab3cd521526e..fd53b07e04013 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3615,6 +3615,17 @@ public final void sync(Translog.Location location, Consumer syncListe getEngine().asyncEnsureTranslogSynced(location, syncListener); } + /** + * This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync + * if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it + * will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this + * method with a valid processed global checkpoint that is available to sync. + */ + public void syncGlobalCheckpoint(long globalCheckpoint, Consumer syncListener) { + verifyNotClosed(); + getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener); + } + public void sync() throws IOException { verifyNotClosed(); getEngine().syncTranslog(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 011b68b38a9c1..8759f1cf4b9d1 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -51,7 +51,6 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; @@ -839,15 +838,18 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { } /** - * Ensures that the given location has be synced / written to the underlying storage. + * Ensures that the given location and global checkpoint has be synced / written to the underlying storage. * * @return Returns true iff this call caused an actual sync operation otherwise false */ - public boolean ensureSynced(Location location) throws IOException { + public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException { try (ReleasableLock lock = readLock.acquire()) { - if (location.generation == current.getGeneration()) { // if we have a new one it's already synced + // if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's + // already synced + long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint; + if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) { ensureOpen(); - return current.syncUpTo(location.translogLocation + location.size); + return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint); } } catch (final Exception ex) { closeOnTragicEvent(ex); @@ -856,24 +858,6 @@ public boolean ensureSynced(Location location) throws IOException { return false; } - /** - * Ensures that all locations in the given stream have been synced / written to the underlying storage. - * This method allows for internal optimization to minimize the amount of fsync operations if multiple - * locations must be synced. - * - * @return Returns true iff this call caused an actual sync operation otherwise false - */ - public boolean ensureSynced(Stream locations) throws IOException { - final Optional max = locations.max(Location::compareTo); - // we only need to sync the max location since it will sync all other - // locations implicitly - if (max.isPresent()) { - return ensureSynced(max.get()); - } else { - return false; - } - } - /** * Closes the translog if the current translog writer experienced a tragic exception. * @@ -929,6 +913,8 @@ public TranslogDeletionPolicy getDeletionPolicy() { public static class Location implements Comparable { + public static Location EMPTY = new Location(0, 0, 0); + public final long generation; public final long translogLocation; public final int size; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index c390ace777d3b..c3715e17efae8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -346,7 +346,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) { * raising the exception. */ public void sync() throws IOException { - syncUpTo(Long.MAX_VALUE); + syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO); } /** @@ -462,10 +462,17 @@ private long getWrittenOffset() throws IOException { * * @return true if this call caused an actual sync operation */ - final boolean syncUpTo(long offset) throws IOException { - if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { + final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException { + if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) { + assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong() + : "globalCheckpointToPersist [" + + globalCheckpointToPersist + + "] greater than global checkpoint [" + + globalCheckpointSupplier.getAsLong() + + "]"; synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait - if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { + if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) + && syncNeeded()) { // double checked locking - we don't want to fsync unless we have to and now that we have // the lock we should check again since if this code is busy we might have fsynced enough already final Checkpoint checkpointToSync; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 69b2ff769d710..934bd54720ed2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -27,8 +27,13 @@ import org.elasticsearch.transport.TransportService; import java.util.Collections; +import java.util.function.Consumer; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -100,6 +105,11 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint); when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + doAnswer(invocation -> { + Consumer argument = invocation.getArgument(1); + argument.accept(null); + return null; + }).when(indexShard).syncGlobalCheckpoint(anyLong(), any()); final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, @@ -123,9 +133,10 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { verify(indexShard, never()).sync(); + verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any()); } else { - verify(indexShard).sync(); + verify(indexShard, never()).sync(); + verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any()); } } - } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a090507fd3290..2317b8a010e3f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1534,6 +1534,69 @@ public void run() { closeShards(shard); } + public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = indexSettings(Version.CURRENT, 1, 2).build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + IndexShard shard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + ignoredShardId -> {}, + RetentionLeaseSyncer.EMPTY + ); + recoverShardFromStore(shard); + + final int maxSeqNo = randomIntBetween(0, 128); + for (int i = 0; i <= maxSeqNo; i++) { + EngineTestCase.generateNewSeqNo(shard.getEngine()); + } + final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo; + shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint); + shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint()); + + Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + Thread[] thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch latch = new CountDownLatch(thread.length); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread() { + @Override + public void run() { + try { + latch.countDown(); + latch.await(); + for (int i = 0; i < 10000; i++) { + semaphore.acquire(); + shard.syncGlobalCheckpoint( + randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()), + (ex) -> semaphore.release() + ); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); + assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint()); + + closeShards(shard); + } + public void testShardStats() throws IOException { IndexShard shard = newStartedShard(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 790fed4621683..85db41f745543 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1024,7 +1024,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep fail("duplicate op [" + op + "], old entry at " + location); } if (id % writers.length == threadId) { - translog.ensureSynced(location); + translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); } if (id % flushEveryOps == 0) { synchronized (flushMutex) { @@ -1162,67 +1162,72 @@ protected void doRun() throws Exception { logger.info("--> test done. total ops written [{}]", writtenOps.size()); } - public void testSyncUpTo() throws IOException { - int translogOperations = randomIntBetween(10, 100); - int count = 0; - for (int op = 0; op < translogOperations; op++) { - int seqNo = ++count; - final Translog.Location location = translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get())); - if (randomBoolean()) { - assertTrue("at least one operation pending", translog.syncNeeded()); - assertTrue("this operation has not been synced", translog.ensureSynced(location)); - // we are the last location so everything should be synced - assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); - seqNo = ++count; - translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get())); - assertTrue("one pending operation", translog.syncNeeded()); - assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now - assertTrue("we only synced a previous operation yet", translog.syncNeeded()); - } - if (rarely()) { - translog.rollGeneration(); - assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now - assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); - } - - if (randomBoolean()) { - translog.sync(); - assertFalse("translog has been synced already", translog.ensureSynced(location)); - } - } - } + public void testSyncUpToLocationAndCheckpoint() throws IOException { + assertFalse( + "translog empty location and not ops performed will not require sync", + translog.ensureSynced(Location.EMPTY, SequenceNumbers.UNASSIGNED_SEQ_NO) + ); - public void testSyncUpToStream() throws IOException { - int iters = randomIntBetween(5, 10); + int iters = randomIntBetween(25, 50); + Location alreadySynced = Location.EMPTY; + long alreadySyncedCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; for (int i = 0; i < iters; i++) { int translogOperations = randomIntBetween(10, 100); int count = 0; - ArrayList locations = new ArrayList<>(); + + Location location = null; + final ArrayList locations = new ArrayList<>(); + final ArrayList locationsInCurrentGeneration = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { translog.rollGeneration(); + locationsInCurrentGeneration.clear(); } - final Translog.Location location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count))); + location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count))); + globalCheckpoint.incrementAndGet(); locations.add(location); + locationsInCurrentGeneration.add(location); } - Collections.shuffle(locations, random()); + + assertFalse("should have been synced on previous iteration", translog.ensureSynced(alreadySynced, alreadySyncedCheckpoint)); + if (randomBoolean()) { assertTrue("at least one operation pending", translog.syncNeeded()); - assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); - // we are the last location so everything should be synced + if (randomBoolean()) { + Location randomLocationToSync = locationsInCurrentGeneration.get(randomInt(locationsInCurrentGeneration.size() - 1)); + assertTrue( + "this operation has not been synced", + translog.ensureSynced(randomLocationToSync, SequenceNumbers.UNASSIGNED_SEQ_NO) + ); + } else { + long globalCheckpointToSync = randomLongBetween(translog.getLastSyncedGlobalCheckpoint() + 1, globalCheckpoint.get()); + assertTrue( + "this global checkpoint has not been persisted", + translog.ensureSynced(Location.EMPTY, globalCheckpointToSync) + ); + } + // everything should be synced assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); } else if (rarely()) { translog.rollGeneration(); // not syncing now - assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); + assertFalse( + "location is from a previous translog - already synced", + translog.ensureSynced(location, globalCheckpoint.get()) + ); assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } else { translog.sync(); - assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); + assertFalse("translog has been synced already", translog.ensureSynced(location, globalCheckpoint.get())); } - for (Location location : locations) { - assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); + + Collections.shuffle(locations, random()); + for (Location l : locations) { + assertFalse("all of the locations should be synced: " + l, translog.ensureSynced(l, SequenceNumbers.UNASSIGNED_SEQ_NO)); } + + alreadySynced = location; + alreadySyncedCheckpoint = globalCheckpoint.get(); } } @@ -2550,7 +2555,7 @@ public void testTragicEventCanBeAnyException() throws IOException { try { Translog.Location location = translog.add(indexOp("2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString())); if (randomBoolean()) { - translog.ensureSynced(location); + translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); } else { translog.sync(); } @@ -3888,7 +3893,11 @@ public void testSyncConcurrently() throws Exception { long globalCheckpoint = lastGlobalCheckpoint.get(); final boolean synced; if (randomBoolean()) { - synced = translog.ensureSynced(location); + if (randomBoolean()) { + synced = translog.ensureSynced(location, globalCheckpoint); + } else { + synced = translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO); + } } else { translog.sync(); synced = true;