Combine the execution of an exclusive replica operation with primary term update #36116

Merged
142 changes: 101 additions & 41 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,7 +48,6 @@
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -63,6 +62,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
@@ -2316,9 +2316,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
@FunctionalInterface
private interface PrimaryTermUpdateListener extends ActionListener<Releasable> {
Contributor

I find the logic still quite convoluted with this interface. I wonder if it's easier to optionally add the ActionListener with which to combine the term bump into the bumpPrimaryTerm method. Here's my try at this. Let me know what you prefer.

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index ad06d8449a0..d8299cab5a5 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -548,7 +549,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             } catch (final AlreadyClosedException e) {
                                 // okay, the index was deleted
                             }
-                        });
+                        }, null);
                 }
             }
             // set this last, once we finished updating all internal state.
@@ -2316,22 +2317,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
     }
 
-    @FunctionalInterface
-    private interface PrimaryTermUpdateListener extends ActionListener<Releasable> {
-
-        void onPrimaryTermUpdate() throws Exception;
-
-        @Override
-        default void onResponse(final Releasable releasable) {
-            Releasables.close(releasable);
-        }
-
-        @Override
-        default void onFailure(final Exception e) {
-        }
-    }
-
-    private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
+    private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked,
+                                                       @Nullable ActionListener<Releasable> combineWithAction) {
         assert Thread.holdsLock(mutex);
         assert newPrimaryTerm >= pendingPrimaryTerm;
         assert operationPrimaryTerm <= pendingPrimaryTerm;
@@ -2339,19 +2326,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
             @Override
             public void onFailure(final Exception e) {
+                try {
+                    innerFail(e);
+                } finally {
+                    if (combineWithAction != null) {
+                        combineWithAction.onFailure(e);
+                    }
+                }
+            }
+
+            private void innerFail(final Exception e) {
                 try {
                     failShard("exception during primary term transition", e);
                 } catch (AlreadyClosedException ace) {
                     // ignore, shard is already closed
-                } finally {
-                    listener.onFailure(e);
                 }
             }
 
             @Override
             public void onResponse(final Releasable releasable) {
                 final RunOnce releaseOnce = new RunOnce(releasable::close);
-                boolean success = false;
                 try {
                     assert operationPrimaryTerm <= pendingPrimaryTerm;
                     termUpdated.await();
@@ -2359,14 +2353,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     // in the order submitted. We need to guard against another term bump
                     if (operationPrimaryTerm < newPrimaryTerm) {
                         operationPrimaryTerm = newPrimaryTerm;
-                        listener.onPrimaryTermUpdate();
+                        onBlocked.run();
                     }
-                    listener.onResponse(releaseOnce::run);
-                    success = true;
                 } catch (final Exception e) {
-                    onFailure(e);
+                    if (combineWithAction == null) {
+                        // otherwise leave it to combineWithAction to release the permit
+                        releaseOnce.run();
+                    }
+                    innerFail(e);
                 } finally {
-                    if (success == false) {
+                    if (combineWithAction != null) {
+                        combineWithAction.onResponse(releasable);
+                    } else {
                         releaseOnce.run();
                     }
                 }
@@ -2480,37 +2478,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         throw new IndexShardNotStartedException(shardId, shardState);
                     }
 
-                    bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() {
-                        @Override
-                        public void onPrimaryTermUpdate() throws Exception {
-                            updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
-                            final long currentGlobalCheckpoint = getGlobalCheckpoint();
-                            final long maxSeqNo = seqNoStats().getMaxSeqNo();
-                            logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
-                                opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
-                            if (currentGlobalCheckpoint < maxSeqNo) {
-                                resetEngineToGlobalCheckpoint();
-                            } else {
-                                getEngine().rollTranslogGeneration();
-                            }
+                    bumpPrimaryTerm(opPrimaryTerm, () -> {
+                        updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
+                        final long currentGlobalCheckpoint = getGlobalCheckpoint();
+                        final long maxSeqNo = seqNoStats().getMaxSeqNo();
+                        logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
+                            opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
+                        if (currentGlobalCheckpoint < maxSeqNo) {
+                            resetEngineToGlobalCheckpoint();
+                        } else {
+                            getEngine().rollTranslogGeneration();
                         }
-
-                        @Override
-                        public void onResponse(final Releasable releasable) {
-                            if (allowCombineOperationWithPrimaryTermUpdate) {
-                                operationListener.onResponse(releasable);
-                            } else {
-                                Releasables.close(releasable);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            if (allowCombineOperationWithPrimaryTermUpdate) {
-                                operationListener.onFailure(e);
-                            }
-                        }
-                    });
+                    }, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);
 
                     if (allowCombineOperationWithPrimaryTermUpdate) {
                         logger.debug("operation execution has been combined with primary term update");

Member Author

This is indeed much clearer, thanks. I integrated it as it is.
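
The shape of the suggestion above can be reduced to a standalone sketch. `Permit`, `Listener`, `CheckedAction` and `CombineSketch` are hypothetical stand-ins, not Elasticsearch types, and the failure policy is simplified (the permit is always released before the listener is notified of a failure), so this is an illustration of the nullable-listener idea rather than the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

final class CombineSketch {
    /** Hypothetical stand-in for a releasable permit. */
    interface Permit { void release(); }

    /** Hypothetical stand-in for ActionListener<Releasable>. */
    interface Listener {
        void onResponse(Permit permit);
        void onFailure(Exception e);
    }

    /** Hypothetical stand-in for CheckedRunnable. */
    interface CheckedAction { void run() throws Exception; }

    final List<String> events = new ArrayList<>();
    long term = 1;

    /** Bumps the term; if combineWith is non-null, it takes over the permit afterwards. */
    void bumpTerm(long newTerm, CheckedAction onBlocked, Listener combineWith) {
        Permit permit = () -> events.add("released");
        try {
            if (term < newTerm) { // guard against a concurrent, later bump
                term = newTerm;
                onBlocked.run();
            }
        } catch (Exception e) {
            events.add("term update failed");
            permit.release();
            if (combineWith != null) {
                combineWith.onFailure(e);
            }
            return;
        }
        if (combineWith != null) {
            combineWith.onResponse(permit); // the listener now owns the permit
        } else {
            permit.release();
        }
    }
}
```

Passing `null` gives the plain term bump; passing a listener runs the operation under the same permit that blocked the other operations.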


void onPrimaryTermUpdate() throws Exception;

@Override
default void onResponse(final Releasable releasable) {
Contributor

Why does this have a default implementation? It looks like it's never used.

Member Author

This is used when IndexShard.updateShardState() calls bumpPrimaryTerm() to update the term.

Contributor

Ah, I missed that there is a second caller of bumpPrimaryTerm that only implements onPrimaryTermUpdate.
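
For readers following this thread: a `@FunctionalInterface` may carry default methods as long as exactly one method stays abstract, which is what lets one caller pass a bare lambda while another overrides everything. A minimal sketch, with the hypothetical names `TermListener` and `DefaultMethodSketch` (the checked exception from the original interface is dropped for brevity):

```java
final class DefaultMethodSketch {
    /** Hypothetical listener: one abstract method, a default for the rest. */
    @FunctionalInterface
    interface TermListener {
        void onTermUpdate();

        // Used by callers that only care about the term update.
        default String onResponse(String permit) {
            return "closed " + permit;
        }
    }

    /** Runs the term update, then hands the permit to the listener. */
    static String bump(TermListener listener) {
        listener.onTermUpdate();
        return listener.onResponse("permit");
    }
}
```

A call such as `DefaultMethodSketch.bump(() -> {})` falls through to the default `onResponse`.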

Releasables.close(releasable);
}

@Override
default void onFailure(final Exception e) {
}
}

private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert newPrimaryTerm >= pendingPrimaryTerm;
Contributor

Maybe strengthen this assertion along the lines of:
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null)

Member Author

Yes, it's also more explicit
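
Spelled out as a predicate, the proposed assertion accepts an equal term only when the bump is combined with an operation. The sketch below uses the hypothetical names `TermAssertionSketch` and `validBump`, and a boolean in place of the `combineWithAction != null` check:

```java
final class TermAssertionSketch {
    /**
     * Mirrors the proposed assertion: a strictly newer term is always fine,
     * an equal term is fine only for a combined operation.
     */
    static boolean validBump(long newTerm, long pendingTerm, boolean combined) {
        return newTerm > pendingTerm || (newTerm >= pendingTerm && combined);
    }
}
```

With `pendingTerm == 2`, a bump to 3 passes either way, a bump to 2 passes only when combined, and a bump to 1 never passes.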

assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@@ -2328,22 +2343,32 @@ public void onFailure(final Exception e) {
failShard("exception during primary term transition", e);
} catch (AlreadyClosedException ace) {
// ignore, shard is already closed
} finally {
listener.onFailure(e);
}
}

@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
final RunOnce releaseOnce = new RunOnce(releasable::close);
boolean success = false;
try {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
// indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted. We need to guard against another term bump
if (operationPrimaryTerm < newPrimaryTerm) {
operationPrimaryTerm = newPrimaryTerm;
onBlocked.run();
listener.onPrimaryTermUpdate();
}
listener.onResponse(releaseOnce::run);
Contributor

I don't think a failure in this method should also fail the shard. Even though we are combining two things here, the exception handling should be distinct for both.

Member Author

Agreed
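
One way to keep the two failure paths distinct is to give each step its own `try` block, so that only a failed term update escalates. This is a sketch under that assumption; `SeparateFailureSketch` and the event strings are hypothetical and stand in for `failShard(...)` and the listener's `onFailure`:

```java
import java.util.ArrayList;
import java.util.List;

final class SeparateFailureSketch {
    final List<String> events = new ArrayList<>();

    /** Runs the term update, then the combined operation, with separate handling. */
    void run(Runnable termUpdate, Runnable operation) {
        try {
            termUpdate.run();
        } catch (RuntimeException e) {
            // Stand-in for failing the shard: the term transition itself broke.
            events.add("fail shard: " + e.getMessage());
            return;
        }
        try {
            operation.run();
        } catch (RuntimeException e) {
            // Only the operation is reported as failed; the shard stays up.
            events.add("fail operation only: " + e.getMessage());
        }
    }
}
```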

success = true;
} catch (final Exception e) {
onFailure(e);
} finally {
if (success == false) {
releaseOnce.run();
Contributor

Any particular reason to run the release after the onFailure? With the try-with-resources, the release was done before running the catch block. I would prefer to keep the semantics like that.

Member Author

It was mostly a safeguard in case the listener throws an exception and doesn't close the releasable itself.

I would prefer to keep the semantics like that.

+1
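
The ordering difference discussed here is a property of the language: try-with-resources closes its resource before any `catch` block runs, whereas a release placed in `finally` runs after it. A small sketch that records the order (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

final class ReleaseOrderSketch {
    /** The resource is closed before the catch block executes. */
    static List<String> tryWithResources() {
        List<String> events = new ArrayList<>();
        try (AutoCloseable permit = () -> events.add("release")) {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            events.add("onFailure");
        }
        return events;
    }

    /** A release in finally runs only after the catch block has finished. */
    static List<String> releaseInFinally() {
        List<String> events = new ArrayList<>();
        Runnable release = () -> events.add("release");
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            events.add("onFailure");
        } finally {
            release.run();
        }
        return events;
    }
}
```

The first method records `release` then `onFailure`; the second records them in the opposite order.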

}
}
}
}, 30, TimeUnit.MINUTES);
@@ -2371,7 +2396,7 @@ public void onResponse(final Releasable releasable) {
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
}

@@ -2393,49 +2418,24 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final Consumer<ActionListener<Releasable>> consumer) {
final boolean allowCombineOperationWithPrimaryTermUpdate,
final Consumer<ActionListener<Releasable>> operationExecutor) {
verifyNotClosed();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
});
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
consumer.accept(new ActionListener<Releasable>() {
// This listener is used for the execution of the operation. If the operation requires all the permits for its
// execution and the primary term must be updated first, we can combine the operation execution with the
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted, combining both operations ensure that the term is updated before the operation is
// executed. It also has the side effect of acquiring all the permits one time instead of two.
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
@@ -2465,7 +2465,67 @@ public void onResponse(final Releasable releasable) {
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
});
};

if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() {
@Override
public void onPrimaryTermUpdate() throws Exception {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
}

@Override
public void onResponse(final Releasable releasable) {
if (allowCombineOperationWithPrimaryTermUpdate) {
operationListener.onResponse(releasable);
} else {
Releasables.close(releasable);
}
}

@Override
public void onFailure(Exception e) {
if (allowCombineOperationWithPrimaryTermUpdate) {
operationListener.onFailure(e);
}
}
});

if (allowCombineOperationWithPrimaryTermUpdate) {
logger.debug("operation execution has been combined with primary term update");
return;
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
operationExecutor.accept(operationListener);
}

private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
}

public int getActiveOperationsCount() {
@@ -732,7 +732,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard
return fut.get();
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
@@ -1023,7 +1022,6 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -1088,7 +1086,6 @@ public void onFailure(Exception e) {
closeShard(indexShard, false);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

@@ -3599,6 +3596,67 @@ public void testResetEngine() throws Exception {
closeShard(shard, false);
}

public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

final int nbTermUpdates = randomIntBetween(1, 5);

for (int i = 0; i < nbTermUpdates; i++) {
long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1;
final long globalCheckpoint = replica.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();

final int operations = scaledRandomIntBetween(10, 64);
Contributor

Do we need that much concurrency for this to trigger?

Member Author

It makes the test more likely to fail, at least on my workstation. But since Jenkins is usually busier than my box, I'll decrease the number of ops here.
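
The concurrency pattern the test relies on, independent of `IndexShard`, looks roughly like this: a `CyclicBarrier` sized `1 + operations` releases all workers at once, and a `CountDownLatch` tells the main thread when every worker is done. `BarrierSketch` and `runConcurrently` are hypothetical names, and the worker body is a placeholder counter rather than a permit acquisition.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

final class BarrierSketch {
    /** Starts `operations` threads that all begin their work at the same moment. */
    static int runConcurrently(int operations) {
        CyclicBarrier barrier = new CyclicBarrier(1 + operations); // workers + main thread
        CountDownLatch done = new CountDownLatch(operations);
        AtomicInteger completed = new AtomicInteger();
        Thread[] threads = new Thread[operations];
        for (int i = 0; i < operations; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();             // wait until everyone is ready
                    completed.incrementAndGet(); // placeholder for the real operation
                } catch (BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    done.countDown();
                }
            });
            threads[i].start();
        }
        try {
            barrier.await(); // releases all workers together
            done.await();
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (BrokenBarrierException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        return completed.get();
    }
}
```

Lowering `operations` reduces contention on a busy CI machine while still exercising the simultaneous start.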

final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);

final Thread[] threads = new Thread[operations];
for (int j = 0; j < operations; j++) {
threads[j] = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
opPrimaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm));
assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm));
} finally {
latch.countDown();
}
}

@Override
public void onFailure(final Exception e) {
try {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, TimeValue.timeValueMinutes(30L));
});
threads[j].start();
}
barrier.await();
latch.await();

for (Thread thread : threads) {
thread.join();
}
}

closeShard(replica, false);
}

@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();