Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed May 26, 2023
1 parent 4de3058 commit 22c7804
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testDurableFlagHasEffect() {
// the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one
return tlog.ensureSynced(
new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0),
SequenceNumbers.NO_OPS_PERFORMED
SequenceNumbers.UNASSIGNED_SEQ_NO
);
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private AsyncIOProcessor<Tuple<Long, Translog.Location>> createTranslogSyncProce
protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>>> candidates) throws IOException {
try {
Translog.Location location = Translog.Location.EMPTY;
long processGlobalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>> syncMarkers : candidates) {
Tuple<Long, Translog.Location> marker = syncMarkers.v1();
processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, marker.v1());
Expand All @@ -647,7 +647,7 @@ protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Excepti

@Override
public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener) {
translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener);
translogSyncProcessor.put(new Tuple<>(SequenceNumbers.UNASSIGNED_SEQ_NO, location), listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public GlobalCheckpointSyncAction(
actionFilters,
Request::new,
Request::new,
ThreadPool.Names.WRITE
ThreadPool.Names.WRITE,
false,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3617,7 +3617,7 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe

/**
* This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync
* if the request global checkpoint is greater than the currently persisted global checkpoint. However, sam as #sync it
* if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it
* will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this
* method with a valid processed global checkpoint that is available to sync.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -61,7 +60,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -860,27 +858,6 @@ public boolean ensureSynced(Location location, long globalCheckpoint) throws IOE
return false;
}

/**
* Ensures that all locations and processed global checkpoints in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amount of fsync operations if multiple locations must be synced.
*
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
*/
public boolean ensureSynced(Stream<Location> locations, LongStream globalCheckpoints) throws IOException {
final Optional<Location> maxLocation = locations.max(Location::compareTo);
final OptionalLong maxGlobalCheckpoint = globalCheckpoints.max();
// we only need to sync the max location since it will sync all other
// locations implicitly
if (maxGlobalCheckpoint.isPresent() || maxLocation.isPresent()) {
return ensureSynced(
maxLocation.orElse(new Translog.Location(0, 0, 0)),
maxGlobalCheckpoint.orElse(SequenceNumbers.NO_OPS_PERFORMED)
);
} else {
return false;
}
}

/**
* Closes the translog if the current translog writer experienced a tragic exception.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
* raising the exception.
*/
public void sync() throws IOException {
syncUpTo(Long.MAX_VALUE, Long.MAX_VALUE);
syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
}

/**
Expand Down Expand Up @@ -464,6 +464,7 @@ private long getWrittenOffset() throws IOException {
*/
final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong();
synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
&& syncNeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
}

if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(indexShard, never()).sync();
verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any());
} else {
verify(indexShard, never()).sync();
verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,34 @@ public void run() {
}

public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException {
IndexShard shard = newStartedShard();
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = indexSettings(Version.CURRENT, 1, 2).build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
IndexShard shard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
ignoredShardId -> {},
RetentionLeaseSyncer.EMPTY
);
recoverShardFromStore(shard);

final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i <= maxSeqNo; i++) {
EngineTestCase.generateNewSeqNo(shard.getEngine());
}
final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;
shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint);
shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint());

Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch latch = new CountDownLatch(thread.length);
Expand All @@ -1548,7 +1575,10 @@ public void run() {
latch.await();
for (int i = 0; i < 10000; i++) {
semaphore.acquire();
shard.syncGlobalCheckpoint(randomLongBetween(1, 10000), (ex) -> semaphore.release());
shard.syncGlobalCheckpoint(
randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()),
(ex) -> semaphore.release()
);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand All @@ -1562,6 +1592,7 @@ public void run() {
thread[i].join();
}
assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint());

closeShards(shard);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,29 +1169,41 @@ public void testSyncUpToLocationAndCheckpoint() throws IOException {
);

int iters = randomIntBetween(25, 50);
Location alreadySynced = Location.EMPTY;
long alreadySyncedCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (int i = 0; i < iters; i++) {
int translogOperations = randomIntBetween(10, 100);
int count = 0;

Location location = null;
final ArrayList<Location> locations = new ArrayList<>();
final ArrayList<Location> locationsInCurrentGeneration = new ArrayList<>();
for (int op = 0; op < translogOperations; op++) {
if (rarely()) {
translog.rollGeneration();
locationsInCurrentGeneration.clear();
}
location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count)));
globalCheckpoint.incrementAndGet();
locations.add(location);
locationsInCurrentGeneration.add(location);
}

assertFalse("should have been synced on previous iteration", translog.ensureSynced(alreadySynced, alreadySyncedCheckpoint));

if (randomBoolean()) {
assertTrue("at least one operation pending", translog.syncNeeded());
if (randomBoolean()) {
assertTrue("this operation has not been synced", translog.ensureSynced(location, SequenceNumbers.NO_OPS_PERFORMED));
Location randomLocationToSync = locationsInCurrentGeneration.get(randomInt(locationsInCurrentGeneration.size() - 1));
assertTrue(
"this operation has not been synced",
translog.ensureSynced(randomLocationToSync, SequenceNumbers.NO_OPS_PERFORMED)
);
} else {
long globalCheckpointToSync = randomLongBetween(translog.getLastSyncedGlobalCheckpoint() + 1, globalCheckpoint.get());
assertTrue(
"this global checkpoint has not been persisted",
translog.ensureSynced(Location.EMPTY, globalCheckpoint.get())
translog.ensureSynced(Location.EMPTY, globalCheckpointToSync)
);
}
// everything should be synced
Expand All @@ -1213,6 +1225,9 @@ public void testSyncUpToLocationAndCheckpoint() throws IOException {
for (Location l : locations) {
assertFalse("all of the locations should be synced: " + l, translog.ensureSynced(l, SequenceNumbers.NO_OPS_PERFORMED));
}

alreadySynced = location;
alreadySyncedCheckpoint = globalCheckpoint.get();
}
}

Expand Down Expand Up @@ -3878,7 +3893,11 @@ public void testSyncConcurrently() throws Exception {
long globalCheckpoint = lastGlobalCheckpoint.get();
final boolean synced;
if (randomBoolean()) {
synced = translog.ensureSynced(location, globalCheckpoint);
if (randomBoolean()) {
synced = translog.ensureSynced(location, globalCheckpoint);
} else {
synced = translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
}
} else {
translog.sync();
synced = true;
Expand Down

0 comments on commit 22c7804

Please sign in to comment.