Skip to content

Commit

Permalink
[Internal] user node's cluster name as a default for an incoming clus…
Browse files Browse the repository at this point in the history
…ter state who misses it

ClusterState has a reference to the cluster name since version 1.1.0 (df7474b) . However, if the state was sent from a master of an older version, this name can be set to null. This is an unexpected and can cause bugs. The bad part is that it will never correct it self until a full cluster restart where the cluster state is rebuilt using the code of the latest version.

This commit changes the default to the node's cluster name.

Relates to elastic#7386

Closes elastic#7414
  • Loading branch information
bleskes committed Aug 27, 2014
1 parent 9222720 commit 6d2f345
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 15 deletions.
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand Down Expand Up @@ -61,7 +60,7 @@ public RoutingExplanations getExplanations() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
state = ClusterState.Builder.readFrom(in, null);
state = ClusterState.Builder.readFrom(in, null, null);
readAcknowledged(in);
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
explanations = RoutingExplanations.readFrom(in);
Expand Down
Expand Up @@ -55,7 +55,7 @@ public ClusterName getClusterName() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterState = ClusterState.Builder.readFrom(in, null);
clusterState = ClusterState.Builder.readFrom(in, null, clusterName);
}

@Override
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/org/elasticsearch/cluster/ClusterState.java
Expand Up @@ -568,8 +568,14 @@ public static byte[] toBytes(ClusterState state) throws IOException {
return os.bytes().toBytes();
}

public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
return readFrom(new BytesStreamInput(data, false), localNode);
/**
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
* @param defaultClusterName this cluster name will be used of if the deserialized cluster state does not have a name set
* (which is only introduced in version 1.1.1)
*/
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode, ClusterName defaultClusterName) throws IOException {
return readFrom(new BytesStreamInput(data, false), localNode, defaultClusterName);
}

public static void writeTo(ClusterState state, StreamOutput out) throws IOException {
Expand All @@ -595,8 +601,14 @@ public static void writeTo(ClusterState state, StreamOutput out) throws IOExcept
}
}

public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
ClusterName clusterName = null;
/**
* @param in input stream
* @param localNode used to set the local node in the cluster state. can be null.
* @param defaultClusterName this cluster name will be used of receiving a cluster state from a node on version older than 1.1.1
* or if the sending node did not set a cluster name
*/
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode, @Nullable ClusterName defaultClusterName) throws IOException {
ClusterName clusterName = defaultClusterName;
if (in.getVersion().onOrAfter(Version.V_1_1_1)) {
// it might be null even if it comes from a >= 1.1.1 node since it's origin might be an older node
if (in.readBoolean()) {
Expand Down
Expand Up @@ -301,7 +301,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final
if (discovery.master) {
continue;
}
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName);
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
Expand Down
Expand Up @@ -152,7 +152,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

Expand Down
Expand Up @@ -146,7 +146,8 @@ class JoinResponse extends TransportResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
// we don't care about cluster name. This cluster state is never used.
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null);
}

@Override
Expand Down Expand Up @@ -212,7 +213,8 @@ class ValidateJoinRequest extends TransportRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
// cluster name doesn't matter...
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null);
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -61,14 +62,16 @@ static interface NewStateProcessed {
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;

public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener, DiscoverySettings discoverySettings) {
NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}

Expand Down Expand Up @@ -169,7 +172,7 @@ public void messageReceived(BytesTransportRequest request, final TransportChanne
in = CachedStreamInput.cachedHandles(request.bytes().streamInput());
}
in.setVersion(request.version());
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), clusterName);
clusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
logger.debug("received cluster state version {}", clusterState.version());
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.serialization;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -49,13 +50,14 @@ public void testClusterStateSerialization() throws Exception {

DiscoveryNodes nodes = DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).localNodeId("node1").masterNodeId("node2").build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("clusterName1")).nodes(nodes).metaData(metaData).routingTable(routingTable).build();

AllocationService strategy = createAllocationService();
clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build();

ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1"));
ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1"), new ClusterName("clusterName2"));

assertThat(serializedClusterState.getClusterName().value(), equalTo(clusterState.getClusterName().value()));
assertThat(serializedClusterState.routingTable().prettyPrint(), equalTo(clusterState.routingTable().prettyPrint()));
}

Expand Down

0 comments on commit 6d2f345

Please sign in to comment.