Combine the execution of an exclusive replica operation with primary term update #36116

Member

@tlrx tlrx commented Nov 30, 2018

This pull request changes how an operation which requires all index shard operations permits is executed when a primary term update is required: the operation and the update are combined so that the operation is executed after the primary term update under the same blocking operation.

This change is a fix for #35850 after a suggestion from @ywelsch. It introduces a functional interface PrimaryTermUpdateListener that is also an ActionListener and that can be used to combine both operations (the primary term update and the operation logic) under the same listener.

Closes #35850

@tlrx tlrx added >bug v7.0.0 :Distributed/Distributed v6.6.0 labels Nov 30, 2018
@tlrx tlrx requested a review from ywelsch November 30, 2018 14:23
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

Contributor

@ywelsch ywelsch left a comment

I wonder if there are more races here (needs a better test I think). Assume IndexShard is currently on term 1 and you have two consecutive invocations of acquireAllReplicaOperationsPermits with term 2. The first invocation will bump the pendingPrimaryTerm to 2 and combine the execution of the all permit acquisition with the term bumping. The second invocation however will not do so, because pendingPrimaryTerm has been set to 2 at that point. The second invocation of asyncBlockOperations will still race against the asyncBlockOperations of the term bumping though. I think we need to combine these in slightly different ways and also need more tests that concurrently invoke things here.

void onPrimaryTermUpdate() throws Exception;

@Override
default void onResponse(final Releasable releasable) {
Contributor

why does this have a default implementation? It's never used?

Member Author

This is used when IndexShard.updateShardState() calls bumpPrimaryTerm() to update the term.

Contributor

Ah I missed that there was a second caller of bumpPrimaryTerm that only implements onPrimaryTermUpdate
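
To make the point of this thread concrete, here is a minimal sketch of how default methods let a caller supply only the term-update hook while the response and failure handling fall back to defaults. `ListenerLike` and `TermUpdateListenerLike` are hypothetical stand-ins for illustration only; they are not the Elasticsearch `ActionListener` nor the PR's `PrimaryTermUpdateListener`.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class TermUpdateListenerSketch {

    /** Hypothetical stand-in for a two-method listener such as ActionListener<Releasable>. */
    interface ListenerLike {
        void onResponse(Runnable release);
        void onFailure(Exception e);
    }

    /** Only one abstract method remains, so a lambda can implement this interface. */
    @FunctionalInterface
    interface TermUpdateListenerLike extends ListenerLike {
        void onPrimaryTermUpdate() throws Exception;

        @Override
        default void onResponse(Runnable release) {
            release.run(); // default behaviour: just release the permit
        }

        @Override
        default void onFailure(Exception e) {
            // default behaviour: nobody to notify
        }
    }

    /** Returns how many times the permit was released through the default onResponse. */
    static int releasesWithLambdaListener() {
        AtomicInteger released = new AtomicInteger();
        // The caller only provides the term-update logic (empty in this sketch).
        TermUpdateListenerLike listener = () -> { };
        listener.onResponse(released::incrementAndGet);
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(releasesWithLambdaListener());
    }
}
```

A caller that also wants to run an operation after the term bump would override `onResponse` and `onFailure` instead of relying on the defaults.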

@tlrx
Member Author

tlrx commented Dec 3, 2018

Thanks for your feedback @ywelsch, always helpful.

At this stage I think we could simply always combine the primary term bump with the all permits operation execution when the operation's primary term is greater than the shard's pending primary term, OR when the operation's primary term is greater than the shard's current primary term and this is an "all permits" operation. Then we rely on bumpPrimaryTerm(), which already guards against concurrent primary term updates. What do you think?
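
The rule proposed in this comment can be written down as a small predicate. This is only a sketch under stated assumptions: the method name, the parameter names and the reading of "current primary term" as the term operations currently run under are all illustrative, and the code is not taken from `IndexShard`.

```java
final class CombineDecisionSketch {

    /**
     * Hypothetical predicate for the proposed rule.
     * pendingTerm: the highest term the shard has been told about.
     * currentTerm: the term operations currently run under (assumed meaning).
     */
    static boolean bumpAndCombine(long opTerm, long pendingTerm, long currentTerm, boolean allPermits) {
        return opTerm > pendingTerm || (allPermits && opTerm > currentTerm);
    }

    public static void main(String[] args) {
        // Shard on term 1: the first "all permits" call with term 2 triggers the bump.
        System.out.println(bumpAndCombine(2, 1, 1, true));
        // The pending term is already 2 but the bump has not completed: still combined.
        System.out.println(bumpAndCombine(2, 2, 1, true));
        // A single-permit operation in the same state is not combined.
        System.out.println(bumpAndCombine(2, 2, 1, false));
        // Once the bump has completed there is nothing left to combine with.
        System.out.println(bumpAndCombine(2, 2, 2, true));
    }
}
```

The second case is the one from the review above: two consecutive invocations with term 2 while the shard is still on term 1.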

@tlrx
Member Author

tlrx commented Dec 3, 2018

I pushed 1f875b0 to illustrate the previous comment.

@ywelsch
Contributor

ywelsch commented Dec 3, 2018

yeah, I had the same kind of thing in mind. Can you add more tests that concurrently bump terms (similar to the situation described above) and that would have possibly failed without your last patch?

@tlrx
Member Author

tlrx commented Dec 3, 2018

> Can you add more tests that concurrently bump terms (similar to the situation described above) and that would have possibly failed without your last patch?

In 587e074 I updated the test added in 1f875b0 to make it more likely to fail. It tests the situation you described: multiple concurrent invocations of acquireAllReplicaOperationsPermits that require a term update. The test fails 4 times out of 10 when my last change in IndexShard is not applied. I think this test is enough, but let me know if you still want more tests.
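
The general shape of such a test can be sketched without any Elasticsearch classes. Everything below is an assumption made for illustration: `ToyShard` is a toy model in which a single-threaded executor plays the role of the ordered blocking queue, and it always combines the term bump with the operation. It is not the test from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConcurrentTermBumpSketch {

    /** Toy shard: blocked tasks run one at a time, in submission order. */
    static final class ToyShard {
        private final Object mutex = new Object();
        private final ExecutorService blockedOps = Executors.newSingleThreadExecutor();
        private long pendingTerm = 1;
        private long operationTerm = 1; // only touched on the blockedOps thread

        /** Runs the term bump and the operation in one blocked task; returns the term the operation saw. */
        Future<Long> acquireAll(long opTerm) {
            synchronized (mutex) {
                pendingTerm = Math.max(pendingTerm, opTerm);
                return blockedOps.submit(() -> {
                    if (operationTerm < opTerm) {
                        operationTerm = opTerm; // guarded bump
                    }
                    return operationTerm;       // stands in for the operation itself
                });
            }
        }

        void close() {
            blockedOps.shutdown();
        }
    }

    /** Fires concurrent acquireAll calls and counts operations that observed a stale term. */
    static int staleObservations(int threads, long newTerm) {
        ToyShard shard = new ToyShard();
        ExecutorService callers = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            List<Future<Long>> seen = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                seen.add(callers.submit(() -> {
                    start.await(); // line all callers up before releasing them together
                    return shard.acquireAll(newTerm).get();
                }));
            }
            start.countDown();
            int stale = 0;
            for (Future<Long> observed : seen) {
                if (observed.get() < newTerm) {
                    stale++;
                }
            }
            return stale;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            callers.shutdown();
            shard.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(staleObservations(8, 2L));
    }
}
```

Because the toy model always runs the bump and the operation in the same blocked task, no caller should observe a term lower than the one it asked for.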

Contributor

@ywelsch ywelsch left a comment

I've left a few more comments and suggestions. Thanks for the extra test.

} catch (final Exception e) {
onFailure(e);
} finally {
if (success == false) {
releaseOnce.run();
Contributor

any particular reason to run release after the onFailure? With the try-with-resources, the release was done before running the catch block. I would prefer to keep the semantics like that.

Member Author

It was mostly a safeguard in case the listener throws an exception and doesn't close the releasable itself.

> I would prefer to keep the semantics like that.

+1
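
The ordering difference discussed in this thread is a property of the Java language and can be shown in isolation. In the sketch below the event names and the class are illustrative only: with try-with-resources the resource is closed before the catch block runs, whereas a release placed in a finally block runs after it.

```java
import java.util.ArrayList;
import java.util.List;

final class ReleaseOrderSketch {

    /** try-with-resources: the resource is closed before the catch block executes. */
    static List<String> withTryWithResources() {
        List<String> events = new ArrayList<>();
        try (AutoCloseable permit = () -> events.add("release")) {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            events.add("onFailure");
        }
        return events;
    }

    /** Explicit finally: the failure handler runs first, the release afterwards. */
    static List<String> withFinally(boolean fail) {
        List<String> events = new ArrayList<>();
        Runnable release = () -> events.add("release");
        boolean success = false;
        try {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            success = true;
        } catch (Exception e) {
            events.add("onFailure");
        } finally {
            if (success == false) {
                release.run();
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withTryWithResources()); // [release, onFailure]
        System.out.println(withFinally(true));      // [onFailure, release]
    }
}
```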

}
listener.onResponse(releaseOnce::run);
Contributor

I don't think a failure in this method should also fail the shard. Even though we are combining two things here, the exception handling should be distinct for both.

Member Author

Agreed
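
One way to picture the distinct exception handling agreed on here is a toy model that records which handlers fire. The event names and the `runCombined` method are hypothetical, and this is not the code that was merged: a failure during the term update fails the shard and notifies the operation, while a failure inside the operation is reported to the operation only.

```java
import java.util.ArrayList;
import java.util.List;

final class DistinctFailureSketch {

    /** Records which handlers fire for each combination of failures (toy model). */
    static List<String> runCombined(boolean termUpdateFails, boolean operationFails) {
        List<String> events = new ArrayList<>();
        try {
            if (termUpdateFails) {
                throw new IllegalStateException("term update failed");
            }
            events.add("termUpdated");
        } catch (Exception e) {
            events.add("failShard");           // the shard is failed
            events.add("operation.onFailure"); // and the combined operation is told about it
            return events;
        }
        try {
            if (operationFails) {
                throw new IllegalStateException("operation failed");
            }
            events.add("operation.onResponse");
        } catch (Exception e) {
            events.add("operation.onFailure"); // reported to the operation only, shard untouched
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runCombined(true, false));
        System.out.println(runCombined(false, true));
    }
}
```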

final long globalCheckpoint = replica.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();

final int operations = scaledRandomIntBetween(10, 64);
Contributor

do we need that much concurrency for this to trigger?

Member Author

It makes the test more likely to fail, at least on my workstation. But since Jenkins is usually busier than my box, I'll decrease the number of ops here.

@@ -2316,9 +2316,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
         indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
     }

-    private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
+    @FunctionalInterface
+    private interface PrimaryTermUpdateListener extends ActionListener<Releasable> {
Contributor

I find the logic still quite convoluted with this interface. I wonder if it's easier to optionally add the ActionListener with which to combine the term bump into the bumpPrimaryTerm method. Here's my try at this. Let me know what you prefer.

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index ad06d8449a0..d8299cab5a5 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -548,7 +549,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             } catch (final AlreadyClosedException e) {
                                 // okay, the index was deleted
                             }
-                        });
+                        }, null);
                 }
             }
             // set this last, once we finished updating all internal state.
@@ -2316,22 +2317,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
     }
 
-    @FunctionalInterface
-    private interface PrimaryTermUpdateListener extends ActionListener<Releasable> {
-
-        void onPrimaryTermUpdate() throws Exception;
-
-        @Override
-        default void onResponse(final Releasable releasable) {
-            Releasables.close(releasable);
-        }
-
-        @Override
-        default void onFailure(final Exception e) {
-        }
-    }
-
-    private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
+    private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked,
+                                                       @Nullable ActionListener<Releasable> combineWithAction) {
         assert Thread.holdsLock(mutex);
         assert newPrimaryTerm >= pendingPrimaryTerm;
         assert operationPrimaryTerm <= pendingPrimaryTerm;
@@ -2339,19 +2326,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
             @Override
             public void onFailure(final Exception e) {
+                try {
+                    innerFail(e);
+                } finally {
+                    if (combineWithAction != null) {
+                        combineWithAction.onFailure(e);
+                    }
+                }
+            }
+
+            private void innerFail(final Exception e) {
                 try {
                     failShard("exception during primary term transition", e);
                 } catch (AlreadyClosedException ace) {
                     // ignore, shard is already closed
-                } finally {
-                    listener.onFailure(e);
                 }
             }
 
             @Override
             public void onResponse(final Releasable releasable) {
                 final RunOnce releaseOnce = new RunOnce(releasable::close);
-                boolean success = false;
                 try {
                     assert operationPrimaryTerm <= pendingPrimaryTerm;
                     termUpdated.await();
@@ -2359,14 +2353,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     // in the order submitted. We need to guard against another term bump
                     if (operationPrimaryTerm < newPrimaryTerm) {
                         operationPrimaryTerm = newPrimaryTerm;
-                        listener.onPrimaryTermUpdate();
+                        onBlocked.run();
                     }
-                    listener.onResponse(releaseOnce::run);
-                    success = true;
                 } catch (final Exception e) {
-                    onFailure(e);
+                    if (combineWithAction == null) {
+                        // otherwise leave it to combineWithAction to release the permit
+                        releaseOnce.run();
+                    }
+                    innerFail(e);
                 } finally {
-                    if (success == false) {
+                    if (combineWithAction != null) {
+                        combineWithAction.onResponse(releasable);
+                    } else {
                         releaseOnce.run();
                     }
                 }
@@ -2480,37 +2478,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         throw new IndexShardNotStartedException(shardId, shardState);
                     }
 
-                    bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() {
-                        @Override
-                        public void onPrimaryTermUpdate() throws Exception {
-                            updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
-                            final long currentGlobalCheckpoint = getGlobalCheckpoint();
-                            final long maxSeqNo = seqNoStats().getMaxSeqNo();
-                            logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
-                                opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
-                            if (currentGlobalCheckpoint < maxSeqNo) {
-                                resetEngineToGlobalCheckpoint();
-                            } else {
-                                getEngine().rollTranslogGeneration();
-                            }
+                    bumpPrimaryTerm(opPrimaryTerm, () -> {
+                        updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
+                        final long currentGlobalCheckpoint = getGlobalCheckpoint();
+                        final long maxSeqNo = seqNoStats().getMaxSeqNo();
+                        logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
+                            opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
+                        if (currentGlobalCheckpoint < maxSeqNo) {
+                            resetEngineToGlobalCheckpoint();
+                        } else {
+                            getEngine().rollTranslogGeneration();
                         }
-
-                        @Override
-                        public void onResponse(final Releasable releasable) {
-                            if (allowCombineOperationWithPrimaryTermUpdate) {
-                                operationListener.onResponse(releasable);
-                            } else {
-                                Releasables.close(releasable);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            if (allowCombineOperationWithPrimaryTermUpdate) {
-                                operationListener.onFailure(e);
-                            }
-                        }
-                    });
+                    }, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);
 
                     if (allowCombineOperationWithPrimaryTermUpdate) {
                         logger.debug("operation execution has been combined with primary term update");

Member Author

This is indeed much clearer, thanks. I integrated it as it is.
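
The diff above wraps the permit in a `RunOnce` so that it can be released from several code paths without being released twice. A minimal stand-in for that idea, assuming nothing about the actual Elasticsearch class beyond what the diff shows, could look like this (`RunOnceSketch` is a hypothetical name):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in: runs the wrapped action at most once, even if run() is called repeatedly. */
final class RunOnceSketch implements Runnable {

    private final Runnable delegate;
    private final AtomicBoolean hasRun = new AtomicBoolean(false);

    RunOnceSketch(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        // Only the first caller wins the compare-and-set and executes the delegate.
        if (hasRun.compareAndSet(false, true)) {
            delegate.run();
        }
    }

    /** Calls run() twice and returns how often the wrapped release actually executed. */
    static int releasesAfterTwoCalls() {
        AtomicInteger released = new AtomicInteger();
        RunOnceSketch releaseOnce = new RunOnceSketch(released::incrementAndGet);
        releaseOnce.run();
        releaseOnce.run();
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(releasesAfterTwoCalls());
    }
}
```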

@tlrx
Member Author

tlrx commented Dec 4, 2018

@ywelsch code updated again, can you have another look please?

Contributor

@ywelsch ywelsch left a comment

LGTM

 assert Thread.holdsLock(mutex);
-assert newPrimaryTerm > pendingPrimaryTerm;
+assert newPrimaryTerm >= pendingPrimaryTerm;
Contributor

maybe strengthen this assertion along the lines of
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null)

Member Author

Yes, it's also more explicit
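
To see what the strengthened assertion accepts and rejects, the condition can be lifted into a standalone predicate. The class and parameter names below are illustrative, with `hasCombinedAction` standing for `combineWithAction != null`:

```java
final class BumpPreconditionSketch {

    /** Mirrors the suggested assertion: an equal term is only acceptable when an action is combined. */
    static boolean preconditionHolds(long newPrimaryTerm, long pendingPrimaryTerm, boolean hasCombinedAction) {
        return newPrimaryTerm > pendingPrimaryTerm
            || (newPrimaryTerm >= pendingPrimaryTerm && hasCombinedAction);
    }

    public static void main(String[] args) {
        System.out.println(preconditionHolds(2, 1, false)); // strictly greater term: accepted
        System.out.println(preconditionHolds(2, 2, false)); // equal term without combined action: rejected
        System.out.println(preconditionHolds(2, 2, true));  // equal term with combined action: accepted
        System.out.println(preconditionHolds(1, 2, true));  // lower term: always rejected
    }
}
```

Compared with the plain `>=` check, this form rejects a plain term bump that does not actually raise the pending term.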

@tlrx tlrx merged commit 5d684ca into elastic:master Dec 4, 2018
@tlrx tlrx deleted the combine-primary-term-update-with-acquire-all-operation branch December 4, 2018 13:39
@tlrx
Member Author

tlrx commented Dec 4, 2018

Thanks @ywelsch

tlrx added a commit that referenced this pull request Dec 4, 2018
…term update (#36116)

This commit changes how an operation which requires all index shard
operations permits is executed when a primary term update is required:
the operation and the update are combined so that the operation is
executed after the primary term update under the same blocking
operation.

Closes #35850

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
Labels
>bug :Distributed/Distributed v6.6.0 v7.0.0-beta1
Development

Successfully merging this pull request may close these issues.

[CI] Failures in IndexShardTests#testRollbackReplicaEngineOnPromotion
4 participants