Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use m_m_nodes from Zen1 master for Zen2 bootstrap #37701

Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private void buildResponse(final ClusterStateRequest request,
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.stateUUID(currentState.stateUUID());
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());

if (request.nodes()) {
builder.nodes(currentState.nodes());
Expand Down
43 changes: 38 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -178,17 +179,19 @@ default boolean isPrivate() {

private final boolean wasReadFromDiff;

private final int minimumMasterNodesOnPublishingMaster;

// built on demand
private volatile RoutingNodes routingNodes;

public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
state.customs(), false);
state.customs(), -1, false);
}

public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
boolean wasReadFromDiff) {
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
this.version = version;
this.stateUUID = stateUUID;
this.clusterName = clusterName;
Expand All @@ -197,6 +200,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
this.wasReadFromDiff = wasReadFromDiff;
}

Expand Down Expand Up @@ -290,6 +294,17 @@ public Set<VotingConfigExclusion> getVotingConfigExclusions() {
return coordinationMetaData().getVotingConfigExclusions();
}

/**
* The node-level `discovery.zen.minimum_master_nodes` setting on the master node that published this cluster state, for use in rolling
* upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value to determine how
* many master-eligible nodes must be discovered before the cluster can be bootstrapped. Note that this method returns the node-level
* value of this setting, and ignores any cluster-level override that was set via the API. Callers are expected to combine this value
* with any value set in the cluster-level settings. This should be removed once we no longer need support for {@link Version#V_6_7_0}.
*/
public int getMinimumMasterNodesOnPublishingMaster() {
return minimumMasterNodesOnPublishingMaster;
}

// Used for testing and logging to determine how this cluster state was send over the wire
public boolean wasReadFromDiff() {
return wasReadFromDiff;
Expand Down Expand Up @@ -644,7 +659,7 @@ public static class Builder {
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private final ImmutableOpenMap.Builder<String, Custom> customs;
private boolean fromDiff;

private int minimumMasterNodesOnPublishingMaster = -1;

public Builder(ClusterState state) {
this.clusterName = state.clusterName;
Expand All @@ -655,6 +670,7 @@ public Builder(ClusterState state) {
this.metaData = state.metaData();
this.blocks = state.blocks();
this.customs = ImmutableOpenMap.builder(state.customs());
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
this.fromDiff = false;
}

Expand Down Expand Up @@ -715,6 +731,11 @@ public Builder stateUUID(String uuid) {
return this;
}

public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
return this;
}

public Builder putCustom(String type, Custom custom) {
customs.put(type, custom);
return this;
Expand All @@ -739,7 +760,8 @@ public ClusterState build() {
if (UNKNOWN_UUID.equals(uuid)) {
uuid = UUIDs.randomBase64UUID();
}
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
minimumMasterNodesOnPublishingMaster, fromDiff);
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand Down Expand Up @@ -782,6 +804,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
}
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
return builder.build();
}

Expand All @@ -807,6 +830,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(cursor.value);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

private static class ClusterStateDiff implements Diff<ClusterState> {
Expand All @@ -829,6 +855,8 @@ private static class ClusterStateDiff implements Diff<ClusterState> {

private final Diff<ImmutableOpenMap<String, Custom>> customs;

private final int minimumMasterNodesOnPublishingMaster;

ClusterStateDiff(ClusterState before, ClusterState after) {
fromUuid = before.stateUUID;
toUuid = after.stateUUID;
Expand All @@ -839,6 +867,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = after.metaData.diff(before.metaData);
blocks = after.blocks.diff(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
Expand All @@ -851,6 +880,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = MetaData.readDiffFrom(in);
blocks = ClusterBlocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
}

@Override
Expand All @@ -864,6 +894,9 @@ public void writeTo(StreamOutput out) throws IOException {
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

@Override
Expand All @@ -883,9 +916,9 @@ public ClusterState apply(ClusterState state) {
builder.metaData(metaData.apply(state.metaData));
builder.blocks(blocks.apply(state.blocks));
builder.customs(customs.apply(state.customs));
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
builder.fromDiff(true);
return builder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
this::isInitialConfigurationSet, this::setInitialConfiguration);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
Expand Down Expand Up @@ -454,7 +454,7 @@ void becomeCandidate(String method) {
clusterFormationFailureHelper.start();

if (getCurrentTerm() == ZEN1_BWC_TERM) {
discoveryUpgradeService.activate(lastKnownLeader);
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
}

leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -60,6 +60,7 @@
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;

/**
Expand All @@ -80,7 +81,12 @@ public class DiscoveryUpgradeService {
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);

private final ElectMasterService electMasterService;
/**
* Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the
* `minimum_master_nodes` setting.
*/
private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);

private final TransportService transportService;
private final BooleanSupplier isBootstrappedSupplier;
private final JoinHelper joinHelper;
Expand All @@ -93,12 +99,11 @@ public class DiscoveryUpgradeService {
@Nullable // null if no active joining round
private volatile JoiningRound joiningRound;

public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
public DiscoveryUpgradeService(Settings settings, TransportService transportService,
BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
Supplier<Iterable<DiscoveryNode>> peersSupplier,
Consumer<VotingConfiguration> initialConfigurationConsumer) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
electMasterService = new ElectMasterService(settings);
this.transportService = transportService;
this.isBootstrappedSupplier = isBootstrappedSupplier;
this.joinHelper = joinHelper;
Expand All @@ -107,12 +112,9 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
this.clusterName = CLUSTER_NAME_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
}

public void activate(Optional<DiscoveryNode> lastKnownLeader) {
public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastAcceptedClusterState) {
// called under coordinator mutex

if (isBootstrappedSupplier.getAsBoolean()) {
Expand All @@ -122,8 +124,13 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader) {
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
// if there was a leader and it's not a old node then we must have been bootstrapped

final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings();
final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings)
? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings)
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();

assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
joiningRound.scheduleNextAttempt();
}

Expand Down Expand Up @@ -160,15 +167,21 @@ void countDown() {

private class JoiningRound {
private final boolean upgrading;
private final int minimumMasterNodes;

JoiningRound(boolean upgrading) {
JoiningRound(boolean upgrading, int minimumMasterNodes) {
this.upgrading = upgrading;
this.minimumMasterNodes = minimumMasterNodes;
}

private boolean isRunning() {
return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
}

private boolean canBootstrap(Set<DiscoveryNode> discoveryNodes) {
return upgrading && minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count();
}

void scheduleNextAttempt() {
if (isRunning() == false) {
return;
Expand All @@ -189,26 +202,22 @@ public void run() {
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
// connected each time it wakes up (every second by default)

logger.debug("nodes: {}", discoveryNodes);

if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes);

if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (canBootstrap(discoveryNodes)) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {

@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
Expand Down
Loading