Skip to content

Commit

Permalink
[Discovery] UnicastZenPing should also ping last known discoNodes
Browse files Browse the repository at this point in the history
At the moment, when a node looses connection to the master (due to a partition or the master was stopped), we ping the unicast hosts in order to discover other nodes and elect a new master or get of another master than has been elected in the mean time. This can go wrong if all unicast targets are on the same side of a minority partition and therefore will never rejoin once the partition is healed.

Closes #7336
  • Loading branch information
bleskes committed Aug 22, 2014
1 parent 4393336 commit b942d54
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 18 deletions.
Expand Up @@ -141,7 +141,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, ElectMasterService electMasterService, Version version,
DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
Expand All @@ -152,6 +152,8 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.discoverySettings = discoverySettings;
this.pingService = pingService;
this.version = version;
this.electMaster = electMasterService;


// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
Expand All @@ -167,7 +169,6 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa

logger.debug("using ping.timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());

this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName);
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;

Expand All @@ -44,6 +45,7 @@ public ZenDiscoveryModule addUnicastHostProvider(Class<? extends UnicastHostsPro

@Override
protected void configure() {
bind(ElectMasterService.class).asEagerSingleton();
bind(ZenPingService.class).asEagerSingleton();
Multibinder<UnicastHostsProvider> unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class<? extends UnicastHostsProvider> unicastHostProvider : unicastHostProviders) {
Expand Down
Expand Up @@ -24,12 +24,10 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.*;

/**
*
Expand All @@ -42,6 +40,7 @@ public class ElectMasterService extends AbstractComponent {

private volatile int minimumMasterNodes;

@Inject
public ElectMasterService(Settings settings) {
super(settings);
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
Expand Down Expand Up @@ -69,6 +68,18 @@ public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
return count >= minimumMasterNodes;
}

/**
* Returns the given nodes sorted by likelyhood of being elected as master, most likely first.
* Non-master nodes are not removed but are rather put in the end
* @param nodes
* @return
*/
public List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
ArrayList<DiscoveryNode> sortedNodes = Lists.newArrayList(nodes);
CollectionUtil.introSort(sortedNodes, nodeComparator);
return sortedNodes;
}

/**
* Returns a list of the next possible masters.
*/
Expand Down Expand Up @@ -120,6 +131,12 @@ private static class NodeComparator implements Comparator<DiscoveryNode> {

@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.masterNode() && !o2.masterNode()) {
return -1;
}
if (!o1.masterNode() && o2.masterNode()) {
return 1;
}
return o1.id().compareTo(o2.id());
}
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
Expand All @@ -55,20 +56,20 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen

// here for backward comp. with discovery plugins
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders);
ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders);
}

@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders));
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders));

this.zenPings = zenPingsBuilder.build();
}
Expand Down
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.discovery.zen.ping.unicast;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -35,6 +39,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
Expand Down Expand Up @@ -62,6 +67,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final ElectMasterService electMasterService;

private final int concurrentConnects;

Expand All @@ -78,11 +84,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen

private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<>();

public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.electMasterService = electMasterService;

if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
Expand Down Expand Up @@ -244,18 +252,30 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);

HashSet<DiscoveryNode> nodesToPing = new HashSet<>(Arrays.asList(nodes));
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPing.add(temporalResponse.target());
nodesToPingSet.add(temporalResponse.target());
}
}

for (UnicastHostsProvider provider : hostsProviders) {
nodesToPing.addAll(provider.buildDynamicNodes());
nodesToPingSet.addAll(provider.buildDynamicNodes());
}

// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {
nodesToPingSet.add(masterNode.value);
}

// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = electMasterService.sortByMasterLikelihood(nodesToPingSet);

// new add the the unicast targets first
ArrayList<DiscoveryNode> nodesToPing = Lists.newArrayList(nodes);
nodesToPing.addAll(sortedNodesToPing);

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected
Expand Down
Expand Up @@ -649,6 +649,42 @@ public void unicastSinglePingResponseContainsMaster() throws Exception {
assertMaster(masterNode, nodes);
}

@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void isolatedUnicastNodes() throws Exception {
List<String> nodes = startUnicastCluster(3, new int[]{0}, -1);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);

Set<String> unicastTargetSide = new HashSet<>();
unicastTargetSide.add(unicastTarget);

Set<String> restOfClusterSide = new HashSet<>();
restOfClusterSide.addAll(nodes);
restOfClusterSide.remove(unicastTarget);

// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
((UnicastZenPing) zenPing).clearTemporalReponses();
}
}

// Simulate a network issue between the unicast target node and the rest of the cluster
NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(unicastTargetSide, restOfClusterSide, getRandom());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(2, nodes.get(1));

// The isolate master node must report no master, so it starts with pinging
assertNoMaster(unicastTarget);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3);
}


/** Test cluster join with issues in cluster state publishing * */
@Test
Expand Down Expand Up @@ -695,7 +731,6 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans
nonMasterTransportService.clearRule(discoveryNodes.masterNode());

ensureStableCluster(2);

}


Expand Down
@@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.zen;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;

import java.util.*;

public class ElectMasterServiceTest extends ElasticsearchTestCase {

ElectMasterService electMasterService() {
return new ElectMasterService(ImmutableSettings.EMPTY);
}

List<DiscoveryNode> generateRandomNodes() {
int count = scaledRandomIntBetween(1, 100);
ArrayList<DiscoveryNode> nodes = new ArrayList<>(count);

Map<String, String> master = new HashMap<>();
master.put("master", "true");
Map<String, String> nonMaster = new HashMap<>();
nonMaster.put("master", "false");

for (int i = 0; i < count; i++) {
Map<String, String> attributes = randomBoolean() ? master : nonMaster;
DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
nodes.add(node);
}

Collections.shuffle(nodes, getRandom());
return nodes;
}

@Test
public void sortByMasterLikelihood() {
List<DiscoveryNode> nodes = generateRandomNodes();
List<DiscoveryNode> sortedNodes = electMasterService().sortByMasterLikelihood(nodes);
assertEquals(nodes.size(), sortedNodes.size());
DiscoveryNode prevNode = sortedNodes.get(0);
for (int i = 1; i < sortedNodes.size(); i++) {
DiscoveryNode node = sortedNodes.get(i);
if (!prevNode.masterNode()) {
assertFalse(node.masterNode());
} else if (node.masterNode()) {
assertTrue(prevNode.id().compareTo(node.id()) < 0);
}
prevNode = node;
}

}

@Test
public void electMaster() {
List<DiscoveryNode> nodes = generateRandomNodes();
ElectMasterService service = electMasterService();
int min_master_nodes = randomIntBetween(0, nodes.size());
service.minimumMasterNodes(min_master_nodes);

int master_nodes = 0;
for (DiscoveryNode node : nodes) {
if (node.masterNode()) {
master_nodes++;
}
}
DiscoveryNode master = null;
if (service.hasEnoughMasterNodes(nodes)) {
master = service.electMaster(nodes);
}

if (master_nodes == 0) {
assertNull(master);
} else if (min_master_nodes > 0 && master_nodes < min_master_nodes) {
assertNull(master);
} else {
for (DiscoveryNode node : nodes) {
if (node.masterNode()) {
assertTrue(master.id().compareTo(node.id()) <= 0);
}
}
}
}
}
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
Expand All @@ -55,6 +56,7 @@ public void testSimplePings() {
ThreadPool threadPool = new ThreadPool(getClass().getName());
ClusterName clusterName = new ClusterName("test");
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings);

NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
Expand All @@ -73,7 +75,7 @@ public void testSimplePings() {
addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort())
.build();

UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, null);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
Expand All @@ -87,7 +89,7 @@ public NodeService nodeService() {
});
zenPingA.start();

UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, null);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
Expand Down

0 comments on commit b942d54

Please sign in to comment.