Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node receiving a cluster state with a wrong master node should reject and throw an error #9963

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 7 additions & 11 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -744,11 +744,6 @@ public void onFailure(String source, Throwable t) {
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";

ClusterState currentState = clusterService.state();
if (shouldIgnoreNewClusterState(logger, currentState, newClusterState)) {
return;
}

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -766,7 +761,7 @@ public ClusterState execute(ClusterState currentState) {
if (updatedState == null) {
updatedState = currentState;
}
if (shouldIgnoreNewClusterState(logger, currentState, updatedState)) {
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, updatedState)) {
return currentState;
}

Expand Down Expand Up @@ -876,16 +871,17 @@ static ClusterState selectNextStateToProcess(Queue<ProcessClusterState> processN

/**
* In the case we follow an elected master the new cluster state needs to have the same elected master and
* the new cluster state version needs to be equal or higher than our cluster state version. If either conditions
* are true then the cluster state is dated and we should ignore it.
* the new cluster state version needs to be equal or higher than our cluster state version.
* If the first condition fails we reject the cluster state and throw an error.
* If the second condition fails we ignore the cluster state.
*/
static boolean shouldIgnoreNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
if (currentState.nodes().masterNodeId() == null) {
return false;
}
if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
logger.warn("received a cluster state from a different master then the current one, ignoring (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
return true;
logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
throw new ElasticsearchIllegalStateException("cluster state from a different master then the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
} else if (newClusterState.version() < currentState.version()) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
Expand Down
Expand Up @@ -21,14 +21,15 @@

import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
Expand Down Expand Up @@ -190,25 +191,34 @@ public void messageReceived(BytesTransportRequest request, final TransportChanne
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
clusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
logger.debug("received cluster state version {}", clusterState.version());
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
try {
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
}
}

@Override
public void onNewClusterStateFailed(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
@Override
public void onNewClusterStateFailed(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
}
});
} catch (Exception e) {
logger.warn("unexpected error while processing cluster state version [{}]", e, clusterState.version());
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.debug("failed to send response on cluster state processed", e1);
}
});
}
}

@Override
Expand Down
Expand Up @@ -594,7 +594,6 @@ public void testMasterNodeGCs() throws Exception {
* them from following the stale master.
*/
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/pull/9963")
public void testStaleMasterNotHijackingMajority() throws Exception {
// TODO: on mac OS multicast threads are shared between nodes and we therefore we can't simulate GC and stop pinging for just one node
// find a way to block thread creation in the generic thread pool to avoid this.
Expand Down Expand Up @@ -648,8 +647,8 @@ public void clusterChanged(ClusterChangedEvent event) {
masterNodeDisruption.startDisrupting();

// Wait for the majority side to get stable
ensureStableCluster(2, majoritySide.get(0));
ensureStableCluster(2, majoritySide.get(1));
assertDifferentMaster(majoritySide.get(0), oldMasterNode);
assertDifferentMaster(majoritySide.get(1), oldMasterNode);

// The old master node is frozen, but here we submit a cluster state update task that doesn't get executed,
// but will be queued and once the old master node un-freezes it gets executed.
Expand Down
Expand Up @@ -19,26 +19,40 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -154,4 +168,58 @@ public void clusterChanged(ClusterChangedEvent event) {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); // wait for all to be processed
assertThat(statesFound, Matchers.hasSize(2));
}

@Test
public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception {
Settings settings = ImmutableSettings.builder()
.put("discovery.type", "zen")
.build();
List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
client().admin().cluster().prepareHealth().setWaitForNodes("2").get();

List<String> nonMasterNodes = new ArrayList<>(nodeNames);
nonMasterNodes.remove(internalCluster().getMasterName());
String noneMasterNode = nonMasterNodes.get(0);

ClusterState state = internalCluster().getInstance(ClusterService.class).state();
DiscoveryNode node = null;
for (DiscoveryNode discoveryNode : state.nodes()) {
if (discoveryNode.name().equals(noneMasterNode)) {
node = discoveryNode;
}
}
assert node != null;

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(state.nodes())
.put(new DiscoveryNode("abc", new LocalTransportAddress("abc"), Version.CURRENT)).masterNodeId("abc");
ClusterState.Builder builder = ClusterState.builder(state);
builder.nodes(nodes);
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
stream.setVersion(node.version());
ClusterState.Builder.writeTo(builder.build(), stream);
stream.close();
BytesReference bytes = bStream.bytes();

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
internalCluster().getInstance(TransportService.class, noneMasterNode).sendRequest(node, PublishClusterStateAction.ACTION_NAME, new BytesTransportRequest(bytes, Version.CURRENT), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

@Override
public void handleResponse(TransportResponse.Empty response) {
super.handleResponse(response);
latch.countDown();
}

@Override
public void handleException(TransportException exp) {
super.handleException(exp);
reference.set(exp);
latch.countDown();
}
});
latch.await();
assertThat(reference.get(), notNullValue());
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
}
}
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -29,7 +30,7 @@
import java.util.Queue;

import static org.elasticsearch.discovery.zen.ZenDiscovery.ProcessClusterState;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreNewClusterState;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsNull.nullValue;

Expand All @@ -52,13 +53,13 @@ public void testShouldIgnoreNewClusterState() {

currentState.version(2);
newState.version(1);
assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
currentState.version(1);
newState.version(1);
assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
currentState.version(1);
newState.version(2);
assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));

currentNodes = DiscoveryNodes.builder();
currentNodes.masterNodeId("b");
Expand All @@ -71,7 +72,12 @@ public void testShouldIgnoreNewClusterState() {
newState.version(2);
}
currentState.nodes(currentNodes);
assertTrue("should ignore, because current state's master is not equal to new state's master", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
try {
shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build());
fail("should ignore, because current state's master is not equal to new state's master");
} catch (ElasticsearchIllegalStateException e) {
assertThat(e.getMessage(), containsString("cluster state from a different master then the current one, rejecting"));
}

currentNodes = DiscoveryNodes.builder();
currentNodes.masterNodeId(null);
Expand All @@ -84,7 +90,7 @@ public void testShouldIgnoreNewClusterState() {
currentState.version(1);
newState.version(2);
}
assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
}

public void testSelectNextStateToProcess_empty() {
Expand Down