Skip to content

Commit

Permalink
An improvement to unicast discovery to also ping nodes the node itsel…
Browse files Browse the repository at this point in the history
…f received a ping from.

Also moved the unicast tests all in a single package.

Closes #5508
  • Loading branch information
martijnvg committed Mar 31, 2014
1 parent bd9c1bc commit 3b73209
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 11 deletions.
Expand Up @@ -240,7 +240,14 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);

List<DiscoveryNode> nodesToPing = newArrayList(nodes);
HashSet<DiscoveryNode> nodesToPing = new HashSet<>(Arrays.asList(nodes));
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPing.add(temporalResponse.target());
}
}

for (UnicastHostsProvider provider : hostsProviders) {
nodesToPing.addAll(provider.buildDynamicNodes());
}
Expand Down
Expand Up @@ -29,19 +29,15 @@

import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope=Scope.SUITE, numNodes=2)
public class DiscoveryTests extends ElasticsearchIntegrationTest {
@ClusterScope(scope=Scope.TEST, numNodes=2)
public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false)
// Can't use this, b/c at the moment all node will only ping localhost:9300 and the shared
// cluster will be running there, which leads of no node joining, because the cluster name
// isn't equal.
// .put("discovery.zen.ping.unicast.hosts", "localhost")
.put("discovery.zen.ping.unicast.hosts", "localhost:25300,localhost:25301")
.put("transport.tcp.port", "25300-25400")
.put("discovery.zen.ping.unicast.hosts", "localhost")
.put("transport.tcp.port", "25300-25400") // Need to use custom tcp port range otherwise we collide with the shared cluster
.put(super.nodeSettings(nodeOrdinal)).build();
}

Expand Down
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope=Scope.TEST, numNodes=0)
public class ZenUnicastDiscoveryTestsMinimumMasterNodes extends ElasticsearchIntegrationTest {

@Test
// Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this
// test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=3
// can't be satisfied.
public void testUnicastDiscovery() throws Exception {
final Settings settings = ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false)
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping.unicast.hosts", "localhost")
.put("transport.tcp.port", "25400-25500") // Need to use custom tcp port range otherwise we collide with the shared cluster
.build();

final CountDownLatch latch = new CountDownLatch(3);
final AtomicArray<String> nodes = new AtomicArray<>(3);
Runnable r1 = new Runnable() {

@Override
public void run() {
logger.info("--> start first node");
nodes.set(0, cluster().startNode(settings));
latch.countDown();
}
};
new Thread(r1).start();

sleep(between(500, 3000));
Runnable r2 = new Runnable() {

@Override
public void run() {
logger.info("--> start second node");
nodes.set(1, cluster().startNode(settings));
latch.countDown();
}
};
new Thread(r2).start();


sleep(between(500, 3000));
Runnable r3 = new Runnable() {

@Override
public void run() {
logger.info("--> start third node");
nodes.set(2, cluster().startNode(settings));
latch.countDown();
}
};
new Thread(r3).start();
latch.await();

ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

DiscoveryNode masterDiscoNode = null;
for (String node : nodes.toArray(new String[3])) {
ClusterState state = cluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(3));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true));
}
}
}
}
Expand Up @@ -17,10 +17,11 @@
* under the License.
*/

package org.elasticsearch.cluster;
package org.elasticsearch.discovery;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -38,7 +39,7 @@
*/
@LuceneTestCase.Slow
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0)
public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
public class ZenUnicastDiscoveryTestsSpecificNodes extends ElasticsearchIntegrationTest {

@Test
@TestLogging("discovery.zen:TRACE")
Expand Down

0 comments on commit 3b73209

Please sign in to comment.