Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use m_m_nodes from Zen1 master for Zen2 bootstrap #37701

Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private void buildResponse(final ClusterStateRequest request,
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.stateUUID(currentState.stateUUID());
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());

if (request.nodes()) {
builder.nodes(currentState.nodes());
Expand Down
35 changes: 30 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -178,17 +179,19 @@ default boolean isPrivate() {

private final boolean wasReadFromDiff;

private final int minimumMasterNodesOnPublishingMaster;

// built on demand
private volatile RoutingNodes routingNodes;

public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
state.customs(), false);
state.customs(), -1, false);
}

public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
boolean wasReadFromDiff) {
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
this.version = version;
this.stateUUID = stateUUID;
this.clusterName = clusterName;
Expand All @@ -197,6 +200,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
this.wasReadFromDiff = wasReadFromDiff;
}

Expand Down Expand Up @@ -290,6 +294,8 @@ public Set<VotingConfigExclusion> getVotingConfigExclusions() {
return coordinationMetaData().getVotingConfigExclusions();
}

public int getMinimumMasterNodesOnPublishingMaster() { return minimumMasterNodesOnPublishingMaster; }
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

// Used for testing and logging to determine how this cluster state was send over the wire
public boolean wasReadFromDiff() {
return wasReadFromDiff;
Expand Down Expand Up @@ -436,6 +442,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

if (metrics.contains(Metric.MASTER_NODE)) {
builder.field("master_node", nodes().getMasterNodeId());
builder.field("minimum_master_nodes", minimumMasterNodesOnPublishingMaster);
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
}

if (metrics.contains(Metric.BLOCKS)) {
Expand Down Expand Up @@ -644,7 +651,7 @@ public static class Builder {
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private final ImmutableOpenMap.Builder<String, Custom> customs;
private boolean fromDiff;

private int minimumMasterNodesOnPublishingMaster = -1;

public Builder(ClusterState state) {
this.clusterName = state.clusterName;
Expand All @@ -655,6 +662,7 @@ public Builder(ClusterState state) {
this.metaData = state.metaData();
this.blocks = state.blocks();
this.customs = ImmutableOpenMap.builder(state.customs());
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
this.fromDiff = false;
}

Expand Down Expand Up @@ -715,6 +723,11 @@ public Builder stateUUID(String uuid) {
return this;
}

public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
return this;
}

public Builder putCustom(String type, Custom custom) {
customs.put(type, custom);
return this;
Expand All @@ -739,7 +752,8 @@ public ClusterState build() {
if (UNKNOWN_UUID.equals(uuid)) {
uuid = UUIDs.randomBase64UUID();
}
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
minimumMasterNodesOnPublishingMaster, fromDiff);
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand Down Expand Up @@ -782,6 +796,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
}
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
return builder.build();
}

Expand All @@ -807,6 +822,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(cursor.value);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

private static class ClusterStateDiff implements Diff<ClusterState> {
Expand All @@ -829,6 +847,8 @@ private static class ClusterStateDiff implements Diff<ClusterState> {

private final Diff<ImmutableOpenMap<String, Custom>> customs;

private final int minimumMasterNodesOnPublishingMaster;

ClusterStateDiff(ClusterState before, ClusterState after) {
fromUuid = before.stateUUID;
toUuid = after.stateUUID;
Expand All @@ -839,6 +859,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = after.metaData.diff(before.metaData);
blocks = after.blocks.diff(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
Expand All @@ -851,6 +872,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
metaData = MetaData.readDiffFrom(in);
blocks = ClusterBlocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
}

@Override
Expand All @@ -864,6 +886,9 @@ public void writeTo(StreamOutput out) throws IOException {
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVInt(minimumMasterNodesOnPublishingMaster);
}
}

@Override
Expand All @@ -883,9 +908,9 @@ public ClusterState apply(ClusterState state) {
builder.metaData(metaData.apply(state.metaData));
builder.blocks(blocks.apply(state.blocks));
builder.customs(customs.apply(state.customs));
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
builder.fromDiff(true);
return builder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void becomeCandidate(String method) {
clusterFormationFailureHelper.start();

if (getCurrentTerm() == ZEN1_BWC_TERM) {
discoveryUpgradeService.activate(lastKnownLeader);
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
}

leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;

/**
Expand All @@ -80,7 +82,8 @@ public class DiscoveryUpgradeService {
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);

private final ElectMasterService electMasterService;
private static final ElectMasterService permissiveElectMasterService = new ElectMasterService(Settings.EMPTY);
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

private final TransportService transportService;
private final BooleanSupplier isBootstrappedSupplier;
private final JoinHelper joinHelper;
Expand All @@ -98,7 +101,6 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting
Supplier<Iterable<DiscoveryNode>> peersSupplier,
Consumer<VotingConfiguration> initialConfigurationConsumer) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
electMasterService = new ElectMasterService(settings);
this.transportService = transportService;
this.isBootstrappedSupplier = isBootstrappedSupplier;
this.joinHelper = joinHelper;
Expand All @@ -107,12 +109,9 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
this.clusterName = CLUSTER_NAME_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
}

public void activate(Optional<DiscoveryNode> lastKnownLeader) {
public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastAcceptedClusterState) {
// called under coordinator mutex

if (isBootstrappedSupplier.getAsBoolean()) {
Expand All @@ -122,8 +121,13 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader) {
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
// if there was a leader and it's not a old node then we must have been bootstrapped

final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings();
final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings)
? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings)
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();

assert joiningRound == null : joiningRound;
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
joiningRound.scheduleNextAttempt();
}

Expand Down Expand Up @@ -160,15 +164,22 @@ void countDown() {

private class JoiningRound {
private final boolean upgrading;
private final int minimumMasterNodes;

JoiningRound(boolean upgrading) {
JoiningRound(boolean upgrading, int minimumMasterNodes) {
this.upgrading = upgrading;
this.minimumMasterNodes = minimumMasterNodes;
}

private boolean isRunning() {
return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
}

private boolean canBootstrap(Set<DiscoveryNode> discoveryNodes) {
return upgrading && 1 <= minimumMasterNodes
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
&& minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count();
}

void scheduleNextAttempt() {
if (isRunning() == false) {
return;
Expand All @@ -189,26 +200,22 @@ public void run() {
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
// connected each time it wakes up (every second by default)

logger.debug("nodes: {}", discoveryNodes);

if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes);

if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
electBestOldMaster(discoveryNodes);
} else if (canBootstrap(discoveryNodes)) {
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
transportService.getThreadPool().generic().execute(() -> {
try {
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
} catch (Exception e) {
logger.debug("exception during bootstrapping upgrade, retrying", e);
} finally {
scheduleNextAttempt();
}
});
} else {
scheduleNextAttempt();
}
Expand All @@ -232,7 +239,7 @@ public void onResponse(Void value) {
// If the only Zen1 nodes left are stale, and we can bootstrap, maybe we should bootstrap?
// Do we ever need to elect a freshly-started Zen1 node?
if (isRunning()) {
final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates);
final MasterCandidate electedMaster = permissiveElectMasterService.electMaster(masterCandidates);
logger.debug("elected {}, sending join", electedMaster);
joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(),
JoiningRound.this::scheduleNextAttempt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {

@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -45,6 +47,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut

private final Logger logger;

private final int minimumMasterNodesOnLocalNode;

public static class Task {

private final DiscoveryNode node;
Expand Down Expand Up @@ -80,9 +84,10 @@ public boolean isFinishElectionTask() {
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}

public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
}

@Override
Expand Down Expand Up @@ -185,7 +190,9 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
// or removed by us above
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID))
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
.build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}
Expand Down
Loading