Add two phased commit to Cluster State publishing #13062

Merged
merged 19 commits into from Sep 14, 2015

@bleskes
Member
bleskes commented Aug 23, 2015

When publishing a new cluster state, the master will send it to all the nodes of the cluster, noting down how many master nodes responded successfully. The nodes do not yet process the new cluster state, but rather park it in memory. As soon as at least minimum master nodes have acked the cluster state change, it is committed and a commit request is sent to all the nodes that have responded so far (and to those that respond later). Once they receive the commit request, the nodes continue to process the cluster state change as they did before this change.

A few notable comments:

  1. For this change to have effect, min master nodes must be configured.
  2. All basic cluster state validation is done in the first phase of publish and is thus now part of ShardOperationResult
  3. A new COMMIT_TIMEOUT setting is introduced, dictating how long a master should wait for nodes to ack the first phase. Unlike PUBLISH_TIMEOUT, if waiting for a commit times out, the cluster state change will be rejected.
  4. Failing to collect acks from at least min master nodes will cause the master to step down, as it clearly doesn't have enough active followers.
  5. Previously there was a short window between the moment a master lost its followers and the moment it stepped down because of node fault detection failures. In this short window, the master could process any change (but fail to publish it). This PR closes this gap to 0.
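
For readers skimming the thread, the master-side bookkeeping described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class `TwoPhasePublishSketch`, its method names, and the use of plain strings as node IDs are all hypothetical, and the sketch does not model whether the master counts its own ack.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of master-side bookkeeping for a two-phase publish.
// Names are illustrative and are not the classes introduced by this PR.
final class TwoPhasePublishSketch {
    private final int minimumMasterNodes;
    private int masterAcks = 0;
    private boolean committed = false;
    private boolean failed = false;
    // nodes that acked phase one before a commit decision was made
    private final List<String> ackedBeforeCommit = new ArrayList<>();
    // nodes that have been sent a commit message, in sending order
    private final List<String> commitsSent = new ArrayList<>();

    TwoPhasePublishSketch(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    // Called when a node acknowledges receipt of the (not yet applied) cluster state.
    synchronized void onAck(String nodeId, boolean isMasterEligible) {
        if (failed) {
            return; // the publish was already rejected, ignore late acks
        }
        if (committed) {
            commitsSent.add(nodeId); // late responders get the commit right away
            return;
        }
        ackedBeforeCommit.add(nodeId);
        if (isMasterEligible && ++masterAcks >= minimumMasterNodes) {
            committed = true;
            commitsSent.addAll(ackedBeforeCommit); // flush everyone who acked so far
            ackedBeforeCommit.clear();
        }
    }

    // Called when the commit timeout elapses; returns true if the publish is rejected.
    synchronized boolean onCommitTimeout() {
        if (!committed) {
            failed = true;
        }
        return failed;
    }

    synchronized boolean isCommitted() {
        return committed;
    }

    synchronized List<String> commitsSent() {
        return new ArrayList<>(commitsSent);
    }
}
```

Only acks from master-eligible nodes move the publish towards the commit point, but every node that acked is sent the commit once that point is reached.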

I still have one nocommit to resolve and some docs to add, but I think we can start the review cycles.

@brwe @imotov and @jasontedor - can you have a careful look when you have time?

@bleskes bleskes changed the title from Add two phased to Cluster State publishing to Add two phased commit to Cluster State publishing Aug 23, 2015
@brwe brwe and 1 other commented on an outdated diff Aug 24, 2015
...va/org/elasticsearch/discovery/DiscoverySettings.java
@@ -57,6 +69,7 @@ public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsServ
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
+ this.commitTimeout = settings.getAsTime(COMMIT_TIMEOUT, publishTimeout);
@brwe
brwe Aug 24, 2015 Contributor

should this be settings.getAsTime(COMMIT_TIMEOUT, commitTimeout); ?

@bleskes
bleskes Aug 24, 2015 Member

Yikes. Yes… :(


@brwe brwe commented on the diff Aug 24, 2015
...ava/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -669,12 +684,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
final ClusterName incomingClusterName = newClusterState.getClusterName();
- /* The cluster name can still be null if the state comes from a node that is prev 1.1.1*/
- if (incomingClusterName != null && !incomingClusterName.equals(this.clusterName)) {
- logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
- newStateProcessed.onNewClusterStateFailed(new IllegalStateException("received state from a node that is not part of the cluster"));
@brwe
brwe Aug 24, 2015 Contributor

why can we just remove this check?

@bleskes
bleskes Aug 25, 2015 Member

This one is moved to PublishClusterStateAction#validateIncomingState

@bleskes
Member
bleskes commented Aug 24, 2015

@brwe @imotov @jasontedor FYI I removed the no commit.

@imotov imotov commented on the diff Aug 24, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+ }
+ }
+
+ private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
+ @Override
+ public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
+ handleCommitRequest(request, channel);
+ }
+ }
+
+ protected static class CommitClusterStateRequest extends TransportRequest {
+
+ String stateUUID;
+
+ public CommitClusterStateRequest() {
+ }
@imotov
imotov Aug 24, 2015 Member

This constructor doesn't seem to be necessary.

@bleskes
bleskes Aug 26, 2015 Member

it's needed for the request to be created on the receiving node. It's used by reflection.
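
As general Java background for this point, here is a small sketch of why a no-arg constructor matters when instances are created reflectively. It is an illustration only: `ReflectiveRequestSketch`, `CommitRequestSketch` and `newEmptyInstance` are hypothetical names, and the sketch does not claim to show how the transport layer actually instantiates requests.

```java
import java.lang.reflect.Constructor;

final class ReflectiveRequestSketch {
    // Hypothetical stand-in for a request class; not the PR's CommitClusterStateRequest.
    static class CommitRequestSketch {
        String stateUUID;

        // Lets the receiving side create an empty instance and fill it in afterwards.
        public CommitRequestSketch() {
        }

        public CommitRequestSketch(String stateUUID) {
            this.stateUUID = stateUUID;
        }
    }

    // Creates an instance through the no-arg constructor. Fails with an
    // IllegalStateException if the class does not declare such a constructor.
    static <T> T newEmptyInstance(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + type.getName(), e);
        }
    }
}
```

If the empty constructor were removed from `CommitRequestSketch`, `getDeclaredConstructor()` would throw `NoSuchMethodException` and the helper would fail at runtime rather than at compile time.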

@imotov imotov and 1 other commented on an outdated diff Aug 24, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
try {
- listener.onNewClusterState(lastSeenClusterState, new NewClusterStateListener.NewStateProcessed() {
- @Override
- public void onNewClusterStateProcessed() {
- try {
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
- } catch (Throwable e) {
- logger.debug("failed to send response on cluster state processed", e);
- }
- }
+ timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false;
+ } catch (InterruptedException e) {
@imotov
imotov Aug 24, 2015 Member

Maybe we should restore interrupted state or at least explain why we ignore it here?

@bleskes
bleskes Aug 26, 2015 Member

Added a note
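
For context, one common way to handle this in plain Java is to restore the interrupt flag after catching `InterruptedException`. The sketch below shows that pattern around a timed latch wait; `AwaitSketch` and `awaitTimedOut` are hypothetical names, and this is not necessarily what the PR ended up doing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AwaitSketch {
    // Waits for the latch to reach zero. Returns true if the wait timed out or was
    // interrupted. On interruption the thread's interrupt flag is set again so that
    // callers further up the stack can still observe it.
    static boolean awaitTimedOut(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) == false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
    }
}
```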

@imotov imotov commented on the diff Aug 24, 2015
...overy/zen/publish/PublishClusterStateActionTests.java
+ */
+ public void testTimeoutOrCommit() throws Exception {
+ Settings settings = Settings.builder()
+ .put(DiscoverySettings.COMMIT_TIMEOUT, "1ms").build(); // short but so we will sometime commit sometime timeout
+
+ MockNode master = createMockNode("master", settings);
+ MockNode node = createMockNode("node", settings);
+ ClusterState state = ClusterState.builder(master.clusterState)
+ .nodes(DiscoveryNodes.builder(master.clusterState.nodes()).put(node.discoveryNode).masterNodeId(master.discoveryNode.id())).build();
+
+ for (int i = 0; i < 10; i++) {
+ state = ClusterState.builder(state).incrementVersion().build();
+ logger.debug("--> publishing version [{}], UUID [{}]", state.version(), state.stateUUID());
+ boolean success;
+ try {
+ publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS);
@imotov
imotov Aug 24, 2015 Member

This test fails here if I run

mvn test -Pdev -pl org.elasticsearch:elasticsearch -Dtests.seed=8302BF8B33D58713 -Dtests.class=org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests -Des.logger.level=ERROR -Dtests.assertion.disabled=false -Dtests.security.manager=true -Dtests.heap.size=512m -Dtests.locale=ar_QA -Dtests.timezone=Pacific/Kwajalein -Dtests.iters=200

a few times. Here is a typical error: https://gist.github.com/imotov/a1512b9a27a2854c90f5

@bleskes
Member
bleskes commented Aug 25, 2015

I updated the docs. @clintongormley @jasontedor - I would love a native English speaker to review them - can you take a look?

@jasontedor
Contributor

@bleskes I left a review of the updated docs but I'm still in progress on a review of the code.

@clintongormley clintongormley commented on an outdated diff Aug 25, 2015
docs/reference/modules/discovery/zen.asciidoc
@@ -108,12 +108,18 @@ considered failed. Defaults to `3`.
The master node is the only node in a cluster that can make changes to the
cluster state. The master node processes one cluster state update at a time,
applies the required changes and publishes the updated cluster state to all
-the other nodes in the cluster. Each node receives the publish message,
-updates its own cluster state and replies to the master node, which waits for
-all nodes to respond, up to a timeout, before going ahead processing the next
-updates in the queue. The `discovery.zen.publish_timeout` is set by default
-to 30 seconds and can be changed dynamically through the
-<<cluster-update-settings,cluster update settings api>>
+the other nodes in the cluster. Each node receives the publish message, acknowledges
+it but do *not* yet apply it. If the master does not receive acknowledgement from
+at least `discovery.zen.minimum_master_nodes` nodes within a certain time (controlled by
+the `discovery.zen.commit_timeout` setting and defaults to 30 seconds) the cluster state
+change is rejected.
+
+Once enough nodes have responded, the cluster state is committed and a message will
+be sent to all the nodes. The nodes then proceed and apply the new cluster state to their
@clintongormley
clintongormley Aug 25, 2015 Member

proceed and -> proceed to

@clintongormley clintongormley commented on an outdated diff Aug 25, 2015
docs/resiliency/index.asciidoc
@@ -56,6 +56,21 @@ If you encounter an issue, https://github.com/elasticsearch/elasticsearch/issues
We are committed to tracking down and fixing all the issues that are posted.
[float]
+=== Use two phase commit for Cluster State publishing (STATUS: ONGOING)
+
+A master node in Elasticsearch continuously https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html#fault-detection[monitors the cluster nodes]
+and removes any node from the cluster that doesn't respond to it's pings in a timely
+fashion. If the master is left with less nodes than the `discovery.zen.minimum_master_nodes`
@clintongormley
clintongormley Aug 25, 2015 Member

less -> fewer

@clintongormley clintongormley and 1 other commented on an outdated diff Aug 25, 2015
docs/resiliency/index.asciidoc
@@ -56,6 +56,21 @@ If you encounter an issue, https://github.com/elasticsearch/elasticsearch/issues
We are committed to tracking down and fixing all the issues that are posted.
[float]
+=== Use two phase commit for Cluster State publishing (STATUS: ONGOING)
+
+A master node in Elasticsearch continuously https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html#fault-detection[monitors the cluster nodes]
+and removes any node from the cluster that doesn't respond to it's pings in a timely
+fashion. If the master is left with less nodes than the `discovery.zen.minimum_master_nodes`
+settings, it will step down and a new master election will start.
+
+When a network partition occurs causing a master to loose many followers, there is a
@clintongormley
clintongormley Aug 25, 2015 Member

When a network partition causes a master node to lose many followers, there is a short window in time until the node loss is detected and the master steps down.

@bleskes
bleskes Aug 25, 2015 Member

better. thx.

@clintongormley
Member

left some minor docs suggestions

@brwe brwe and 1 other commented on an outdated diff Aug 26, 2015
...ticsearch/cluster/service/InternalClusterService.java
@@ -482,8 +482,14 @@ public void run() {
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
- logger.debug("publishing cluster state version {}", newClusterState.version());
- discoveryService.publish(clusterChangedEvent, ackListener);
+ logger.debug("publishing cluster state version [{}]", newClusterState.version());
+ try {
+ discoveryService.publish(clusterChangedEvent, ackListener);
+ } catch (Throwable t) {
@brwe
brwe Aug 26, 2015 Contributor

I guess we need that to catch the FailedToCommitException? If so, why not only catch that?

@bleskes
bleskes Aug 26, 2015 Member

That's a good one. I've tightened things up to make sure we can rely on FailedToCommitException to indicate that something went wrong before committing and that we can safely reject the CS (it will not be committed on any node).

@brwe brwe commented on the diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+ }
+
+ if (timedout) {
+ markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
+ }
+ if (isCommitted() == false) {
+ throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left",
+ timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);
+ }
+ }
+
+ synchronized public boolean isCommitted() {
+ return committed;
+ }
+
+ synchronized public void onNodeSendAck(DiscoveryNode node) {
@brwe
brwe Aug 26, 2015 Contributor

can we give that a better name that describes what the function does? something like checkCommittedAndSendCommitIfSo().

@bleskes
bleskes Aug 26, 2015 Member

The naming suggests when the methods are called (when a node has acked the sent CS). It's what we use in many other places, so I think it's OK?

@brwe
brwe Aug 27, 2015 Contributor

I understand we sometimes have to give vague names to methods when they are needed for some sort of abstraction but this is not the case here. We have the following methods:

  • onNodeSendAck: sends a commit to the node if the state is already committed; otherwise checks whether the state can now be committed
  • onMasterNodeSendAck: the actual check of whether the state is now committed
  • onMasterNodeDone: counts down outstanding send requests and additionally checks whether too many master nodes failed and the quorum was not met
  • onNodeSendFailed: counts down outstanding publish requests and calls onMasterNodeDone for the extra check

None of them are ever overridden or in any other way part of an abstraction that would require them to have generic names. I find it hard to read the code as is and think it would be much easier if there was at least a hint in the method names.

@brwe brwe commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+ synchronized public void onNodeSendAck(DiscoveryNode node) {
+ if (committed) {
+ assert sendAckedBeforeCommit.isEmpty();
+ sendCommitToNode(node, clusterState, this);
+ } else if (committedOrFailed) {
+ logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
+ } else {
+ // we're still waiting
+ sendAckedBeforeCommit.add(node);
+ if (node.isMasterNode()) {
+ onMasterNodeSendAck(node);
+ }
+ }
+ }
+
+ synchronized private void onMasterNodeSendAck(DiscoveryNode node) {
@brwe
brwe Aug 26, 2015 Contributor

same as above, function name says nothing about what it does.

@brwe brwe and 1 other commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+ validateIncomingState(incomingState, lastSeenClusterState);
+
+ lastSeenClusterState = incomingState;
+ lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
+ }
+ channel.sendResponse(TransportResponse.Empty.INSTANCE);
+ }
+
+ // package private for testing
+ /**
+ * does simple sanity check of the incoming cluster state. Throws an exception on rejections.
+ */
+ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
+ final ClusterName incomingClusterName = incomingState.getClusterName();
+ if (!incomingClusterName.equals(PublishClusterStateAction.this.clusterName)) {
+ logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().masterNode(), incomingClusterName);
@brwe
brwe Aug 26, 2015 Contributor

why the PublishClusterStateAction.this. ?

@bleskes
bleskes Aug 26, 2015 Member

left over copy paste. I'll remove

@brwe brwe and 1 other commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
+ onMasterNodeDone(node);
+ }
+
+ synchronized private void onMasterNodeDone(DiscoveryNode node) {
+ pendingMasterNodes--;
+ if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
+ markAsFailed("All master nodes acked or failed but [" + neededMastersToCommit + "] acks are still needed");
@brwe
brwe Aug 26, 2015 Contributor

what does this message mean? did the master nodes ack or not?

@bleskes
bleskes Aug 26, 2015 Member

some did, some failed. I'll change it and see if it's better.

@bleskes
Member
bleskes commented Aug 26, 2015

@imotov @brwe pushed another commit addressing comments so far

@imotov imotov and 1 other commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
- } catch (InterruptedException e) {
- // ignore & restore interrupt
- Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
}
}
}
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
@imotov
imotov Aug 26, 2015 Member

It looks like with the latest changes serializedStates can no longer be null, so we should probably remove Nullable here.

@bleskes
bleskes Aug 27, 2015 Member

correct. Removed.

@imotov imotov commented on the diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
}
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
- DiscoveryNode node, AtomicBoolean timedOutWaitingForNodes, TimeValue publishTimeout,
- BlockingClusterStatePublishResponseHandler publishResponseHandler) {
- BytesReference bytes = null;
- if (serializedStates != null) {
- bytes = serializedStates.get(node.version());
- }
+ DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
+ BytesReference bytes = serializedStates.get(node.version());
if (bytes == null) {
try {
bytes = serializeFullClusterState(clusterState, node.version());
@imotov
imotov Aug 26, 2015 Member

if (serializedStates != null) { is no longer needed

@bleskes
bleskes Aug 27, 2015 Member

confused about this one?

@imotov
imotov Aug 27, 2015 Member

Sorry, it's not visible in github, but on the next line after this one there is a check if serializedStates is null or not. However, it cannot be null here. If it was null it would have failed with NPE 4 lines above where we try to retrieve version from it.

@bleskes
bleskes Aug 27, 2015 Member

I see. removed.

@imotov imotov and 1 other commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
+ private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
+ try {
+ logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
+ TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
+ // no need to put a timeout on the options here, because we want the response to eventually be received
+ // and not log an error if it arrives after the timeout
+ transportService.sendRequest(node, COMMIT_ACTION_NAME,
+ new CommitClusterStateRequest(clusterState.stateUUID()),
+ options, // no need to compress, we already compressed the bytes
@imotov
imotov Aug 26, 2015 Member

This comment doesn't make much sense here.

@bleskes
bleskes Aug 27, 2015 Member

yeah, removed

@imotov imotov commented on an outdated diff Aug 26, 2015
...elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
@imotov
imotov Aug 26, 2015 Member

Unused import

@imotov imotov commented on an outdated diff Aug 26, 2015
...elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java
@@ -41,9 +45,10 @@ public void testShouldIgnoreNewClusterState() {
ClusterName clusterName = new ClusterName("abc");
DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder();
- currentNodes.masterNodeId("a");
+ currentNodes.masterNodeId("a").put(new DiscoveryNode("a", DummyTransportAddress.INSTANCE, Version.CURRENT));
+ ;
@imotov
imotov Aug 26, 2015 Member

Unused ";"

@imotov imotov and 1 other commented on an outdated diff Aug 26, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
- private void publish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
- final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
+ try {
+ innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs);
+ } catch (Discovery.FailedToCommitClusterStateException t) {
+ throw t;
+ } catch (Throwable t) {
+ // try to fail committing, in cause it's still on going
+ sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]");
+ if (sendingController.isCommitted() == false) {
@imotov
imotov Aug 26, 2015 Member

Why are we not using the return value of sendingController.markAsFailed here but instead checking the committed status again?

@bleskes
bleskes Aug 27, 2015 Member

good one. Simplifying..

@brwe brwe commented on an outdated diff Aug 27, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
+ onMasterNodeDone(node);
+ }
+
+ synchronized private void onMasterNodeDone(DiscoveryNode node) {
+ pendingMasterNodes--;
+ if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
+ markAsFailed("no more pending master nodes, but [" + neededMastersToCommit + "] acks are still needed");
@brwe
brwe Aug 27, 2015 Contributor

hm. so what I meant in my last comment is that this log message tells what the variables pendingMasterNodes and neededMastersToCommit are set to but not what that means. I think at this point less than a quorum of master eligible nodes have responded with success and the rest failed and therefore we cannot apply the cluster state. Is that right? If so, can we maybe change that to:

markAsFailed("received responses from all master eligible nodes that are known but less than minimum_master_nodes responded with success and all the others failed ({})", numberOfFailedMasterNodes); ?

@brwe brwe commented on the diff Aug 27, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeString(stateUUID);
+ }
+ }
+
+
+ class SendingController {
+
+ private final ClusterState clusterState;
+
+ public BlockingClusterStatePublishResponseHandler getPublishResponseHandler() {
+ return publishResponseHandler;
+ }
+
+ private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
@brwe
brwe Aug 27, 2015 Contributor

I am not sure it is a good idea to make this a member of sendingController. It is mostly used in PublishClusterStateAction anyway and only when a node fails to respond to the cluster state sending we use it in SendingController. This makes it slightly more difficult to read the code because now I have to look in two places to see how this handler is actually used. Can we remove it here and just use it in PublishClusterStateAction?

@bleskes
bleskes Aug 27, 2015 Member

we discussed it and I think the alternative will be more confusing as we will need to pass it along in all places.

@brwe brwe and 1 other commented on an outdated diff Aug 27, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
}
}
+ onMasterNodeDone(node);
+ }
+
+ synchronized private void onMasterNodeDone(DiscoveryNode node) {
@brwe
brwe Aug 27, 2015 Contributor

parameter is not used?

@bleskes
bleskes Aug 27, 2015 Member

removed.

@brwe brwe and 1 other commented on an outdated diff Aug 27, 2015
.../discovery/zen/publish/PublishClusterStateAction.java
+
+
+ class SendingController {
+
+ private final ClusterState clusterState;
+
+ public BlockingClusterStatePublishResponseHandler getPublishResponseHandler() {
+ return publishResponseHandler;
+ }
+
+ private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
+ final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>();
+ final CountDownLatch committedOrFailedLatch;
+
+ // writes and reads of these are protected under synchronization
+ boolean committedOrFailed; // true if a decision was made w.r.t committing or failing
@brwe
brwe Aug 27, 2015 Contributor

do we really need committedOrFailed or can we not get the same information from committedOrFailedLatch.getCount()?

@bleskes
bleskes Aug 27, 2015 Member

yeah, we can reuse the latch (which is how it used to be). I thought this was clearer (and the latch is only used for waiting). Can change if you want.

@brwe
brwe Aug 28, 2015 Contributor

It confused me, so I'd rather change it.
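
To make the suggestion concrete, here is a rough sketch of deriving the "decision made" state from the latch count instead of keeping a separate boolean. The class `DecisionLatchSketch` and the return-value conventions of its methods are assumptions made for illustration; they are not copied from the PR.

```java
import java.util.concurrent.CountDownLatch;

final class DecisionLatchSketch {
    // Count is 1 until a decision (commit or fail) is made, 0 afterwards.
    private final CountDownLatch committedOrFailedLatch = new CountDownLatch(1);
    private boolean committed; // guarded by "this"

    // The latch is the single source of truth for "has a decision been made".
    synchronized boolean committedOrFailed() {
        return committedOrFailedLatch.getCount() == 0;
    }

    // Returns true if the state is committed once this call returns.
    synchronized boolean markAsCommitted() {
        if (committedOrFailed()) {
            return committed;
        }
        committed = true;
        committedOrFailedLatch.countDown();
        return true;
    }

    // Returns true if the publish is failed (not committed) once this call returns.
    synchronized boolean markAsFailed() {
        if (committedOrFailed()) {
            return committed == false;
        }
        committed = false;
        committedOrFailedLatch.countDown();
        return true;
    }

    synchronized boolean isCommitted() {
        return committed;
    }
}
```

With this shape the first decision wins: a later `markAsFailed()` on a committed publish returns false and changes nothing, which also gives callers a return value to rely on instead of re-checking `isCommitted()`.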

@bleskes
Member
bleskes commented Aug 27, 2015

@imotov @brwe I pushed another commit. I think I addressed all feedback so far.

@imotov
Member
imotov commented Aug 27, 2015

LGTM

@brwe
Contributor
brwe commented Aug 28, 2015

Just #13062 (comment) left and I am not too passionate about it. LGTM too otherwise.

@bleskes
Member
bleskes commented Aug 28, 2015

@brwe thx. I updated the PR based on your last comment (and rebased to latest master)

@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Sep 11, 2015
@bleskes bleskes Discovery: Add a dedicate queue for incoming ClusterStates
The initial implementation of two phase commit based cluster state publishing (#13062) relied on a single in-memory "pending" cluster state that is only processed by ZenDiscovery once committed by the master. While this is fine on its own, it resulted in an issue with acknowledged APIs, such as the open index API, in the extreme case where a node falls behind and receives a commit message after a new cluster state has been published. Specifically:

1) Master receives an acked-API call and publishes cluster state CS1
2) Master waits for min-master nodes to receive CS1 and commits it.
3) All nodes that have responded to CS1 are sent a commit message; however, node N hasn't responded yet
4) Master waits for the publish timeout (defaults to 30s) for all nodes to process the commit. Node N fails to do so.
5) Master publishes a cluster state CS2. Node N responds to cluster state CS1's publishing but receives cluster state CS2 before the commit for CS1 arrives.
6) The commit message for CS1 is processed on node N, but fails because CS2 is pending. This caused the acked API in step 1 to return (but CS2 is not yet processed).

In this case, the action indicated by CS1 is not yet executed on node N and therefore the acked API call returns prematurely. Note that once CS2 is processed, the change in CS1 takes effect (cluster state operations are safe to batch and we do so all the time).

An example failure can be found on: http://build-us-00.elastic.co/job/es_feature_two_phase_pub/314/

This commit extracts the already existing pending cluster state queue (processNewClusterStates) from ZenDiscovery into its own class, which serves as a temporary container for in-flight cluster states. Once committed, the cluster states are transferred to ZenDiscovery as they were before. This allows "lagging" cluster states to still be successfully committed and processed (and likely to be ignored as a newer cluster state has already been processed).

As a side effect, all batching logic is now extracted from ZenDiscovery and is unit tested.
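
The idea of parking several in-flight states and committing them independently can be sketched as follows. This is a simplified illustration under stated assumptions, not the queue class added by this commit: `PendingStatesSketch`, its methods, and the use of a UUID string plus a version number to stand in for a cluster state are all hypothetical, and the sketch ignores which master a state came from.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a container for in-flight (received, not yet committed) states.
final class PendingStatesSketch {
    private static final class Entry {
        final String uuid;
        final long version;
        boolean committed;

        Entry(String uuid, long version) {
            this.uuid = uuid;
            this.version = version;
        }
    }

    private final List<Entry> pending = new ArrayList<>();

    // Phase one: park the incoming state without applying it.
    synchronized void addPending(String uuid, long version) {
        pending.add(new Entry(uuid, version));
    }

    // Phase two: the commit may arrive late, even after newer states were parked.
    // Returns false only if no parked state has this UUID.
    synchronized boolean markAsCommitted(String uuid) {
        for (Entry e : pending) {
            if (e.uuid.equals(uuid)) {
                e.committed = true;
                return true;
            }
        }
        return false;
    }

    // Returns the version of the newest committed state, or -1 if none is committed.
    // Committed states up to that version are dropped (batched away); uncommitted
    // states stay parked until their own commit arrives.
    synchronized long pollNewestCommitted() {
        long newest = -1;
        for (Entry e : pending) {
            if (e.committed && e.version > newest) {
                newest = e.version;
            }
        }
        if (newest >= 0) {
            final long cutoff = newest;
            pending.removeIf(e -> e.committed && e.version <= cutoff);
        }
        return newest;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

In the scenario from steps 1 to 6, node N would park CS1 and CS2, and the late commit for CS1 would simply mark CS1 as committed instead of failing because CS2 is pending.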
80b59e0
@bleskes bleskes merged commit 218979d into elastic:master Sep 14, 2015

1 check passed

CLA Commit author is a member of Elasticsearch
@bleskes bleskes added a commit that referenced this pull request Sep 14, 2015
@bleskes bleskes Discovery: Add two phased commit to Cluster State publishing
When publishing a new cluster state, the master will send it to all the nodes of the cluster, noting down how many *master* nodes responded successfully. The nodes do not yet process the new cluster state, but rather park it in memory. As soon as at least minimum master nodes have acked the cluster state change, it is committed and a commit request is sent to all the nodes that have responded so far (and to those that respond later). Once they receive the commit request, the nodes continue to process the cluster state change as they did before this change.

A few notable comments:
1. For this change to have effect, min master nodes must be configured.
2. All basic cluster state validation is done in the first phase of publish and is thus now part of `ShardOperationResult`
3. A new `COMMIT_TIMEOUT` setting is introduced, dictating how long a master should wait for nodes to ack the first phase. Unlike `PUBLISH_TIMEOUT`, if waiting for a commit times out, the cluster state change will be rejected.
4. Failing to collect acks from at least min master nodes will cause the master to step down, as it clearly doesn't have enough active followers.
5. Previously there was a short window between the moment a master lost its followers and the moment it stepped down because of node fault detection failures. In this short window, the master could process any change (but fail to publish it). This PR closes this gap to 0.
6. A dedicated pending cluster states queue was added to keep pending non-committed cluster states and manage the logic around processing committed cluster states. See #13303 for details.

Closes #13062 , Closes #13303
15f4863
@bleskes bleskes added the v3.0.0 label Sep 15, 2015