Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose minimum_master_nodes in cluster state #37811

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -128,6 +128,8 @@ private void buildResponse(final ClusterStateRequest request,
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.stateUUID(currentState.stateUUID());
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());

if (request.nodes()) {
builder.nodes(currentState.nodes());
}
Expand Down
44 changes: 40 additions & 4 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Expand Up @@ -22,6 +22,8 @@
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -172,16 +174,19 @@ default boolean isPrivate() {

private final boolean wasReadFromDiff;

private final int minimumMasterNodesOnPublishingMaster;

// built on demand
private volatile RoutingNodes routingNodes;

public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(),
false);
-1, false);
}

public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs, boolean wasReadFromDiff) {
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
this.version = version;
this.stateUUID = stateUUID;
this.clusterName = clusterName;
Expand All @@ -190,6 +195,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
this.wasReadFromDiff = wasReadFromDiff;
}

Expand Down Expand Up @@ -257,6 +263,17 @@ public ClusterName getClusterName() {
return this.clusterName;
}

/**
 * Returns the node-level {@code discovery.zen.minimum_master_nodes} setting of the master node that published this cluster state,
 * for use in rolling upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this
 * value to determine how many master-eligible nodes must be discovered before the cluster can be bootstrapped.
 * <p>
 * Note that this method returns the node-level value of this setting, and ignores any cluster-level override that was set via the
 * API. Callers are expected to combine this value with any value set in the cluster-level settings.
 * <p>
 * This should be removed once we no longer need support for {@link Version#V_6_7_0}.
 *
 * @return the value carried by this cluster state, or {@code -1} if none was recorded ({@code -1} is the builder default and is
 *         also used when the state was read from a stream older than {@link Version#V_6_7_0})
 */
public int getMinimumMasterNodesOnPublishingMaster() {
return minimumMasterNodesOnPublishingMaster;
}

// Used for testing and logging to determine how this cluster state was sent over the wire
public boolean wasReadFromDiff() {
return wasReadFromDiff;
Expand Down Expand Up @@ -598,7 +615,7 @@ public static class Builder {
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private final ImmutableOpenMap.Builder<String, Custom> customs;
private boolean fromDiff;

private int minimumMasterNodesOnPublishingMaster = -1;

public Builder(ClusterState state) {
this.clusterName = state.clusterName;
Expand All @@ -609,6 +626,7 @@ public Builder(ClusterState state) {
this.metaData = state.metaData();
this.blocks = state.blocks();
this.customs = ImmutableOpenMap.builder(state.customs());
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
this.fromDiff = false;
}

Expand Down Expand Up @@ -669,6 +687,11 @@ public Builder stateUUID(String uuid) {
return this;
}

/**
 * Sets the {@code minimumMasterNodesOnPublishingMaster} value that {@link #build()} passes to the resulting {@link ClusterState}.
 *
 * @param minimumMasterNodesOnPublishingMaster the value to record; the builder starts at {@code -1} unless it was created from
 *                                             an existing state, in which case it starts at that state's value
 * @return this builder, to allow call chaining
 */
public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
return this;
}

public Builder putCustom(String type, Custom custom) {
customs.put(type, custom);
return this;
Expand All @@ -693,7 +716,8 @@ public ClusterState build() {
if (UNKNOWN_UUID.equals(uuid)) {
uuid = UUIDs.randomBase64UUID();
}
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
minimumMasterNodesOnPublishingMaster, fromDiff);
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand Down Expand Up @@ -736,6 +760,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
}
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_6_7_0) ? in.readVInt() : -1;
return builder.build();
}

Expand All @@ -761,6 +786,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(cursor.value);
}
}
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

private static class ClusterStateDiff implements Diff<ClusterState> {
Expand All @@ -783,6 +811,8 @@ private static class ClusterStateDiff implements Diff<ClusterState> {

private final Diff<ImmutableOpenMap<String, Custom>> customs;

private final int minimumMasterNodesOnPublishingMaster;

ClusterStateDiff(ClusterState before, ClusterState after) {
fromUuid = before.stateUUID;
toUuid = after.stateUUID;
Expand All @@ -793,6 +823,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = after.metaData.diff(before.metaData);
blocks = after.blocks.diff(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
Expand All @@ -805,6 +836,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = MetaData.readDiffFrom(in);
blocks = ClusterBlocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_6_7_0) ? in.readVInt() : -1;
}

@Override
Expand All @@ -818,6 +850,9 @@ public void writeTo(StreamOutput out) throws IOException {
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

@Override
Expand All @@ -837,6 +872,7 @@ public ClusterState apply(ClusterState state) {
builder.metaData(metaData.apply(state.metaData));
builder.blocks(blocks.apply(state.blocks));
builder.customs(customs.apply(state.customs));
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
builder.fromDiff(true);
return builder.build();
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
Expand Down Expand Up @@ -67,9 +68,10 @@ public class NodeJoinController {
private ElectionContext electionContext = null;


public NodeJoinController(MasterService masterService, AllocationService allocationService, ElectMasterService electMaster) {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(allocationService, electMaster, logger);
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, electMaster, logger);
}

/**
Expand Down Expand Up @@ -410,10 +412,14 @@ public static class JoinTaskExecutor implements ClusterStateTaskExecutor<Discove

private final Logger logger;

public JoinTaskExecutor(AllocationService allocationService, ElectMasterService electMasterService, Logger logger) {
private final int minimumMasterNodesOnLocalNode;

public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
Logger logger) {
this.allocationService = allocationService;
this.electMasterService = electMasterService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
}

@Override
Expand Down Expand Up @@ -509,7 +515,9 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu
// or removed by us above
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID))
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
.build();
tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false,
"removed dead nodes on election"));
Expand Down
Expand Up @@ -219,7 +219,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators);
this.joinThreadControl = new JoinThreadControl();

this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster);
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);

masterService.setClusterStateSupplier(this::clusterState);
Expand Down
Expand Up @@ -67,7 +67,8 @@ public void testClusterStateSerialization() throws Exception {
.add(newNode("node3")).localNodeId("node1").masterNodeId("node2").build();

ClusterState clusterState = ClusterState.builder(new ClusterName("clusterName1"))
.nodes(nodes).metaData(metaData).routingTable(routingTable).build();
.nodes(nodes).metaData(metaData).routingTable(routingTable)
.minimumMasterNodesOnPublishingMaster(randomIntBetween(-1, 10)).build();

AllocationService strategy = createAllocationService();
clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState, "reroute").routingTable()).build();
Expand All @@ -78,6 +79,9 @@ public void testClusterStateSerialization() throws Exception {
assertThat(serializedClusterState.getClusterName().value(), equalTo(clusterState.getClusterName().value()));

assertThat(serializedClusterState.routingTable().toString(), equalTo(clusterState.routingTable().toString()));

assertThat(serializedClusterState.getMinimumMasterNodesOnPublishingMaster(),
equalTo(clusterState.getMinimumMasterNodesOnPublishingMaster()));
}

public void testRoutingTableSerialization() throws Exception {
Expand Down
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.InternalTestCluster.nameFilter;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isIn;

/**
 * Integration test checking the {@code minimumMasterNodesOnPublishingMaster} value seen in each node's local cluster state while a
 * cluster grows from one node to three and then loses its first master.
 */
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0)
public class MinimumMasterNodesInClusterStateIT extends ESIntegTestCase {

    public void testMasterPublishes() throws Exception {
        final String initialMaster = internalCluster().startNode();

        // Single-node cluster: the published value is expected to be 1 and the cluster-level settings carry no value at all.
        final ClusterState singleNodeState = fetchLocalState(initialMaster);
        assertThat(singleNodeState.getMinimumMasterNodesOnPublishingMaster(), equalTo(1));
        assertFalse(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(singleNodeState.metaData().settings()));

        final List<String> laterNodes = internalCluster().startNodes(2);
        assertThat(internalCluster().getMasterName(), equalTo(initialMaster));

        // Three-node cluster, first node still master: every node is expected to see 1 as the published value
        // and 2 in the cluster-level settings.
        final List<String> everyNode = Stream.concat(Stream.of(initialMaster), laterNodes.stream()).collect(Collectors.toList());
        for (final String nodeName : everyNode) {
            assertMinimumMasterNodes(nodeName, 1, 2);
        }

        internalCluster().stopRandomNode(nameFilter(initialMaster));
        assertThat(internalCluster().getMasterName(), isIn(laterNodes));

        // After the first master has gone, both remaining nodes are expected to see 2 for both values.
        for (final String nodeName : laterNodes) {
            assertMinimumMasterNodes(nodeName, 2, 2);
        }
    }

    /**
     * Fetches the cluster state held locally by the named node (the request has {@code local(true)} set).
     */
    private ClusterState fetchLocalState(String nodeName) throws Exception {
        final ClusterStateRequest localRequest = new ClusterStateRequest().local(true);
        return client(nodeName).admin().cluster().state(localRequest).get().getState();
    }

    /**
     * Asserts, in this order, the published value and the cluster-level setting value found in the named node's local state.
     */
    private void assertMinimumMasterNodes(String nodeName, int expectedOnPublishingMaster, int expectedInClusterSettings)
        throws Exception {
        final ClusterState nodeState = fetchLocalState(nodeName);
        assertThat(nodeState.getMinimumMasterNodesOnPublishingMaster(), equalTo(expectedOnPublishingMaster));
        assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(nodeState.metaData().settings()),
            equalTo(expectedInClusterSettings));
    }
}
Expand Up @@ -141,7 +141,7 @@ private void setupMasterServiceAndNodeJoinController(ClusterState initialState)
throw new IllegalStateException("method setupMasterServiceAndNodeJoinController can only be called once");
}
masterService = ClusterServiceUtils.createMasterService(threadPool, initialState);
nodeJoinController = new NodeJoinController(masterService, createAllocationService(Settings.EMPTY),
nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY));
}

Expand Down
Expand Up @@ -215,7 +215,7 @@ allocationService, new AliasValidator(), environment,
ElectMasterService electMasterService = new ElectMasterService(SETTINGS);
nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService,
s -> { throw new AssertionError("rejoin not implemented"); }, logger);
joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger);
joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(Settings.EMPTY, allocationService, electMasterService, logger);
}

public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
Expand Down