From 6d2f34575672d81beaaf44695306a3c668025912 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 22 Aug 2014 22:25:58 +0200 Subject: [PATCH] [Internal] user node's cluster name as a default for an incoming cluster state who misses it ClusterState has a reference to the cluster name since version 1.1.0 (df7474b9fcf849bbfea4222c1d2aa58b6669e52a) . However, if the state was sent from a master of an older version, this name can be set to null. This is an unexpected and can cause bugs. The bad part is that it will never correct it self until a full cluster restart where the cluster state is rebuilt using the code of the latest version. This commit changes the default to the node's cluster name. Relates to #7386 Closes #7414 --- .../reroute/ClusterRerouteResponse.java | 3 +-- .../cluster/state/ClusterStateResponse.java | 2 +- .../elasticsearch/cluster/ClusterState.java | 20 +++++++++++++++---- .../discovery/local/LocalDiscovery.java | 2 +- .../discovery/zen/ZenDiscovery.java | 2 +- .../zen/membership/MembershipAction.java | 6 ++++-- .../publish/PublishClusterStateAction.java | 7 +++++-- .../ClusterSerializationTests.java | 6 ++++-- 8 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java index 55fe2893663b3..6e266f21a1a68 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -61,7 +60,7 @@ public RoutingExplanations getExplanations() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - state = ClusterState.Builder.readFrom(in, null); + state = ClusterState.Builder.readFrom(in, null, null); readAcknowledged(in); if (in.getVersion().onOrAfter(Version.V_1_1_0)) { explanations = RoutingExplanations.readFrom(in); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 9ada2716e27cf..7d88c703b3ddd 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -55,7 +55,7 @@ public ClusterName getClusterName() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); clusterName = ClusterName.readClusterName(in); - clusterState = ClusterState.Builder.readFrom(in, null); + clusterState = ClusterState.Builder.readFrom(in, null, clusterName); } @Override diff --git a/src/main/java/org/elasticsearch/cluster/ClusterState.java b/src/main/java/org/elasticsearch/cluster/ClusterState.java index d06a2b58989e9..9a4c503012eb0 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -568,8 +568,14 @@ public static byte[] toBytes(ClusterState state) throws IOException { return os.bytes().toBytes(); } - public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { - return readFrom(new BytesStreamInput(data, false), localNode); + /** + * @param data input bytes + * @param localNode used to set the local node in the cluster state. + * @param defaultClusterName this cluster name will be used of if the deserialized cluster state does not have a name set + * (which is only introduced in version 1.1.1) + */ + public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode, ClusterName defaultClusterName) throws IOException { + return readFrom(new BytesStreamInput(data, false), localNode, defaultClusterName); } public static void writeTo(ClusterState state, StreamOutput out) throws IOException { @@ -595,8 +601,14 @@ public static void writeTo(ClusterState state, StreamOutput out) throws IOExcept } } - public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException { - ClusterName clusterName = null; + /** + * @param in input stream + * @param localNode used to set the local node in the cluster state. can be null. + * @param defaultClusterName this cluster name will be used of receiving a cluster state from a node on version older than 1.1.1 + * or if the sending node did not set a cluster name + */ + public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode, @Nullable ClusterName defaultClusterName) throws IOException { + ClusterName clusterName = defaultClusterName; if (in.getVersion().onOrAfter(Version.V_1_1_1)) { // it might be null even if it comes from a >= 1.1.1 node since it's origin might be an older node if (in.readBoolean()) { diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index f052ffef51282..31ae57a7cf8a7 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -301,7 +301,7 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final if (discovery.master) { continue; } - final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); + final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName); nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 95bdaef7fa6f0..80ee7edb55f74 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -152,7 +152,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService); this.nodesFD.addListener(new NodeFailureListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); + this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName); this.pingService.setNodesProvider(this); this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); diff --git a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 9bef402e0b7ea..344c14239ed44 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -146,7 +146,8 @@ class JoinResponse extends TransportResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + // we don't care about cluster name. This cluster state is never used. + clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null); } @Override @@ -212,7 +213,8 @@ class ValidateJoinRequest extends TransportRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + // cluster name doesn't matter... + clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), null); } @Override diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 5ed8133c1d44d..00defddf50ca2 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; @@ -61,14 +62,16 @@ static interface NewStateProcessed { private final DiscoveryNodesProvider nodesProvider; private final NewClusterStateListener listener; private final DiscoverySettings discoverySettings; + private final ClusterName clusterName; public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewClusterStateListener listener, DiscoverySettings discoverySettings) { + NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; this.discoverySettings = discoverySettings; + this.clusterName = clusterName; transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler()); } @@ -169,7 +172,7 @@ public void messageReceived(BytesTransportRequest request, final TransportChanne in = CachedStreamInput.cachedHandles(request.bytes().streamInput()); } in.setVersion(request.version()); - ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode(), clusterName); clusterState.status(ClusterState.ClusterStateStatus.RECEIVED); logger.debug("received cluster state version {}", clusterState.version()); listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() { diff --git a/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 5db37fa67c90a..a99eb09febe75 100644 --- a/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.serialization; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -49,13 +50,14 @@ public void testClusterStateSerialization() throws Exception { DiscoveryNodes nodes = DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).localNodeId("node1").masterNodeId("node2").build(); - ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("clusterName1")).nodes(nodes).metaData(metaData).routingTable(routingTable).build(); AllocationService strategy = createAllocationService(); clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build(); - ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1")); + ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1"), new ClusterName("clusterName2")); + assertThat(serializedClusterState.getClusterName().value(), equalTo(clusterState.getClusterName().value())); assertThat(serializedClusterState.routingTable().prettyPrint(), equalTo(clusterState.routingTable().prettyPrint())); }