Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce clean transition on primary promotion #24925

Merged
merged 14 commits into from May 30, 2017

Conversation

jasontedor
Copy link
Member

This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed.

Relates #10708

This commit introduces a clean transition from the old primary term to
the new primary term when a replica is promoted primary. To accomplish
this, we delay all operations before incrementing the primary term. The
delay is guaranteed to be in place before we increment the term, and
then all operations that are delayed are executed after the delay is
removed which asynchronously happens on another thread. This thread does
not progress until in-flight operations that were executing are
completed, and after these operations drain, the delayed operations
re-acquire permits and are executed.
@jasontedor jasontedor force-pushed the primary-promotion-transition branch from 8d0b64a to 3b4a5c0 Compare May 28, 2017 17:55
Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jasontedor . Basics flow looks good. I left some suggestions.

private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;

private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true); // fair to ensure a blocking thread is not starved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why switch from the TOTAL_PERMITS field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't even up come with a plausible theory how that happened, that was completely unintentional.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I know what happened! I was playing around with the idea of using a shared/exclusive lock so removed the semaphore and then added it back manually (so with this change) after I decided against it (because it doesn't give an easy way to see how many threads hold the shared lock which we use in testing).

* @throws IndexShardClosedException if operation permit has been closed
*/
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
InterruptedException, TimeoutException, E {
<E extends Exception> void syncBlockOperations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - I like the blockOperations naming... syncX is just one letter away from asyncX..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

@@ -92,6 +154,7 @@ public void close() {
synchronized (this) {
queuedActions = delayedOperations;
delayedOperations = null;
delayed = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert again that it was true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

}
}

private <E extends Exception> void doBlockOperations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we doc / make it clearer in the name that this methods releases the delayed operations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better yet, how about the following? This way it's clear how a delayOperations call is matched by the undelay logic.

diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
index 7fc56a1..f1c8b0d 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
@@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -95,7 +96,11 @@ final class IndexShardOperationPermits implements Closeable {
             throw new IndexShardClosedException(shardId);
         }
         delayOperations();
-        doBlockOperations(timeout, timeUnit, onBlocked);
+        try {
+            doBlockOperations(timeout, timeUnit, onBlocked);
+        } finally {
+            releasedDelayedOperations();
+        }
     }
 
     /**
@@ -110,12 +115,21 @@ final class IndexShardOperationPermits implements Closeable {
      */
     <E extends Exception> void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked) {
         delayOperations();
-        threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
-            try {
-                doBlockOperations(timeout, timeUnit, onBlocked);
-            } catch (final Exception e) {
+        threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
                 throw new RuntimeException(e);
             }
+
+            @Override
+            protected void doRun() throws Exception {
+                doBlockOperations(timeout, timeUnit, onBlocked);
+            }
+
+            @Override
+            public void onAfter() {
+                releasedDelayedOperations();
+            }
         });
     }
 
@@ -150,25 +164,30 @@ final class IndexShardOperationPermits implements Closeable {
                 throw new TimeoutException("timed out during blockOperations");
             }
         } finally {
-            final List<ActionListener<Releasable>> queuedActions;
-            synchronized (this) {
-                queuedActions = delayedOperations;
-                delayedOperations = null;
-                delayed = false;
-            }
-            if (queuedActions != null) {
-                // Try acquiring permits on fresh thread (for two reasons):
-                // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
-                //   Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
-                //   ThreadedActionListener if the queue of the thread pool on which it submits is full.
-                // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
-                //   handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
-                threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
-                    for (ActionListener<Releasable> queuedAction : queuedActions) {
-                        acquire(queuedAction, null, false);
-                    }
-                });
-            }
+            releasedDelayedOperations();
+        }
+    }
+
+    private void releasedDelayedOperations() {
+        final List<ActionListener<Releasable>> queuedActions;
+        synchronized (this) {
+            assert delayed;
+            queuedActions = delayedOperations;
+            delayedOperations = null;
+            delayed = false;
+        }
+        if (queuedActions != null) {
+            // Try acquiring permits on fresh thread (for two reasons):
+            // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
+            //   Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
+            //   ThreadedActionListener if the queue of the thread pool on which it submits is full.
+            // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
+            //   handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
+            threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+                for (ActionListener<Releasable> queuedAction : queuedActions) {
+                    acquire(queuedAction, null, false);
+                }
+            });
         }
     }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

try {
doBlockOperations(timeout, timeUnit, onBlocked);
} catch (final Exception e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should fail the shard if something goes wrong here... maybe we should make the CheckedRunnable it's own AbstractRunnable so we can call onFailure and let the caller decide what to do?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

if (delayed) {
throw new IllegalStateException("operations are already delayed");
} else {
delayed = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can pre-create the delayedOperations list. Stronger yet, maybe we should use the existence of the list as a marker that ops should be delayed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly prefer the explicit boolean flag rather than implicit dependency on the list being null versus not null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. We can also maybe just have a something we can drain and let it be a final field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

try {
synchronized (this) {
releasable = tryAcquire();
if (releasable == null) {
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
assert delayed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we used delay to control the flow here? I think it be easier to read that to make tryAcquire check on delay. WDYT?

diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
index 7fc56a1..ebab08d 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
@@ -190,10 +190,7 @@ final class IndexShardOperationPermits implements Closeable {
         final Releasable releasable;
         try {
             synchronized (this) {
-                releasable = tryAcquire();
-                if (releasable == null) {
-                    assert delayed;
-                    // operations are delayed, this operation will be retried by doBlockOperations once the delay is remoked
+                if (delayed) {
                     if (delayedOperations == null) {
                         delayedOperations = new ArrayList<>();
                     }
@@ -205,6 +202,13 @@ final class IndexShardOperationPermits implements Closeable {
                     } else {
                         delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                     }
+                    releasable = null;
+                } else {
+                    releasable = tryAcquire();
+                    assert releasable != null;
+                }
+                if (releasable == null) {
+                    assert delayed;
                     return;
                 }
             }
@@ -212,7 +216,10 @@ final class IndexShardOperationPermits implements Closeable {
             onAcquired.onFailure(e);
             return;
         }
-        onAcquired.onResponse(releasable);
+        if (releasable != null) {
+            // if it's null operations are delayed, this operation will be retried by doBlockOperations once the delay is remoked
+            onAcquired.onResponse(releasable);
+        }
     }
 
     @Nullable private Releasable tryAcquire() throws InterruptedException {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can take it a little more, see my push.

@jasontedor
Copy link
Member Author

Back to you @bleskes.

* master:
  Add a second refresh to concurrent relocation test
  Add a dummy_index to upgrade tests to ensure we recover fine with replicas (elastic#24937)
  Rework bwc snapshot projects to build up to two bwc versions (elastic#24870)
  Move the IndexDeletionPolicy to be engine internal (elastic#24930)
  [Tests] Harden InternalExtendedStatsTests (elastic#24934)
  TCorrecting api name (elastic#24924)
  Add search method to high level REST client (elastic#24796)
  Add fromXContent method to ClearScrollResponse (elastic#24909)
  ClearScrollRequest to implement ToXContentObject (elastic#24907)
  SearchScrollRequest to implement ToXContentObject (elastic#24906)
  Fix bug in weight computation for query cache
@jasontedor
Copy link
Member Author

@bleskes I think that a4d2bf3 is necessary. What do you think?

* master:
  Fix typo in comment in ReplicationOperation.java
  Prevent Index & Delete request primaryTerm getter/setter, setShardId setter
  Drop name from TokenizerFactory (elastic#24869)
  Correctly set doc_count when MovAvg "predicts" values on existing buckets (elastic#24892)
  Handle primary failure handling replica response
  Add missing word to terms-query.asciidoc (elastic#24960)
  Correct some spelling in match-phrase-prefix docs (elastic#24956)
  testConcurrentWriteViewsAndSnapshot shouldn't flush concurrently
  [TEST] Fix FieldSortIT failures
  Add doc_count to ParsedMatrixStats (elastic#24952)
  Add document count to Matrix Stats aggregation response (elastic#24776)
  Fix script field sort returning Double.MAX_VALUE for all documents (elastic#24942)
Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Left some minor suggestions.

}
doBlockOperations(timeout, timeUnit, onBlocked);
} finally {
releasedDelayedOperations();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - an extra d? releasedDelayed..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (Assertions.ENABLED) {
synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment this is for visibility?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
}
return;
} else {
releasable = tryAcquire();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just make this be called acquire and make it never return null (but blow up if it can't acquire (as it is not non-delayed)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I will do that in a follow-up immediately after this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened the follow-up: #24971

@jasontedor jasontedor merged commit ddbc468 into elastic:master May 30, 2017
@jasontedor jasontedor deleted the primary-promotion-transition branch May 30, 2017 15:47
@clintongormley clintongormley added :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. and removed :Sequence IDs labels Feb 14, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement v6.0.0-alpha2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants