Combine the execution of an exclusive replica operation with primary term update #36116

Merged
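In rough terms, this change lets a replica operation that needs all operation permits piggyback on the primary term update, so the permits are drained once instead of twice and the term is guaranteed to be bumped before the operation runs. The sketch below is a toy, synchronous model of that idea; the class, field, and method names are hypothetical, and it omits all of IndexShard's real state handling and asynchronous permit machinery.

import java.util.concurrent.Semaphore;

// Toy, synchronous model of combining a primary term bump with an exclusive
// ("all permits") replica operation. Names are hypothetical; this is not the
// IndexShard / IndexShardOperationPermits API.
public class CombinedTermUpdateSketch {

    private static final int TOTAL_PERMITS = 1024;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS);
    private volatile long operationPrimaryTerm = 1L;

    // Drain every permit once, update the term if needed, then run the exclusive
    // operation under the same block instead of draining all permits a second time.
    public void bumpTermAndRunExclusive(final long newTerm, final Runnable operation) throws InterruptedException {
        permits.acquire(TOTAL_PERMITS); // blocks until no other operation holds a permit
        try {
            if (newTerm > operationPrimaryTerm) {
                operationPrimaryTerm = newTerm; // the term update happens before the operation executes
            }
            operation.run();
        } finally {
            permits.release(TOTAL_PERMITS);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final CombinedTermUpdateSketch shard = new CombinedTermUpdateSketch();
        shard.bumpTermAndRunExclusive(2L, () ->
            System.out.println("exclusive operation running at primary term " + shard.operationPrimaryTerm));
    }
}

In the actual change, the same combination is achieved by passing the operation's ActionListener into bumpPrimaryTerm as combineWithAction, as shown in the diff below.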
122 changes: 81 additions & 41 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -63,6 +63,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
@@ -548,7 +549,7 @@ public void onFailure(Exception e) {
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
});
}, null);
}
}
// set this last, once we finished updating all internal state.
@@ -2316,14 +2317,26 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
final CheckedRunnable<E> onBlocked,
@Nullable ActionListener<Releasable> combineWithAction) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
try {
innerFail(e);
} finally {
if (combineWithAction != null) {
combineWithAction.onFailure(e);
}
}
}

private void innerFail(final Exception e) {
try {
failShard("exception during primary term transition", e);
} catch (AlreadyClosedException ace) {
@@ -2333,7 +2346,8 @@ public void onFailure(final Exception e) {

@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
final RunOnce releaseOnce = new RunOnce(releasable::close);
try {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
// indexShardOperationPermits doesn't guarantee that async submissions are executed
@@ -2343,7 +2357,17 @@ public void onResponse(final Releasable releasable) {
onBlocked.run();
}
} catch (final Exception e) {
onFailure(e);
if (combineWithAction == null) {
// otherwise leave it to combineWithAction to release the permit
releaseOnce.run();
}
innerFail(e);
} finally {
if (combineWithAction != null) {
combineWithAction.onResponse(releasable);
} else {
releaseOnce.run();
[Review thread]
Contributor: any particular reason to run release after the onFailure? With the try-with-resources, the release was done before running the catch block. I would prefer to keep the semantics like that.

Member (author): It was mostly a safeguard in case the listener throws an exception and didn't close the release itself.

> I would prefer to keep the semantics like that.

+1
[End review thread]
}
}
}
}, 30, TimeUnit.MINUTES);
@@ -2371,7 +2395,7 @@ public void onResponse(final Releasable releasable) {
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
}

@@ -2393,49 +2417,24 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final Consumer<ActionListener<Releasable>> consumer) {
final boolean allowCombineOperationWithPrimaryTermUpdate,
final Consumer<ActionListener<Releasable>> operationExecutor) {
verifyNotClosed();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
});
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
consumer.accept(new ActionListener<Releasable>() {
// This listener is used for the execution of the operation. If the operation requires all the permits for its
// execution and the primary term must be updated first, we can combine the operation execution with the
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted, combining both operations ensures that the term is updated before the operation is
// executed. It also has the side effect of acquiring all the permits one time instead of two.
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
@@ -2465,7 +2464,48 @@ public void onResponse(final Releasable releasable) {
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
});
};

if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
}, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);

if (allowCombineOperationWithPrimaryTermUpdate) {
logger.debug("operation execution has been combined with primary term update");
return;
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
operationExecutor.accept(operationListener);
}

private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
}

public int getActiveOperationsCount() {
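A note on the review thread above about release ordering: with try-with-resources, Java closes the resource before the catch block runs, which is the ordering the reviewer wanted to preserve; the merged version keeps that order by calling releaseOnce.run() before innerFail(e) when the operation is not combined. A minimal standalone sketch of the try-with-resources behavior (illustrative only, hypothetical class name, not Elasticsearch code):

// Shows that try-with-resources closes the resource before the catch block runs.
public class ReleaseOrderingSketch {
    public static void main(final String[] args) {
        try (AutoCloseable permit = () -> System.out.println("1. permit released")) {
            throw new IllegalStateException("operation failed");
        } catch (final Exception e) {
            System.out.println("2. failure handled: " + e.getMessage());
        }
        // Output order: "1. permit released", then "2. failure handled: operation failed".
    }
}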
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -732,7 +732,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard
return fut.get();
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
@@ -1023,7 +1022,6 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -1088,7 +1086,6 @@ public void onFailure(Exception e) {
closeShard(indexShard, false);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

@@ -3599,6 +3596,67 @@ public void testResetEngine() throws Exception {
closeShard(shard, false);
}

public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

final int nbTermUpdates = randomIntBetween(1, 5);

for (int i = 0; i < nbTermUpdates; i++) {
long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1;
final long globalCheckpoint = replica.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();

final int operations = scaledRandomIntBetween(5, 32);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);

final Thread[] threads = new Thread[operations];
for (int j = 0; j < operations; j++) {
threads[j] = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
opPrimaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm));
assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm));
} finally {
latch.countDown();
}
}

@Override
public void onFailure(final Exception e) {
try {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, TimeValue.timeValueMinutes(30L));
});
threads[j].start();
}
barrier.await();
latch.await();

for (Thread thread : threads) {
thread.join();
}
}

closeShard(replica, false);
}

@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();