diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1ce156b853682..1256021b96e99 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -54,8 +54,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.env.Environment; diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 040066adeb6b1..b41316b65345d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Setting; @@ -27,8 +26,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; import org.elasticsearch.discovery.local.LocalDiscovery; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java similarity index 61% rename from core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java rename to core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java index 3ef9138f933b9..1d11f5cf0f569 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java @@ -17,11 +17,10 @@ * under the License. */ -package org.elasticsearch.discovery.zen.elect; +package org.elasticsearch.discovery.zen; import com.carrotsearch.hppc.ObjectContainer; import org.apache.lucene.util.CollectionUtil; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -33,9 +32,11 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * @@ -45,17 +46,64 @@ public class ElectMasterService extends AbstractComponent { public static final Setting DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING = Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope); - // This is the minimum version a master needs to be on, otherwise it gets ignored - // This is based on the minimum compatible version of the current version this node is on - private final Version minMasterVersion; - private final NodeComparator nodeComparator = new NodeComparator(); - private volatile int minimumMasterNodes; + /** + * a class to encapsulate all the information about a candidate in a master election + * that is needed to decided which of the candidates should win + */ + public static class MasterCandidate { + + public static final long UNRECOVERED_CLUSTER_VERSION = -1; + + final DiscoveryNode node; + + final long clusterStateVersion; + + public MasterCandidate(DiscoveryNode node, long clusterStateVersion) { + Objects.requireNonNull(node); + assert clusterStateVersion >= -1 : "got: " + clusterStateVersion; + assert node.isMasterNode(); + this.node = node; + this.clusterStateVersion = clusterStateVersion; + } + + public DiscoveryNode getNode() { + return node; + } + + public long getClusterStateVersion() { + return clusterStateVersion; + } + + @Override + public String toString() { + return "Candidate{" + + "node=" + node + + ", clusterStateVersion=" + clusterStateVersion + + '}'; + } + + /** + * compares two candidates to indicate which the a better master. + * A higher cluster state version is better + * + * @return -1 if c1 is a batter candidate, 1 if c2. + */ + public static int compare(MasterCandidate c1, MasterCandidate c2) { + // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted + // list, so if c2 has a higher cluster state version, it needs to come first. + int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); + if (ret == 0) { + ret = compareNodes(c1.getNode(), c2.getNode()); + } + return ret; + } + } + @Inject public ElectMasterService(Settings settings) { super(settings); - this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion(); this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes); } @@ -69,16 +117,41 @@ public int minimumMasterNodes() { } public boolean hasEnoughMasterNodes(Iterable nodes) { - if (minimumMasterNodes < 1) { - return true; - } int count = 0; for (DiscoveryNode node : nodes) { if (node.isMasterNode()) { count++; } } - return count >= minimumMasterNodes; + return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes); + } + + public boolean hasEnoughCandidates(Collection candidates) { + if (candidates.isEmpty()) { + return false; + } + if (minimumMasterNodes < 1) { + return true; + } + assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() : + "duplicates ahead: " + candidates; + return candidates.size() >= minimumMasterNodes; + } + + /** + * Elects a new master out of the possible nodes, returning it. Returns null + * if no master has been elected. + */ + public MasterCandidate electMaster(Collection candidates) { + assert hasEnoughCandidates(candidates); + List sortedCandidates = new ArrayList<>(candidates); + sortedCandidates.sort(MasterCandidate::compare); + return sortedCandidates.get(0); + } + + /** selects the best active master to join, where multiple are discovered */ + public DiscoveryNode tieBreakActiveMasters(Collection activeMasters) { + return activeMasters.stream().min(ElectMasterService::compareNodes).get(); } public boolean hasTooManyMasterNodes(Iterable nodes) { @@ -107,7 +180,7 @@ public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, Clust */ public List sortByMasterLikelihood(Iterable nodes) { ArrayList sortedNodes = CollectionUtils.iterableAsArrayList(nodes); - CollectionUtil.introSort(sortedNodes, nodeComparator); + CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes); return sortedNodes; } @@ -130,25 +203,6 @@ public DiscoveryNode[] nextPossibleMasters(ObjectContainer nodes, return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]); } - /** - * Elects a new master out of the possible nodes, returning it. Returns null - * if no master has been elected. - */ - public DiscoveryNode electMaster(Iterable nodes) { - List sortedNodes = sortedMasterNodes(nodes); - if (sortedNodes == null || sortedNodes.isEmpty()) { - return null; - } - DiscoveryNode masterNode = sortedNodes.get(0); - // Sanity check: maybe we don't end up here, because serialization may have failed. - if (masterNode.getVersion().before(minMasterVersion)) { - logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion); - return null; - } else { - return masterNode; - } - } - private List sortedMasterNodes(Iterable nodes) { List possibleNodes = CollectionUtils.iterableAsArrayList(nodes); if (possibleNodes.isEmpty()) { @@ -161,21 +215,18 @@ private List sortedMasterNodes(Iterable nodes) { it.remove(); } } - CollectionUtil.introSort(possibleNodes, nodeComparator); + CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes); return possibleNodes; } - private static class NodeComparator implements Comparator { - - @Override - public int compare(DiscoveryNode o1, DiscoveryNode o2) { - if (o1.isMasterNode() && !o2.isMasterNode()) { - return -1; - } - if (!o1.isMasterNode() && o2.isMasterNode()) { - return 1; - } - return o1.getId().compareTo(o2.getId()); + /** master nodes go before other nodes, with a secondary sort by id **/ + private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) { + if (o1.isMasterNode() && !o2.isMasterNode()) { + return -1; + } + if (!o1.isMasterNode() && o2.isMasterNode()) { + return 1; } + return o1.getId().compareTo(o2.getId()); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 6f0b8966d0916..bf8559fb9495a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.membership.MembershipAction; import java.util.ArrayList; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 132505fb40377..43739a2f4106f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -56,7 +55,6 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; @@ -76,13 +74,10 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -146,9 +141,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private final JoinThreadControl joinThreadControl; - /** counts the time this node has joined the cluster or have elected it self as master */ - private final AtomicLong clusterJoinsCounter = new AtomicLong(); - // must initialized in doStart(), when we have the allocationService set private volatile NodeJoinController nodeJoinController; private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -304,8 +296,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return clusterJoinsCounter.get() > 0; + public ClusterState clusterState() { + return clusterService.state(); } /** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ @@ -406,8 +398,6 @@ public void onElectedAsMaster(ClusterState state) { joinThreadControl.markThreadAsDone(currentThread); // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging) nodesFD.updateNodesAndPing(state); // start the nodes FD - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("cluster joins counter set to [{}] (elected as master)", count); } @Override @@ -764,9 +754,6 @@ public ClusterState execute(ClusterState currentState) { if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { // its a fresh update from the master as we transition from a start of not having a master to having one logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId()); - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("updated cluster join cluster to [{}]", count); - return newClusterState; } @@ -873,16 +860,6 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final } else if (nodeJoinController == null) { throw new IllegalStateException("discovery module is not yet started"); } else { - // The minimum supported version for a node joining a master: - Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion(); - // Sanity check: maybe we don't end up here, because serialization may have failed. - if (node.getVersion().before(minimumNodeJoinVersion)) { - callback.onFailure( - new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]") - ); - return; - } - // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); @@ -901,14 +878,14 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final private DiscoveryNode findMaster() { logger.trace("starting to ping"); - ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout); + List fullPingResponses = pingService.pingAndWait(pingTimeout).toList(); if (fullPingResponses == null) { logger.trace("No full ping responses"); return null; } if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); - if (fullPingResponses.length == 0) { + if (fullPingResponses.size() == 0) { sb.append(" {none}"); } else { for (ZenPing.PingResponse pingResponse : fullPingResponses) { @@ -918,69 +895,57 @@ private DiscoveryNode findMaster() { logger.trace("full ping responses:{}", sb); } + final DiscoveryNode localNode = clusterService.localNode(); + + // add our selves + assert fullPingResponses.stream().map(ZenPing.PingResponse::node) + .filter(n -> n.equals(localNode)).findAny().isPresent() == false; + + fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state())); + // filter responses final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger); - final DiscoveryNode localNode = clusterService.localNode(); - List pingMasters = new ArrayList<>(); + List activeMasters = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { - if (pingResponse.master() != null) { - // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without - // any check / verifications from other nodes in ZenDiscover#innerJoinCluster() - if (!localNode.equals(pingResponse.master())) { - pingMasters.add(pingResponse.master()); - } + // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without + // any check / verifications from other nodes in ZenDiscover#innerJoinCluster() + if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) { + activeMasters.add(pingResponse.master()); } } // nodes discovered during pinging - Set activeNodes = new HashSet<>(); - // nodes discovered who has previously been part of the cluster and do not ping for the very first time - Set joinedOnceActiveNodes = new HashSet<>(); - if (localNode.isMasterNode()) { - activeNodes.add(localNode); - long joinsCounter = clusterJoinsCounter.get(); - if (joinsCounter > 0) { - logger.trace("adding local node to the list of active nodes that have previously joined the cluster (joins counter is [{}])", joinsCounter); - joinedOnceActiveNodes.add(localNode); - } - } + List masterCandidates = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { - activeNodes.add(pingResponse.node()); - if (pingResponse.hasJoinedOnce()) { - joinedOnceActiveNodes.add(pingResponse.node()); + if (pingResponse.node().isMasterNode()) { + masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion())); } } - if (pingMasters.isEmpty()) { - if (electMaster.hasEnoughMasterNodes(activeNodes)) { - // we give preference to nodes who have previously already joined the cluster. Those will - // have a cluster state in memory, including an up to date routing table (which is not persistent to disk - // by the gateway) - DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); - if (master != null) { - return master; - } - return electMaster.electMaster(activeNodes); + if (activeMasters.isEmpty()) { + if (electMaster.hasEnoughCandidates(masterCandidates)) { + final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates); + logger.trace("candidate {} won election", winner); + return winner.getNode(); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from - logger.trace("not enough master nodes [{}]", activeNodes); + logger.trace("not enough master nodes [{}]", masterCandidates); return null; } } else { - - assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; + assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; // lets tie break between discovered nodes - return electMaster.electMaster(pingMasters); + return electMaster.tieBreakActiveMasters(activeMasters); } } - static List filterPingResponses(ZenPing.PingResponse[] fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) { + static List filterPingResponses(List fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) { List pingResponses; if (masterElectionIgnoreNonMasters) { - pingResponses = Arrays.stream(fullPingResponses).filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList()); + pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList()); } else { - pingResponses = Arrays.asList(fullPingResponses); + pingResponses = fullPingResponses; } if (logger.isDebugEnabled()) { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java index 568bc3ec16d75..0bcc8b37d882a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen.ping; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; /** @@ -26,7 +27,7 @@ */ public interface PingContextProvider extends DiscoveryNodesProvider { - /** return true if this node has previously joined the cluster at least once. False if this is first join */ - boolean nodeHasJoinedClusterOnce(); + /** return the current cluster state of the node */ + ClusterState clusterState(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 5a9f5f463e236..b4bb61ad461e3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -20,30 +20,42 @@ package org.elasticsearch.discovery.zen.ping; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; + public interface ZenPing extends LifecycleComponent { void setPingContextProvider(PingContextProvider contextProvider); void ping(PingListener listener, TimeValue timeout); - public interface PingListener { + interface PingListener { - void onPing(PingResponse[] pings); + /** + * called when pinging is done. + * + * @param pings ping result *must + */ + void onPing(Collection pings); } - public static class PingResponse implements Streamable { + class PingResponse implements Streamable { public static final PingResponse[] EMPTY = new PingResponse[0]; @@ -59,29 +71,36 @@ public static class PingResponse implements Streamable { private DiscoveryNode master; - private boolean hasJoinedOnce; + private long clusterStateVersion; private PingResponse() { } /** - * @param node the node which this ping describes - * @param master the current master of the node - * @param clusterName the cluster name of the node - * @param hasJoinedOnce true if the joined has successfully joined the cluster before + * @param node the node which this ping describes + * @param master the current master of the node + * @param clusterName the cluster name of the node + * @param clusterStateVersion the current cluster state version of that node + * ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) */ - public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) { + public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { this.id = idGenerator.incrementAndGet(); this.node = node; this.master = master; this.clusterName = clusterName; - this.hasJoinedOnce = hasJoinedOnce; + this.clusterStateVersion = clusterStateVersion; } - /** - * an always increasing unique identifier for this ping response. - * lower values means older pings. - */ + public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) { + this(node, master, state.getClusterName(), + state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ? + ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version()); + } + + /** + * an always increasing unique identifier for this ping response. + * lower values means older pings. + */ public long id() { return this.id; } @@ -100,9 +119,11 @@ public DiscoveryNode master() { return master; } - /** true if the joined has successfully joined the cluster before */ - public boolean hasJoinedOnce() { - return hasJoinedOnce; + /** + * the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} + * for not recovered) */ + public long getClusterStateVersion() { + return clusterStateVersion; } public static PingResponse readPingResponse(StreamInput in) throws IOException { @@ -118,7 +139,7 @@ public void readFrom(StreamInput in) throws IOException { if (in.readBoolean()) { master = new DiscoveryNode(in); } - this.hasJoinedOnce = in.readBoolean(); + this.clusterStateVersion = in.readLong(); this.id = in.readLong(); } @@ -132,13 +153,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); master.writeTo(out); } - out.writeBoolean(hasJoinedOnce); + out.writeLong(clusterStateVersion); out.writeLong(id); } @Override public String toString() { - return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}"; + return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], cluster_state_version [" + clusterStateVersion + + "], cluster_name[" + clusterName.value() + "]}"; } } @@ -146,7 +168,7 @@ public String toString() { /** * a utility collection of pings where only the most recent ping is stored per node */ - public static class PingCollection { + class PingCollection { Map pings; @@ -171,15 +193,15 @@ public synchronized boolean addPing(PingResponse ping) { } /** adds multiple pings if newer than previous pings from the same node */ - public synchronized void addPings(PingResponse[] pings) { + public synchronized void addPings(Iterable pings) { for (PingResponse ping : pings) { addPing(ping); } } - /** serialize current pings to an array */ - public synchronized PingResponse[] toArray() { - return pings.values().toArray(new PingResponse[pings.size()]); + /** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */ + public synchronized List toList() { + return new ArrayList<>(pings.values()); } /** the number of nodes for which there are known pings */ diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index bd5855666aca2..3a2ddc10cfbc8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -23,17 +23,15 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; -public class ZenPingService extends AbstractLifecycleComponent implements ZenPing { +public class ZenPingService extends AbstractLifecycleComponent { private List zenPings = Collections.emptyList(); @@ -47,7 +45,6 @@ public List zenPings() { return this.zenPings; } - @Override public void setPingContextProvider(PingContextProvider contextProvider) { if (lifecycle.started()) { throw new IllegalStateException("Can't set nodes provider when started"); @@ -78,60 +75,31 @@ protected void doClose() { } } - public PingResponse[] pingAndWait(TimeValue timeout) { - final AtomicReference response = new AtomicReference<>(); - final CountDownLatch latch = new CountDownLatch(1); - ping(new PingListener() { - @Override - public void onPing(PingResponse[] pings) { - response.set(pings); - latch.countDown(); + public ZenPing.PingCollection pingAndWait(TimeValue timeout) { + final ZenPing.PingCollection response = new ZenPing.PingCollection(); + final CountDownLatch latch = new CountDownLatch(zenPings.size()); + for (ZenPing zenPing : zenPings) { + final AtomicBoolean counted = new AtomicBoolean(); + try { + zenPing.ping(pings -> { + response.addPings(pings); + if (counted.compareAndSet(false, true)) { + latch.countDown(); + } + }, timeout); + } catch (Exception ex) { + logger.warn("Ping execution failed", ex); + if (counted.compareAndSet(false, true)) { + latch.countDown(); + } } - }, timeout); + } try { latch.await(); - return response.get(); + return response; } catch (InterruptedException e) { logger.trace("pingAndWait interrupted"); - return null; - } - } - - @Override - public void ping(PingListener listener, TimeValue timeout) { - List zenPings = this.zenPings; - CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings); - for (ZenPing zenPing : zenPings) { - try { - zenPing.ping(compoundPingListener, timeout); - } catch (EsRejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); - compoundPingListener.onPing(null); - } - } - } - - private static class CompoundPingListener implements PingListener { - - private final PingListener listener; - - private final AtomicInteger counter; - - private PingCollection responses = new PingCollection(); - - private CompoundPingListener(PingListener listener, List zenPings) { - this.listener = listener; - this.counter = new AtomicInteger(zenPings.size()); - } - - @Override - public void onPing(PingResponse[] pings) { - if (pings != null) { - responses.addPings(pings); - } - if (counter.decrementAndGet() == 0) { - listener.onPing(responses.toArray()); - } + return response; } } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index afe4902f887a7..637730c75fd76 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -44,7 +44,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; @@ -63,6 +63,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -236,8 +237,9 @@ public void clearTemporalResponses() { temporalResponses.clear(); } - public PingResponse[] pingAndWait(TimeValue duration) { - final AtomicReference response = new AtomicReference<>(); + // test only + Collection pingAndWait(TimeValue duration) { + final AtomicReference> response = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); ping(pings -> { response.set(pings); @@ -273,7 +275,7 @@ protected void doRun() { protected void doRun() throws Exception { sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler); sendPingsHandler.close(); - listener.onPing(sendPingsHandler.pingCollection().toArray()); + listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); transportService.disconnectFromNode(node); @@ -576,8 +578,7 @@ public void writeTo(StreamOutput out) throws IOException { } private PingResponse createPingResponse(DiscoveryNodes discoNodes) { - return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterName, - contextProvider.nodeHasJoinedClusterOnce()); + return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), contextProvider.clusterState()); } static class UnicastPingResponse extends TransportResponse { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index b30a343547973..87f86c3f596d2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 07c1e5dd8da3e..2e86cb5b896f8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -29,8 +29,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index 98c7b1a3d67a0..31e841227b866 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -30,8 +30,8 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 057b54c7a078f..3b436f4541097 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -18,13 +18,10 @@ */ package org.elasticsearch.discovery; -import org.elasticsearch.Version; import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.local.LocalDiscovery; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; -import org.elasticsearch.node.Node; import org.elasticsearch.test.NoopDiscovery; /** diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index f04db89796c38..b78b1d923b9f4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -49,8 +49,8 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -110,9 +110,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -164,7 +167,7 @@ private List startCluster(int numberOfNodes, int minimumMasterNode) thro private List startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws ExecutionException, InterruptedException { - configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); ensureStableCluster(numberOfNodes); @@ -196,15 +199,15 @@ protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class); } - private void configureUnicastCluster( + private void configureCluster( int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode ) throws ExecutionException, InterruptedException { - configureUnicastCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode); } - private void configureUnicastCluster( + private void configureCluster( Settings settings, int numberOfNodes, @Nullable int[] unicastHostsOrdinals, @@ -1031,7 +1034,7 @@ public void onFailure(Exception e) { } public void testClusterFormingWithASlowNode() throws Exception { - configureUnicastCluster(3, null, 2); + configureCluster(3, null, 2); SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000); @@ -1094,7 +1097,7 @@ public void testNodeNotReachableFromMaster() throws Exception { */ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception { // don't use DEFAULT settings (which can cause node disconnects on a slow CI machine) - configureUnicastCluster(Settings.EMPTY, 3, null, 1); + configureCluster(Settings.EMPTY, 3, null, 1); InternalTestCluster.Async masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(); InternalTestCluster.Async node_1Future = internalCluster().startDataOnlyNodeAsync(); @@ -1135,7 +1138,7 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception { // test for https://github.com/elastic/elasticsearch/issues/8823 - configureUnicastCluster(2, null, 1); + configureCluster(2, null, 1); String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY); internalCluster().startDataOnlyNode(Settings.EMPTY); @@ -1166,7 +1169,7 @@ public void testIndicesDeleted() throws Exception { .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed .build(); final String idxName = "test"; - configureUnicastCluster(settings, 3, null, 2); + configureCluster(settings, 3, null, 2); InternalTestCluster.Async> masterNodes = internalCluster().startMasterOnlyNodesAsync(2); InternalTestCluster.Async dataNode = internalCluster().startDataOnlyNodeAsync(); dataNode.get(); @@ -1195,6 +1198,61 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } + public void testElectMasterWithLatestVersion() throws Exception { + configureCluster(3, null, 2); + final Set nodes = new HashSet<>(internalCluster().startNodesAsync(3).get()); + ensureStableCluster(3); + ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect()); + internalCluster().setDisruptionScheme(isolateAllNodes); + + logger.info("--> forcing a complete election to make sure \"preferred\" master is elected"); + isolateAllNodes.startDisrupting(); + for (String node: nodes) { + assertNoMaster(node); + } + isolateAllNodes.stopDisrupting(); + ensureStableCluster(3); + final String preferredMasterName = internalCluster().getMasterName(); + final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode(); + for (String node: nodes) { + DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode(); + assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId())); + } + + logger.info("--> preferred master is {}", preferredMaster); + final Set nonPreferredNodes = new HashSet<>(nodes); + nonPreferredNodes.remove(preferredMasterName); + final ServiceDisruptionScheme isolatePreferredMaster = + new NetworkDisruption( + new NetworkDisruption.TwoPartitions( + Collections.singleton(preferredMasterName), nonPreferredNodes), + new NetworkDisconnect()); + internalCluster().setDisruptionScheme(isolatePreferredMaster); + isolatePreferredMaster.startDisrupting(); + + assertAcked(client(randomFrom(nonPreferredNodes)).admin().indices().prepareCreate("test").setSettings( + INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1, + INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0 + )); + + internalCluster().clearDisruptionScheme(false); + internalCluster().setDisruptionScheme(isolateAllNodes); + + logger.info("--> forcing a complete election again"); + isolateAllNodes.startDisrupting(); + for (String node: nodes) { + assertNoMaster(node); + } + + isolateAllNodes.stopDisrupting(); + + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + if (state.metaData().hasIndex("test") == false) { + fail("index 'test' was lost. current cluster state: " + state.prettyPrint()); + } + + } + protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) { final NetworkLinkDisruptionType disruptionType; if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java index b31b0cbaa55e0..737607df6be3c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -31,6 +31,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class ElectMasterServiceTests extends ESTestCase { @@ -55,6 +59,22 @@ List generateRandomNodes() { return nodes; } + List generateRandomCandidates() { + int count = scaledRandomIntBetween(1, 100); + ArrayList candidates = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + Set roles = new HashSet<>(); + roles.add(DiscoveryNode.Role.MASTER); + DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), + roles, Version.CURRENT); + candidates.add(new MasterCandidate(node, randomBoolean() ? MasterCandidate.UNRECOVERED_CLUSTER_VERSION : randomPositiveLong())); + } + + Collections.shuffle(candidates, random()); + return candidates; + } + + public void testSortByMasterLikelihood() { List nodes = generateRandomNodes(); List sortedNodes = electMasterService().sortByMasterLikelihood(nodes); @@ -69,36 +89,53 @@ public void testSortByMasterLikelihood() { } prevNode = node; } + } + public void testTieBreakActiveMasters() { + List nodes = generateRandomCandidates().stream().map(MasterCandidate::getNode).collect(Collectors.toList()); + DiscoveryNode bestMaster = electMasterService().tieBreakActiveMasters(nodes); + for (DiscoveryNode node: nodes) { + if (node.equals(bestMaster) == false) { + assertTrue(bestMaster.getId().compareTo(node.getId()) < 0); + } + } } - public void testElectMaster() { - List nodes = generateRandomNodes(); + public void testHasEnoughNodes() { + List nodes = rarely() ? Collections.emptyList() : generateRandomNodes(); ElectMasterService service = electMasterService(); - int min_master_nodes = randomIntBetween(0, nodes.size()); - service.minimumMasterNodes(min_master_nodes); + int masterNodes = (int) nodes.stream().filter(DiscoveryNode::isMasterNode).count(); + service.minimumMasterNodes(randomIntBetween(-1, masterNodes)); + assertThat(service.hasEnoughMasterNodes(nodes), equalTo(masterNodes > 0)); + service.minimumMasterNodes(masterNodes + 1 + randomIntBetween(0, nodes.size())); + assertFalse(service.hasEnoughMasterNodes(nodes)); + } - int master_nodes = 0; - for (DiscoveryNode node : nodes) { - if (node.isMasterNode()) { - master_nodes++; - } - } - DiscoveryNode master = null; - if (service.hasEnoughMasterNodes(nodes)) { - master = service.electMaster(nodes); - } + public void testHasEnoughCandidates() { + List candidates = rarely() ? Collections.emptyList() : generateRandomCandidates(); + ElectMasterService service = electMasterService(); + service.minimumMasterNodes(randomIntBetween(-1, candidates.size())); + assertThat(service.hasEnoughCandidates(candidates), equalTo(candidates.size() > 0)); + service.minimumMasterNodes(candidates.size() + 1 + randomIntBetween(0, candidates.size())); + assertFalse(service.hasEnoughCandidates(candidates)); + } - if (master_nodes == 0) { - assertNull(master); - } else if (min_master_nodes > 0 && master_nodes < min_master_nodes) { - assertNull(master); - } else { - assertNotNull(master); - for (DiscoveryNode node : nodes) { - if (node.isMasterNode()) { - assertTrue(master.getId().compareTo(node.getId()) <= 0); - } + public void testElectMaster() { + List candidates = generateRandomCandidates(); + ElectMasterService service = electMasterService(); + int minMasterNodes = randomIntBetween(0, candidates.size()); + service.minimumMasterNodes(minMasterNodes); + MasterCandidate master = service.electMaster(candidates); + assertNotNull(master); + for (MasterCandidate candidate : candidates) { + if (candidate.getNode().equals(master.getNode())) { + // nothing much to test here + } else if (candidate.getClusterStateVersion() == master.getClusterStateVersion()) { + assertThat("candidate " + candidate + " has a lower or equal id than master " + master, candidate.getNode().getId(), + greaterThan(master.getNode().getId())); + } else { + assertThat("candidate " + master + " has a higher cluster state version than candidate " + candidate, + master.getClusterStateVersion(), greaterThan(candidate.getClusterStateVersion())); } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 0acba3c420f27..ca75ea960ad1d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java index 35335a8ede4ce..1e8954330cd26 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index b7aaf2795824f..d9a8c9be7f497 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -34,14 +34,12 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; @@ -60,10 +58,8 @@ import org.junit.Before; import java.io.IOException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -77,8 +73,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) @ESIntegTestCase.SuppressLocalMode @@ -293,44 +287,6 @@ public EnumSet context() { } } - public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException { - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - String nodeName = internalCluster().startNode(nodeSettings); - ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); - DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), - emptyMap(), emptySet(), previousMajorVersion); - final AtomicReference holder = new AtomicReference<>(); - zenDiscovery.handleJoinRequest(node, clusterService.state(), new MembershipAction.JoinCallback() { - @Override - public void onSuccess() { - } - - @Override - public void onFailure(Exception e) { - holder.set((IllegalStateException) e); - } - }); - - assertThat(holder.get(), notNullValue()); - assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [" + previousMajorVersion - + "] that is lower than the minimum compatible version [" + Version.CURRENT.minimumCompatibilityVersion() + "]")); - } - - public void testJoinElectedMaster_incompatibleMinVersion() { - ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY); - - DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(), - Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT); - assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node)); - node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(), emptySet(), previousMajorVersion); - assertThat("Can't join master because version " + previousMajorVersion - + " is lower than the minimum compatable version " + Version.CURRENT + " can support", - electMasterService.electMaster(Collections.singletonList(node)), nullValue()); - } - public void testDiscoveryStats() throws IOException { String expectedStatsJsonResponse = "{\n" + " \"discovery\" : {\n" + diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index b9f65016048a0..a7291dc37366e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; @@ -55,8 +54,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; -import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; @@ -128,7 +127,7 @@ public void testFilterNonMasterPingResponse() { Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values()))); DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), roles, Version.CURRENT); - responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean())); + responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomLong())); allNodes.add(node); if (node.isMasterNode()) { masterNodes.add(node); @@ -136,8 +135,7 @@ public void testFilterNonMasterPingResponse() { } boolean ignore = randomBoolean(); - List filtered = ZenDiscovery.filterPingResponses( - responses.toArray(new ZenPing.PingResponse[responses.size()]), ignore, logger); + List filtered = ZenDiscovery.filterPingResponses(responses, ignore, logger); final List filteredNodes = filtered.stream().map(ZenPing.PingResponse::node).collect(Collectors.toList()); if (ignore) { assertThat(filteredNodes, equalTo(masterNodes)); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java index 72674f44e3dc9..2275756e8eeea 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.List; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -39,7 +40,7 @@ public void testPingCollection() { DiscoveryNode[] nodes = new DiscoveryNode[randomIntBetween(1, 30)]; long maxIdPerNode[] = new long[nodes.length]; DiscoveryNode masterPerNode[] = new DiscoveryNode[nodes.length]; - boolean hasJoinedOncePerNode[] = new boolean[nodes.length]; + long clusterStateVersionPerNode[] = new long[nodes.length]; ArrayList pings = new ArrayList<>(); for (int i = 0; i < nodes.length; i++) { nodes[i] = new DiscoveryNode("" + i, LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); @@ -51,9 +52,9 @@ public void testPingCollection() { if (randomBoolean()) { masterNode = nodes[randomInt(nodes.length - 1)]; } - boolean hasJoinedOnce = randomBoolean(); + long clusterStateVersion = randomLong(); ZenPing.PingResponse ping = new ZenPing.PingResponse(nodes[node], masterNode, ClusterName.CLUSTER_NAME_SETTING. - getDefault(Settings.EMPTY), hasJoinedOnce); + getDefault(Settings.EMPTY), clusterStateVersion); if (rarely()) { // ignore some pings continue; @@ -61,7 +62,7 @@ public void testPingCollection() { // update max ping info maxIdPerNode[node] = ping.id(); masterPerNode[node] = masterNode; - hasJoinedOncePerNode[node] = hasJoinedOnce; + clusterStateVersionPerNode[node] = clusterStateVersion; pings.add(ping); } @@ -69,15 +70,15 @@ public void testPingCollection() { Collections.shuffle(pings, random()); ZenPing.PingCollection collection = new ZenPing.PingCollection(); - collection.addPings(pings.toArray(new ZenPing.PingResponse[pings.size()])); + collection.addPings(pings); - ZenPing.PingResponse[] aggregate = collection.toArray(); + List aggregate = collection.toList(); for (ZenPing.PingResponse ping : aggregate) { int nodeId = Integer.parseInt(ping.node().getId()); assertThat(maxIdPerNode[nodeId], equalTo(ping.id())); assertThat(masterPerNode[nodeId], equalTo(ping.master())); - assertThat(hasJoinedOncePerNode[nodeId], equalTo(ping.hasJoinedOnce())); + assertThat(clusterStateVersionPerNode[nodeId], equalTo(ping.getClusterStateVersion())); maxIdPerNode[nodeId] = -1; // mark as seen } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java similarity index 85% rename from core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java rename to core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index ea5779c33bbab..e04b0b52d81cc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -20,6 +20,9 @@ package org.elasticsearch.discovery.zen.ping.unicast; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -31,7 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -45,16 +48,18 @@ import org.elasticsearch.transport.TransportSettings; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -public class UnicastZenPingIT extends ESTestCase { +public class UnicastZenPingTests extends ESTestCase { public void testSimplePings() throws InterruptedException { int startPort = 11000 + randomIntBetween(0, 1000); int endPort = startPort + 10; @@ -78,6 +83,8 @@ public void testSimplePings() throws InterruptedException { Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD); + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + Settings hostsSettings = Settings.builder() .putArray("discovery.zen.ping.unicast.hosts", NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), @@ -96,8 +103,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); } }); zenPingA.start(); @@ -110,8 +117,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return true; + public ClusterState clusterState() { + return state; } }); zenPingB.start(); @@ -130,8 +137,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return state; } }); zenPingC.start(); @@ -144,36 +151,38 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return state; } }); zenPingD.start(); try { logger.info("ping from UZP_A"); - ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(1)); - assertThat(pingResponses[0].node().getId(), equalTo("UZP_B")); - assertTrue(pingResponses[0].hasJoinedOnce()); + Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); assertCounters(handleA, handleA, handleB, handleC, handleD); // ping again, this time from B, logger.info("ping from UZP_B"); pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(1)); - assertThat(pingResponses[0].node().getId(), equalTo("UZP_A")); - assertFalse(pingResponses[0].hasJoinedOnce()); + assertThat(pingResponses.size(), equalTo(1)); + ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_A")); + assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); assertCounters(handleB, handleA, handleB, handleC, handleD); logger.info("ping from UZP_C"); pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(0)); + assertThat(pingResponses.size(), equalTo(0)); assertCounters(handleC, handleA, handleB, handleC, handleD); logger.info("ping from UZP_D"); pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(0)); + assertThat(pingResponses.size(), equalTo(0)); assertCounters(handleD, handleA, handleB, handleC, handleD); } finally { zenPingA.close(); diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index 9b340fd863a96..a998b56f64090 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -38,7 +38,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.mapper.MapperParsingException; diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index a1d16bfd884fd..c820bccae51ad 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -23,7 +23,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index f512f1da5387b..3a045c80ac807 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -44,8 +44,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f76c9e43a2226..63e08e06b795b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -100,7 +100,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; @@ -368,14 +368,14 @@ public void randomIndexTemplate() throws IOException { // TODO move settings for random directory etc here into the index based randomized settings. if (cluster().size() > 0) { Settings.Builder randomSettingsBuilder = - setRandomIndexSettings(random(), Settings.builder()); + setRandomIndexSettings(random(), Settings.builder()); if (isInternalCluster()) { // this is only used by mock plugins and if the cluster is not internal we just can't set it randomSettingsBuilder.put(INDEX_TEST_SEED_SETTING.getKey(), random().nextLong()); } randomSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, numberOfShards()) - .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()); + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()); // if the test class is annotated with SuppressCodecs("*"), it means don't use lucene's codec randomization // otherwise, use it, it has assertions and so on that can find bugs. @@ -404,10 +404,10 @@ public void randomIndexTemplate() throws IOException { randomSettingsBuilder.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), randomBoolean()); } PutIndexTemplateRequestBuilder putTemplate = client().admin().indices() - .preparePutTemplate("random_index_template") - .setTemplate("*") - .setOrder(0) - .setSettings(randomSettingsBuilder); + .preparePutTemplate("random_index_template") + .setTemplate("*") + .setOrder(0) + .setSettings(randomSettingsBuilder); if (mappings != null) { logger.info("test using _default_ mappings: [{}]", mappings.bytes().utf8ToString()); putTemplate.addMapping("_default_", mappings); @@ -443,7 +443,7 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde private static Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), - random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); + random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); } switch (random.nextInt(4)) { case 3: @@ -525,9 +525,9 @@ protected final void afterInternal(boolean afterClass) throws Exception { if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData - .persistentSettings().getAsMap().size(), equalTo(0)); + .persistentSettings().getAsMap().size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData - .transientSettings().getAsMap().size(), equalTo(0)); + .transientSettings().getAsMap().size(), equalTo(0)); } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); @@ -540,7 +540,7 @@ protected final void afterInternal(boolean afterClass) throws Exception { @Override public void run() { assertThat("still having pending states: " + Strings.arrayToDelimitedString(zenDiscovery.pendingClusterStates(), "\n"), - zenDiscovery.pendingClusterStates(), emptyArray()); + zenDiscovery.pendingClusterStates(), emptyArray()); } }); } @@ -829,7 +829,7 @@ public void assertResultsAndLogOnFailure(long expectedResults, SearchResponse se String failMsg = sb.toString(); for (SearchHit hit : searchResponse.getHits().getHits()) { sb.append("\n-> _index: [").append(hit.getIndex()).append("] type [").append(hit.getType()) - .append("] id [").append(hit.id()).append("]"); + .append("] id [").append(hit.id()).append("]"); } logger.warn("{}", sb); fail(failMsg); @@ -873,7 +873,7 @@ public ClusterHealthStatus ensureGreen(String... indices) { */ public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) { ClusterHealthResponse actionGet = client().admin().cluster() - .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet(); + .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet(); if (actionGet.isTimedOut()) { logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); fail("timed out waiting for green state"); @@ -900,7 +900,7 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) { request.waitForStatus(status); } ClusterHealthResponse actionGet = client().admin().cluster() - .health(request).actionGet(); + .health(request).actionGet(); if (actionGet.isTimedOut()) { logger.info("waitForRelocation timed out (status={}), cluster state:\n{}\n{}", status, client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); @@ -945,7 +945,7 @@ public long waitForDocs(final long numDocs, @Nullable final BackgroundIndexer in * @return the actual number of docs seen. */ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, @Nullable final BackgroundIndexer indexer) - throws InterruptedException { + throws InterruptedException { final AtomicLong lastKnownCount = new AtomicLong(-1); long lastStartCount = -1; BooleanSupplier testDocs = () -> { @@ -988,8 +988,8 @@ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTim */ public void setMinimumMasterNodes(int n) { assertTrue(client().admin().cluster().prepareUpdateSettings().setTransientSettings( - Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), n)) - .get().isAcknowledged()); + Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), n)) + .get().isAcknowledged()); } /** @@ -997,7 +997,7 @@ public void setMinimumMasterNodes(int n) { */ public ClusterHealthStatus ensureYellow(String... indices) { ClusterHealthResponse actionGet = client().admin().cluster() - .health(Requests.clusterHealthRequest(indices).waitForNoRelocatingShards(true).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); + .health(Requests.clusterHealthRequest(indices).waitForNoRelocatingShards(true).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); if (actionGet.isTimedOut()) { logger.info("ensureYellow timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); assertThat("timed out waiting for yellow", actionGet.isTimedOut(), equalTo(false)); @@ -1019,7 +1019,7 @@ public void logClusterState() { public void logSegmentsState(String... indices) throws Exception { IndicesSegmentResponse segsRsp = client().admin().indices().prepareSegments(indices).get(); logger.debug("segments {} state: \n{}", indices.length == 0 ? "[_all]" : indices, - segsRsp.toXContent(JsonXContent.contentBuilder().prettyPrint(), ToXContent.EMPTY_PARAMS).string()); + segsRsp.toXContent(JsonXContent.contentBuilder().prettyPrint(), ToXContent.EMPTY_PARAMS).string()); } /** @@ -1102,16 +1102,16 @@ protected void ensureStableCluster(int nodeCount, TimeValue timeValue, boolean l } logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue); ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes(Integer.toString(nodeCount)) - .setTimeout(timeValue) - .setLocal(local) - .setWaitForNoRelocatingShards(true) - .get(); + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount)) + .setTimeout(timeValue) + .setLocal(local) + .setWaitForNoRelocatingShards(true) + .get(); if (clusterHealthResponse.isTimedOut()) { ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get(); fail("failed to reach a stable cluster of [" + nodeCount + "] nodes. Tried via [" + viaNode + "]. last cluster state:\n" - + stateResponse.getState().prettyPrint()); + + stateResponse.getState().prettyPrint()); } assertThat(clusterHealthResponse.isTimedOut(), is(false)); } @@ -1234,7 +1234,7 @@ protected boolean indexExists(String index) { */ protected final void enableAllocation(String... indices) { client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put( - EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "all" + EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "all" )).get(); } @@ -1243,7 +1243,7 @@ protected final void enableAllocation(String... indices) { */ protected final void disableAllocation(String... indices) { client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put( - EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none" + EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none" )).get(); } @@ -1357,7 +1357,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } else { List> partition = eagerPartition(builders, Math.min(MAX_BULK_INDEX_REQUEST_SIZE, - Math.max(1, (int) (builders.size() * randomDouble())))); + Math.max(1, (int) (builders.size() * randomDouble())))); logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size()); for (List segmented : partition) { BulkRequestBuilder bulkBuilder = client().prepareBulk(); @@ -1426,18 +1426,18 @@ private void postIndexAsyncActions(String[] indices, List inFlig if (rarely()) { if (rarely()) { client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } else if (maybeFlush && rarely()) { if (randomBoolean()) { client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } else { client().admin().indices().syncedFlush(syncedFlushRequest(indices).indicesOptions(IndicesOptions.lenientExpandOpen()), new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } } else if (rarely()) { client().admin().indices().prepareForceMerge(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } } while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) { @@ -1567,7 +1567,7 @@ protected void addError(Exception e) { */ public void clearScroll(String... scrollIds) { ClearScrollResponse clearResponse = client().prepareClearScroll() - .setScrollIds(Arrays.asList(scrollIds)).get(); + .setScrollIds(Arrays.asList(scrollIds)).get(); assertThat(clearResponse.isSucceeded(), equalTo(true)); } @@ -1631,20 +1631,20 @@ private boolean randomDynamicTemplates() { */ protected Settings nodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder() - .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) - // Default the watermarks to absurdly low to prevent the tests - // from failing on nodes without enough disk space - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) - .put("script.stored", "true") - .put("script.inline", "true") - // by default we never cache below 10k docs in a segment, - // bypass this limit so that caching gets some testing in - // integration tests that usually create few documents - .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) - // wait short time for other active shards before actually deleting, default 30s not needed in tests - .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); + .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) + // Default the watermarks to absurdly low to prevent the tests + // from failing on nodes without enough disk space + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) + .put("script.stored", "true") + .put("script.inline", "true") + // by default we never cache below 10k docs in a segment, + // bypass this limit so that caching gets some testing in + // integration tests that usually create few documents + .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) + // wait short time for other active shards before actually deleting, default 30s not needed in tests + .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); return builder.build(); } @@ -1739,8 +1739,8 @@ protected TestCluster buildTestCluster(Scope scope, long seed) throws IOExceptio mockPlugins = mocks; } return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes, - InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), - InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); + InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), + InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); } protected NodeConfigurationSource getNodeConfigSource() { @@ -1772,7 +1772,7 @@ public Settings nodeSettings(int nodeOrdinal) { .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), isNetwork ? DiscoveryModule.DISCOVERY_TYPE_SETTING.getDefault(Settings.EMPTY) : "local") .put(networkSettings.build()). - put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); + put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); } @Override @@ -2071,8 +2071,8 @@ protected Settings prepareBackwardsDataDir(Path backwardsIndex, Object... settin assertFalse(Files.exists(src)); assertTrue(Files.exists(dest)); Settings.Builder builder = Settings.builder() - .put(settings) - .put(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath()); + .put(settings) + .put(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath()); Path configDir = indexDir.resolve("config"); if (Files.exists(configDir)) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index e49e6d4aa408d..6a5493ff1eb80 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1660,10 +1660,18 @@ public void setDisruptionScheme(ServiceDisruptionScheme scheme) { } public void clearDisruptionScheme() { + clearDisruptionScheme(true); + } + + public void clearDisruptionScheme(boolean ensureHealthyCluster) { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); - activeDisruptionScheme.removeAndEnsureHealthy(this); + if (ensureHealthyCluster) { + activeDisruptionScheme.removeAndEnsureHealthy(this); + } else { + activeDisruptionScheme.removeFromCluster(this); + } } activeDisruptionScheme = null; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java index 4e135c4c2b0bf..f7094d8ae9feb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -328,6 +328,18 @@ public String toString() { } } + public static class IsolateAllNodes extends DisruptedLinks { + + public IsolateAllNodes(Set nodes) { + super(nodes); + } + + @Override + public boolean disrupt(String node1, String node2) { + return true; + } + } + /** * Abstract class representing various types of network disruptions. Instances of this class override the {@link #applyDisruption} * method to apply their specific disruption type to requests that are send from a source to a target node. diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java index 4d0f1123a1bee..edc261c175959 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java @@ -56,6 +56,21 @@ private void checkTwoPartitions(TwoPartitions topology, Set partition1, assertTrue(topology.getMajoritySide().size() >= topology.getMinoritySide().size()); } + public void testIsolateAll() { + Set nodes = generateRandomStringSet(1, 10); + NetworkDisruption.DisruptedLinks topology = new NetworkDisruption.IsolateAllNodes(nodes); + for (int i = 0; i < 10; i++) { + final String node1 = randomFrom(nodes); + final String node2 = randomFrom(nodes); + if (node1.equals(node2)) { + continue; + } + assertTrue(topology.nodes().contains(node1)); + assertTrue(topology.nodes().contains(node2)); + assertTrue(topology.disrupt(node1, node2)); + } + } + public void testBridge() { Set partition1 = generateRandomStringSet(1, 10); Set partition2 = generateRandomStringSet(1, 10);