From bdef2ab8c047e31d84e7fdbe9c843e57802a6291 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 24 Jan 2019 08:57:40 +0000 Subject: [PATCH] Use m_m_nodes from Zen1 master for Zen2 bootstrap (#37701) Today we support a smooth rolling upgrade from Zen1 to Zen2 by automatically bootstrapping the cluster once all the Zen1 nodes have left, as long as the `minimum_master_nodes` count is satisfied. However this means that Zen2 nodes also require the `minimum_master_nodes` setting for this one specific and transient situation. Since nodes only perform this automatic bootstrapping if they previously belonged to a Zen1 cluster, they can keep track of the `minimum_master_nodes` setting from the previous master instead of requiring it to be set on the Zen2 node. --- .../state/TransportClusterStateAction.java | 1 + .../elasticsearch/cluster/ClusterState.java | 43 ++++++++++-- .../cluster/coordination/Coordinator.java | 4 +- .../coordination/DiscoveryUpgradeService.java | 69 +++++++++++-------- .../cluster/coordination/JoinHelper.java | 2 +- .../coordination/JoinTaskExecutor.java | 11 ++- .../discovery/zen/NodeJoinController.java | 6 +- .../discovery/zen/ZenDiscovery.java | 2 +- .../ClusterSerializationTests.java | 6 +- .../MinimumMasterNodesInClusterStateIT.java | 66 ++++++++++++++++++ .../zen/NodeJoinControllerTests.java | 2 +- .../indices/cluster/ClusterStateChanges.java | 2 +- 12 files changed, 168 insertions(+), 46 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/discovery/zen/MinimumMasterNodesInClusterStateIT.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 9fc35dc7be38a..75dc811f37db6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -127,6 +127,7 @@ private void buildResponse(final ClusterStateRequest request, ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName()); builder.version(currentState.version()); builder.stateUUID(currentState.stateUUID()); + builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster()); if (request.nodes()) { builder.nodes(currentState.nodes()); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 8dd7291410ecc..200f5b59d5416 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -178,17 +179,19 @@ default boolean isPrivate() { private final boolean wasReadFromDiff; + private final int minimumMasterNodesOnPublishingMaster; + // built on demand private volatile RoutingNodes routingNodes; public ClusterState(long version, String stateUUID, ClusterState state) { this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), - state.customs(), false); + state.customs(), -1, false); } public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, - boolean wasReadFromDiff) { + int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) { this.version = version; this.stateUUID = stateUUID; this.clusterName = clusterName; @@ -197,6 +200,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met this.nodes = nodes; this.blocks = blocks; this.customs = customs; + this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster; this.wasReadFromDiff = wasReadFromDiff; } @@ -290,6 +294,17 @@ public Set getVotingConfigExclusions() { return coordinationMetaData().getVotingConfigExclusions(); } + /** + * The node-level `discovery.zen.minimum_master_nodes` setting on the master node that published this cluster state, for use in rolling + * upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value to determine how + * many master-eligible nodes must be discovered before the cluster can be bootstrapped. Note that this method returns the node-level + * value of this setting, and ignores any cluster-level override that was set via the API. Callers are expected to combine this value + * with any value set in the cluster-level settings. This should be removed once we no longer need support for {@link Version#V_6_7_0}. + */ + public int getMinimumMasterNodesOnPublishingMaster() { + return minimumMasterNodesOnPublishingMaster; + } + // Used for testing and logging to determine how this cluster state was send over the wire public boolean wasReadFromDiff() { return wasReadFromDiff; @@ -644,7 +659,7 @@ public static class Builder { private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK; private final ImmutableOpenMap.Builder customs; private boolean fromDiff; - + private int minimumMasterNodesOnPublishingMaster = -1; public Builder(ClusterState state) { this.clusterName = state.clusterName; @@ -655,6 +670,7 @@ public Builder(ClusterState state) { this.metaData = state.metaData(); this.blocks = state.blocks(); this.customs = ImmutableOpenMap.builder(state.customs()); + this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster; this.fromDiff = false; } @@ -715,6 +731,11 @@ public Builder stateUUID(String uuid) { return this; } + public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) { + this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster; + return this; + } + public Builder putCustom(String type, Custom custom) { customs.put(type, custom); return this; @@ -739,7 +760,8 @@ public ClusterState build() { if (UNKNOWN_UUID.equals(uuid)) { uuid = UUIDs.randomBase64UUID(); } - return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff); + return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), + minimumMasterNodesOnPublishingMaster, fromDiff); } public static byte[] toBytes(ClusterState state) throws IOException { @@ -782,6 +804,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr Custom customIndexMetaData = in.readNamedWriteable(Custom.class); builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData); } + builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1; return builder.build(); } @@ -807,6 +830,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteable(cursor.value); } } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeVInt(minimumMasterNodesOnPublishingMaster); + } } private static class ClusterStateDiff implements Diff { @@ -829,6 +855,8 @@ private static class ClusterStateDiff implements Diff { private final Diff> customs; + private final int minimumMasterNodesOnPublishingMaster; + ClusterStateDiff(ClusterState before, ClusterState after) { fromUuid = before.stateUUID; toUuid = after.stateUUID; @@ -839,6 +867,7 @@ private static class ClusterStateDiff implements Diff { metaData = after.metaData.diff(before.metaData); blocks = after.blocks.diff(before.blocks); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster; } ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { @@ -851,6 +880,7 @@ private static class ClusterStateDiff implements Diff { metaData = MetaData.readDiffFrom(in); blocks = ClusterBlocks.readDiffFrom(in); customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1; } @Override @@ -864,6 +894,9 @@ public void writeTo(StreamOutput out) throws IOException { metaData.writeTo(out); blocks.writeTo(out); customs.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeVInt(minimumMasterNodesOnPublishingMaster); + } } @Override @@ -883,9 +916,9 @@ public ClusterState apply(ClusterState state) { builder.metaData(metaData.apply(state.metaData)); builder.blocks(blocks.apply(state.blocks)); builder.customs(customs.apply(state.customs)); + builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster); builder.fromDiff(true); return builder.build(); } - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 4a018c1f78f91..a4e1d3ed8c990 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -168,7 +168,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers, this::isInitialConfigurationSet, this::setInitialConfiguration); - this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService, + this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService, this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); @@ -467,7 +467,7 @@ void becomeCandidate(String method) { clusterFormationFailureHelper.start(); if (getCurrentTerm() == ZEN1_BWC_TERM) { - discoveryUpgradeService.activate(lastKnownLeader); + discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState()); } leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java index 496adb65bb6f0..56102704848c8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java @@ -24,11 +24,11 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -60,6 +60,7 @@ import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; +import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING; /** @@ -80,7 +81,12 @@ public class DiscoveryUpgradeService { public static final Setting ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING = Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope); - private final ElectMasterService electMasterService; + /** + * Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the + * `minimum_master_nodes` setting. + */ + private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY); + private final TransportService transportService; private final BooleanSupplier isBootstrappedSupplier; private final JoinHelper joinHelper; @@ -93,12 +99,11 @@ public class DiscoveryUpgradeService { @Nullable // null if no active joining round private volatile JoiningRound joiningRound; - public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService, + public DiscoveryUpgradeService(Settings settings, TransportService transportService, BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper, Supplier> peersSupplier, Consumer initialConfigurationConsumer) { assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed"; - electMasterService = new ElectMasterService(settings); this.transportService = transportService; this.isBootstrappedSupplier = isBootstrappedSupplier; this.joinHelper = joinHelper; @@ -107,12 +112,9 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings); this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings); this.clusterName = CLUSTER_NAME_SETTING.get(settings); - - clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, - electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large } - public void activate(Optional lastKnownLeader) { + public void activate(Optional lastKnownLeader, ClusterState lastAcceptedClusterState) { // called under coordinator mutex if (isBootstrappedSupplier.getAsBoolean()) { @@ -122,8 +124,13 @@ public void activate(Optional lastKnownLeader) { assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader; // if there was a leader and it's not a old node then we must have been bootstrapped + final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings(); + final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings) + ? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings) + : lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster(); + assert joiningRound == null : joiningRound; - joiningRound = new JoiningRound(lastKnownLeader.isPresent()); + joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes); joiningRound.scheduleNextAttempt(); } @@ -160,15 +167,21 @@ void countDown() { private class JoiningRound { private final boolean upgrading; + private final int minimumMasterNodes; - JoiningRound(boolean upgrading) { + JoiningRound(boolean upgrading, int minimumMasterNodes) { this.upgrading = upgrading; + this.minimumMasterNodes = minimumMasterNodes; } private boolean isRunning() { return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false; } + private boolean canBootstrap(Set discoveryNodes) { + return upgrading && minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count(); + } + void scheduleNextAttempt() { if (isRunning() == false) { return; @@ -189,26 +202,22 @@ public void run() { // this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not // connected each time it wakes up (every second by default) - logger.debug("nodes: {}", discoveryNodes); - - if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) { - if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) { - electBestOldMaster(discoveryNodes); - } else if (upgrading && enableUnsafeBootstrappingOnUpgrade) { - // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade - transportService.getThreadPool().generic().execute(() -> { - try { - initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() - .map(DiscoveryNode::getId).collect(Collectors.toSet()))); - } catch (Exception e) { - logger.debug("exception during bootstrapping upgrade, retrying", e); - } finally { - scheduleNextAttempt(); - } - }); - } else { - scheduleNextAttempt(); - } + logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes); + + if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) { + electBestOldMaster(discoveryNodes); + } else if (canBootstrap(discoveryNodes)) { + // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade + transportService.getThreadPool().generic().execute(() -> { + try { + initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() + .map(DiscoveryNode::getId).collect(Collectors.toSet()))); + } catch (Exception e) { + logger.debug("exception during bootstrapping upgrade, retrying", e); + } finally { + scheduleNextAttempt(); + } + }); } else { scheduleNextAttempt(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 8c41d7b2eaa52..53fada396fcef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -90,7 +90,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { + this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) { @Override public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index b754e50a945c1..2dcc1022f8d46 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -29,7 +29,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.ArrayList; @@ -46,6 +48,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor secondThirdNodes = internalCluster().startNodes(2); + assertThat(internalCluster().getMasterName(), equalTo(firstNode)); + + final List allNodes = Stream.concat(Stream.of(firstNode), secondThirdNodes.stream()).collect(Collectors.toList()); + for (final String node : allNodes) { + final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState(); + assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(1)); + assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2)); + } + + internalCluster().stopRandomNode(nameFilter(firstNode)); + assertThat(internalCluster().getMasterName(), isIn(secondThirdNodes)); + + for (final String node : secondThirdNodes) { + final ClusterState localState = client(node).admin().cluster().state(new ClusterStateRequest().local(true)).get().getState(); + assertThat(localState.getMinimumMasterNodesOnPublishingMaster(), equalTo(2)); + assertThat(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(localState.metaData().settings()), equalTo(2)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index a3ae6b07b19c9..35a2173e0aea0 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -141,7 +141,7 @@ private void setupMasterServiceAndNodeJoinController(ClusterState initialState) throw new IllegalStateException("method setupMasterServiceAndNodeJoinController can only be called once"); } masterService = ClusterServiceUtils.createMasterService(threadPool, initialState); - nodeJoinController = new NodeJoinController(masterService, createAllocationService(Settings.EMPTY), + nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY), new ElectMasterService(Settings.EMPTY)); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 387ba1c3d9653..8a00be28f5eb2 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -213,7 +213,7 @@ allocationService, new AliasValidator(), environment, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(allocationService, logger); + joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {