Skip to content

Commit

Permalink
Move global checkpoint sync to write threadpool (#96506)
Browse files Browse the repository at this point in the history
This commit moves the global checkpoint sync action to the write thread
pool. Additionally, it moves the sync pathway to the same pathway as the
location sync so that location syncs and global checkpoint syncs will
worksteal against each other instead of generating independent syncs.
  • Loading branch information
Tim-Brooks committed Jun 1, 2023
1 parent fe49e4f commit fa531b5
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
// Needs to be 2 since we have concurrent indexing and global checkpoint syncs
.put("thread_pool.write.queue_size", 2)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ public void testDurableFlagHasEffect() {
Translog.Location lastWriteLocation = tlog.getLastWriteLocation();
try {
// the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one
return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0));
return tlog.ensureSynced(
new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0),
SequenceNumbers.UNASSIGNED_SEQ_NO
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,11 @@ public enum SearcherScope {
*/
public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener);

/**
* Ensures that the global checkpoint has been persisted to the underlying storage.
*/
public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener);

public abstract void syncTranslog() throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public class InternalEngine extends Engine {
private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
private final FlushListeners flushListener;
private final AsyncIOProcessor<Translog.Location> translogSyncProcessor;
private final AsyncIOProcessor<Tuple<Long, Translog.Location>> translogSyncProcessor;

private final CompletionStatsCache completionStatsCache;

Expand Down Expand Up @@ -602,12 +602,23 @@ public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}

private AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
private AsyncIOProcessor<Tuple<Long, Translog.Location>> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
return new AsyncIOProcessor<>(logger, 1024, threadContext) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>>> candidates) throws IOException {
try {
final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1));
Translog.Location location = Translog.Location.EMPTY;
long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>> syncMarkers : candidates) {
Tuple<Long, Translog.Location> marker = syncMarkers.v1();
long globalCheckpointToSync = marker.v1();
if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) {
processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync);
}
location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2();
}

final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
Expand All @@ -624,7 +635,12 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida

@Override
public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener) {
translogSyncProcessor.put(location, listener);
translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener);
}

@Override
public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Excep
listener.accept(null);
}

@Override
public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
listener.accept(null);
}

@Override
public void syncTranslog() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public GlobalCheckpointSyncAction(
actionFilters,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT
ThreadPool.Names.WRITE,
false,
true
);
}

Expand All @@ -77,24 +79,26 @@ protected void shardOperationOnPrimary(
IndexShard indexShard,
ActionListener<PrimaryResult<Request, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> {
maybeSyncTranslog(indexShard);
return new PrimaryResult<>(request, new ReplicationResponse());
});
maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse())));
}

@Override
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
maybeSyncTranslog(replica);
return new ReplicaResult();
});
maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult()));
}

private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
private static <T> void maybeSyncTranslog(IndexShard indexShard, ActionListener<Void> listener) {
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
&& indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
indexShard.sync();
indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> {
if (e == null) {
listener.onResponse(null);
} else {
listener.onFailure(e);
}
});
} else {
listener.onResponse(null);
}
}

Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3611,6 +3611,17 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
getEngine().asyncEnsureTranslogSynced(location, syncListener);
}

/**
* This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync
* if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it
* will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this
* method with a valid processed global checkpoint that is available to sync.
*/
public void syncGlobalCheckpoint(long globalCheckpoint, Consumer<Exception> syncListener) {
verifyNotClosed();
getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener);
}

public void sync() throws IOException {
verifyNotClosed();
getEngine().syncTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -839,15 +838,18 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
}

/**
* Ensures that the given location has be synced / written to the underlying storage.
* Ensures that the given location and global checkpoint has be synced / written to the underlying storage.
*
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
*/
public boolean ensureSynced(Location location) throws IOException {
public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
// if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's
// already synced
long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint;
if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) {
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint);
}
} catch (final Exception ex) {
closeOnTragicEvent(ex);
Expand All @@ -856,24 +858,6 @@ public boolean ensureSynced(Location location) throws IOException {
return false;
}

/**
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amount of fsync operations if multiple
* locations must be synced.
*
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
*/
public boolean ensureSynced(Stream<Location> locations) throws IOException {
final Optional<Location> max = locations.max(Location::compareTo);
// we only need to sync the max location since it will sync all other
// locations implicitly
if (max.isPresent()) {
return ensureSynced(max.get());
} else {
return false;
}
}

/**
* Closes the translog if the current translog writer experienced a tragic exception.
*
Expand Down Expand Up @@ -929,6 +913,8 @@ public TranslogDeletionPolicy getDeletionPolicy() {

public static class Location implements Comparable<Location> {

public static Location EMPTY = new Location(0, 0, 0);

public final long generation;
public final long translogLocation;
public final int size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
* raising the exception.
*/
public void sync() throws IOException {
syncUpTo(Long.MAX_VALUE);
syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
}

/**
Expand Down Expand Up @@ -455,10 +455,17 @@ private long getWrittenOffset() throws IOException {
*
* @return <code>true</code> if this call caused an actual sync operation
*/
final boolean syncUpTo(long offset) throws IOException {
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong()
: "globalCheckpointToPersist ["
+ globalCheckpointToPersist
+ "] greater than global checkpoint ["
+ globalCheckpointSupplier.getAsLong()
+ "]";
synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
&& syncNeeded()) {
// double checked locking - we don't want to fsync unless we have to and now that we have
// the lock we should check again since if this code is busy we might have fsynced enough already
final Checkpoint checkpointToSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.function.Consumer;

import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -100,6 +105,11 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {

when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint);
when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint);
doAnswer(invocation -> {
Consumer<Exception> argument = invocation.getArgument(1);
argument.accept(null);
return null;
}).when(indexShard).syncGlobalCheckpoint(anyLong(), any());

final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
Settings.EMPTY,
Expand All @@ -123,9 +133,10 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {

if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(indexShard, never()).sync();
verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any());
} else {
verify(indexShard).sync();
verify(indexShard, never()).sync();
verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,69 @@ public void run() {
closeShards(shard);
}

public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = indexSettings(Version.CURRENT, 1, 2).build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
IndexShard shard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> {},
RetentionLeaseSyncer.EMPTY
);
recoverShardFromStore(shard);

final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i <= maxSeqNo; i++) {
EngineTestCase.generateNewSeqNo(shard.getEngine());
}
final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;
shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint);
shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint());

Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch latch = new CountDownLatch(thread.length);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread() {
@Override
public void run() {
try {
latch.countDown();
latch.await();
for (int i = 0; i < 10000; i++) {
semaphore.acquire();
shard.syncGlobalCheckpoint(
randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()),
(ex) -> semaphore.release()
);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
};
thread[i].start();
}

for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint());

closeShards(shard);
}

public void testShardStats() throws IOException {

IndexShard shard = newStartedShard();
Expand Down

0 comments on commit fa531b5

Please sign in to comment.