diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java index 55fe2893663b3..6e266f21a1a68 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -61,7 +60,7 @@ public RoutingExplanations getExplanations() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - state = ClusterState.Builder.readFrom(in, null); + state = ClusterState.Builder.readFrom(in, null, null); readAcknowledged(in); if (in.getVersion().onOrAfter(Version.V_1_1_0)) { explanations = RoutingExplanations.readFrom(in); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 9ada2716e27cf..7d88c703b3ddd 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -55,7 +55,7 @@ public ClusterName getClusterName() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); clusterName = ClusterName.readClusterName(in); - clusterState = ClusterState.Builder.readFrom(in, null); + clusterState = ClusterState.Builder.readFrom(in, null, clusterName); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterState.java b/src/main/java/org/elasticsearch/cluster/ClusterState.java index d06a2b58989e9..9a4c503012eb0 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -568,8 +568,14 @@ public static byte[] toBytes(ClusterState state) throws IOException { return os.bytes().toBytes(); } - public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { - return readFrom(new BytesStreamInput(data, false), localNode); + /** + * @param data input bytes + * @param localNode used to set the local node in the cluster state. + * @param defaultClusterName this cluster name will be used of if the deserialized cluster state does not have a name set + * (which is only introduced in version 1.1.1) + */ + public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode, ClusterName defaultClusterName) throws IOException { + return readFrom(new BytesStreamInput(data, false), localNode, defaultClusterName); } public static void writeTo(ClusterState state, StreamOutput out) throws IOException { @@ -595,8 +601,14 @@ public static void writeTo(ClusterState state, StreamOutput out) throws IOExcept } } - public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException { - ClusterName clusterName = null; + /** + * @param in input stream + * @param localNode used to set the local node in the cluster state. can be null. + * @param defaultClusterName this cluster name will be used of receiving a cluster state from a node on version older than 1.1.1 + * or if the sending node did not set a cluster name + */ + public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode, @Nullable ClusterName defaultClusterName) throws IOException { + ClusterName clusterName = defaultClusterName; if (in.getVersion().onOrAfter(Version.V_1_1_1)) { // it might be null even if it comes from a >= 1.1.1 node since it's origin might be an older node if (in.readBoolean()) { diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index f052ffef51282..31ae57a7cf8a7 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -301,7 +301,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final if (discovery.master) { continue; } - final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); + final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName); nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 95bdaef7fa6f0..80ee7edb55f74 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -152,7 +152,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService); this.nodesFD.addListener(new NodeFailureListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); + this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName); this.pingService.setNodesProvider(this); this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); diff --git a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 9bef402e0b7ea..344c14239ed44 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -146,7 +146,8 @@ class JoinResponse extends TransportResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + // we don't care about cluster name. This cluster state is never used. + clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null); } @Override @@ -212,7 +213,8 @@ class ValidateJoinRequest extends TransportRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + // cluster name doesn't matter... + clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null); } @Override diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 5ed8133c1d44d..00defddf50ca2 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; @@ -61,14 +62,16 @@ static interface NewStateProcessed { private final DiscoveryNodesProvider nodesProvider; private final NewClusterStateListener listener; private final DiscoverySettings discoverySettings; + private final ClusterName clusterName; public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewClusterStateListener listener, DiscoverySettings discoverySettings) { + NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; this.discoverySettings = discoverySettings; + this.clusterName = clusterName; transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler()); } @@ -169,7 +172,7 @@ public void messageReceived(BytesTransportRequest request, final TransportChanne in = CachedStreamInput.cachedHandles(request.bytes().streamInput()); } in.setVersion(request.version()); - ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), clusterName); clusterState.status(ClusterState.ClusterStateStatus.RECEIVED); logger.debug("received cluster state version {}", clusterState.version()); listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() { diff --git a/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 5db37fa67c90a..a99eb09febe75 100644 --- a/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.serialization; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -49,13 +50,14 @@ public void testClusterStateSerialization() throws Exception { DiscoveryNodes nodes = DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).localNodeId("node1").masterNodeId("node2").build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("clusterName1")).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); AllocationService strategy = createAllocationService(); clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build(); - ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1")); + ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1"), new ClusterName("clusterName2")); + assertThat(serializedClusterState.getClusterName().value(), equalTo(clusterState.getClusterName().value())); assertThat(serializedClusterState.routingTable().prettyPrint(), equalTo(clusterState.routingTable().prettyPrint())); }