From 65d2e34e9e140f08867d4bbd65d875e25f323c0a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 15 Oct 2016 21:16:46 +0200 Subject: [PATCH 01/13] add MockZenPing and use it by default. --- .../common/util/ExtensionPoint.java | 4 + .../discovery/DiscoveryModule.java | 5 +- .../java/org/elasticsearch/node/Node.java | 7 +- .../plugins/DiscoveryPlugin.java | 6 ++ .../DiscoveryWithServiceDisruptionsIT.java | 4 + .../elasticsearch/test/ESIntegTestCase.java | 27 ++++-- .../test/discovery/MockZenPing.java | 92 +++++++++++++++++++ 7 files changed, 132 insertions(+), 13 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java diff --git a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java index fcdfaafb1d5b9..a5dac12fab72f 100644 --- a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java +++ b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java @@ -200,6 +200,10 @@ protected void bindExtensions(Binder binder) { allocationMultibinder.addBinding().to(clazz); } } + + public boolean isEmpty() { + return extensions.isEmpty(); + } } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index b41316b65345d..42dfeefb46fad 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -62,8 +62,6 @@ public DiscoveryModule(Settings settings) { addDiscoveryType("local", LocalDiscovery.class); addDiscoveryType("zen", ZenDiscovery.class); addElectMasterService("zen", ElectMasterService.class); - // always add the unicast hosts, or things get angry! - addZenPing(UnicastZenPing.class); } /** @@ -130,6 +128,9 @@ protected void configure() { unicastHostProviders.getOrDefault(discoveryType, Collections.emptyList())) { unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider); } + if (zenPings.isEmpty()) { + zenPings.registerExtension(UnicastZenPing.class); + } zenPings.bind(binder()); } bind(Discovery.class).to(discoveryClass).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 34eece0e7c9d7..7c3303b299c48 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -152,6 +152,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -331,7 +332,11 @@ protected Node(final Environment environment, Collection } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool); modules.add(new NodeModule(this, monitorService)); - modules.add(new DiscoveryModule(this.settings)); + final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings); + pluginsService.filterPlugins(DiscoveryPlugin.class).stream().map(p -> p.getZenPings(this.settings)) + .flatMap(Set::stream).forEach(discoveryModule::addZenPing); + + modules.add(discoveryModule); ClusterModule clusterModule = new ClusterModule(settings, clusterService, pluginsService.filterPlugins(ClusterPlugin.class)); modules.add(clusterModule); diff --git a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java index f6174c08d124f..2d41f75e12228 100644 --- a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java @@ -21,6 +21,10 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ping.ZenPing; + +import java.util.Collections; +import java.util.Set; /** * An additional extension point for {@link Plugin}s that extends Elasticsearch's discovery functionality. To add an additional @@ -52,4 +56,6 @@ public interface DiscoveryPlugin { default NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) { return null; } + + default Set> getZenPings(Settings settings) { return Collections.emptySet(); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 58a6776973074..b400b0c7a5f4b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -128,6 +128,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; + @Override + protected boolean addMockZenPings() { + return false; + } @Override protected Settings nodeSettings(int nodeOrdinal) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 26059ba60ca12..1f2f01cc35241 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -28,15 +28,8 @@ import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.transport.AssertingTransportInterceptor; -import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -65,6 +58,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -74,6 +68,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -99,9 +94,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; @@ -119,12 +115,16 @@ import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.client.RandomizingClient; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.AssertingTransportInterceptor; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -1750,7 +1750,6 @@ protected NodeConfigurationSource getNodeConfigSource() { public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(networkSettings.build()). put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); } @@ -1787,6 +1786,10 @@ protected boolean addMockTransportService() { return true; } + protected boolean addMockZenPings() { + return true; + } + /** * Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful * for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test @@ -1823,6 +1826,10 @@ protected Collection> getMockPlugins() { if (addMockTransportService()) { mocks.add(MockTcpTransportPlugin.class); } + + if (addMockZenPings()) { + mocks.add(MockZenPing.TestPlugin.class); + } mocks.add(TestSeedPlugin.class); return Collections.unmodifiableList(mocks); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java new file mode 100644 index 0000000000000..ba53b1ff2e7e9 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.discovery; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.zen.ping.PingContextProvider; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.plugins.Plugin; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { + + static final Set activeNodes = ConcurrentCollections.newConcurrentSet(); + + private volatile PingContextProvider contextProvider; + + @Inject + public MockZenPing(Settings settings) { + super(settings); + } + + @Override + public void setPingContextProvider(PingContextProvider contextProvider) { + this.contextProvider = contextProvider; + } + + @Override + public void ping(PingListener listener, TimeValue timeout) { + List responseList = activeNodes.stream() + .filter(p -> p != this) // remove this as pings are not expected to return the local node + .map(MockZenPing::getPingResponse) + .collect(Collectors.toList()); + listener.onPing(responseList); + } + + private PingResponse getPingResponse() { + final ClusterState clusterState = contextProvider.clusterState(); + return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState); + } + + @Override + protected void doStart() { + assert contextProvider != null; + boolean added = activeNodes.add(this); + assert added; + } + + @Override + protected void doStop() { + boolean found = activeNodes.remove(this); + assert found; + } + + @Override + protected void doClose() { + + } + + public static class TestPlugin extends Plugin implements DiscoveryPlugin { + + @Override + public Set> getZenPings(Settings settings) { + return Collections.singleton(MockZenPing.class); + } + } +} From 0312682488f8f90084571a0665dfbfcd8bf7e36d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 15 Oct 2016 22:08:16 +0200 Subject: [PATCH 02/13] support for multiple clusters and tribe nodes test --- .../java/org/elasticsearch/tribe/TribeIT.java | 18 ++--------------- .../test/discovery/MockZenPing.java | 20 +++++++++++++++---- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 89a32c118280f..0bc4974f28519 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -33,19 +33,15 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.transport.MockTcpTransportPlugin; -import org.elasticsearch.transport.Transport; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -54,7 +50,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Consumer; @@ -66,6 +61,7 @@ import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -195,22 +191,12 @@ private Settings.Builder createTribeSettings(Predicate filt settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(NetworkModule.HTTP_ENABLED.getKey(), false); settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); doWithAllClusters(filter, c -> { String tribeSetting = "tribe." + c.getClusterName() + "."; settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), c.getClusterName()); settings.put(tribeSetting + DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "100ms"); settings.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME); - settings.put(tribeSetting + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); - - Set hosts = new HashSet<>(); - for (Transport transport : c.getInstances(Transport.class)) { - TransportAddress address = transport.boundAddress().publishAddress(); - hosts.add(address.getAddress() + ":" + address.getPort()); - } - settings.putArray(tribeSetting + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), - hosts.toArray(new String[hosts.size()])); }); return settings; @@ -497,7 +483,7 @@ private static void assertNodes(Predicate filter) throws Ex assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); - assertThat(nodes.containsAll(expectedNodes), is(true)); + assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); }); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index ba53b1ff2e7e9..370d1b5347ac8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.test.discovery; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -31,12 +32,13 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { - static final Set activeNodes = ConcurrentCollections.newConcurrentSet(); + static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); private volatile PingContextProvider contextProvider; @@ -52,13 +54,18 @@ public void setPingContextProvider(PingContextProvider contextProvider) { @Override public void ping(PingListener listener, TimeValue timeout) { - List responseList = activeNodes.stream() + logger.info("pinging using mock zen ping"); + List responseList = getActiveNodesForCurrentCluster().stream() .filter(p -> p != this) // remove this as pings are not expected to return the local node .map(MockZenPing::getPingResponse) .collect(Collectors.toList()); listener.onPing(responseList); } + private ClusterName getClusterName() { + return contextProvider.clusterState().getClusterName(); + } + private PingResponse getPingResponse() { final ClusterState clusterState = contextProvider.clusterState(); return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState); @@ -67,13 +74,18 @@ private PingResponse getPingResponse() { @Override protected void doStart() { assert contextProvider != null; - boolean added = activeNodes.add(this); + boolean added = getActiveNodesForCurrentCluster().add(this); assert added; } + private Set getActiveNodesForCurrentCluster() { + return activeNodesPerCluster.computeIfAbsent(getClusterName(), + clusterName -> ConcurrentCollections.newConcurrentSet()); + } + @Override protected void doStop() { - boolean found = activeNodes.remove(this); + boolean found = getActiveNodesForCurrentCluster().remove(this); assert found; } From 541d6c57bbcb01901db988de2833748ecf9a22cf Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 15 Oct 2016 22:55:56 +0200 Subject: [PATCH 03/13] don't use mock pings when using network partitions --- .../support/master/IndexingMasterFailoverIT.java | 7 +++++-- .../elasticsearch/cluster/MinimumMasterNodesIT.java | 11 ++--------- .../cluster/routing/PrimaryAllocationIT.java | 6 ++---- .../indices/state/RareClusterStateIT.java | 10 +++------- 4 files changed, 12 insertions(+), 22 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 29dd2a150a3b2..38328a8005416 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; @@ -46,6 +45,11 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexingMasterFailoverIT extends ESIntegTestCase { + @Override + protected boolean addMockZenPings() { + return false; + } + @Override protected Collection> nodePlugins() { final HashSet> classes = new HashSet<>(super.nodePlugins()); @@ -62,7 +66,6 @@ public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwabl logger.info("--> start 4 nodes, 3 master, 1 data"); final Settings sharedSettings = Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index b8e9534393203..83472a1edc4b5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -75,15 +74,13 @@ protected Collection> nodePlugins() { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } public void testSimpleMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 2) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms") @@ -195,7 +192,6 @@ public void run() { public void testMultipleNodesShutdownNonMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 3) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "1s") .put("discovery.initial_state_timeout", "500ms") @@ -271,7 +267,6 @@ public void run() { public void testDynamicUpdateMinimumMasterNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") .build(); @@ -329,7 +324,6 @@ private void assertNoMasterBlockOnAllNodes() throws InterruptedException { public void testCanNotBringClusterDown() throws ExecutionException, InterruptedException { int nodeCount = scaledRandomIntBetween(1, 5); Settings.Builder settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put("discovery.initial_state_timeout", "500ms"); @@ -368,7 +362,6 @@ public void testCanNotBringClusterDown() throws ExecutionException, InterruptedE public void testCanNotPublishWithoutMinMastNodes() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "100ms") // speed things up diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 9f21d1332646e..6243f138380d1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -63,9 +62,8 @@ protected Collection> nodePlugins() { } @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } private void createStaleReplicaScenario() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 5a257cc64fdfd..8582ca0e02f76 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -42,7 +42,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.Index; @@ -77,9 +76,8 @@ public class RareClusterStateIT extends ESIntegTestCase { @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + protected boolean addMockZenPings() { + return false; } @Override @@ -173,9 +171,7 @@ public void onFailure(String source, Exception e) { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/14932") public void testDeleteCreateInOneBulk() throws Exception { - internalCluster().startNodesAsync(2, Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") - .build()).get(); + internalCluster().startNodesAsync(2).get(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); prepareCreate("test").setSettings(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, true).addMapping("type").get(); ensureGreen("test"); From d18143dfd11f4e419badc6aa0332d1aea01a1cf8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 16 Oct 2016 08:06:25 +0200 Subject: [PATCH 04/13] don't trip circuit breaker when publishing cluster state --- .../discovery/zen/publish/PublishClusterStateAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index c247936384039..2a173c755fe2a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -100,8 +100,10 @@ public PublishClusterStateAction( this.discoverySettings = discoverySettings; this.clusterName = clusterName; this.pendingStatesQueue = new PendingClusterStatesQueue(logger, settings.getAsInt(SETTINGS_MAX_PENDING_CLUSTER_STATES, 25)); - transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, new SendClusterStateRequestHandler()); - transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler()); + transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false, + new SendClusterStateRequestHandler()); + transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false, + new CommitClusterStateRequestHandler()); } public PendingClusterStatesQueue pendingStatesQueue() { From d51df7f1e16058a99c377ba1b2f07c3c299c5d30 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 16 Oct 2016 17:38:59 +0200 Subject: [PATCH 05/13] remove all non-tribe usages of local discovery --- .../cluster/SpecificMasterNodesIT.java | 30 ++-- .../cluster/service/ClusterServiceIT.java | 35 +--- .../discovery/zen/ZenDiscoveryIT.java | 55 +----- .../index/TransportIndexFailuresIT.java | 160 ------------------ .../DedicatedClusterSnapshotRestoreIT.java | 9 - .../SharedClusterSnapshotRestoreIT.java | 2 +- .../azure/AzureSnapshotRestoreTests.java | 4 +- .../elasticsearch/tribe/TribeUnitTests.java | 16 +- .../org/elasticsearch/node/NodeTests.java | 6 +- .../test/ESBackcompatTestCase.java | 8 +- .../test/ESSingleNodeTestCase.java | 6 +- .../ClusterDiscoveryConfiguration.java | 3 +- .../test/test/InternalTestClusterTests.java | 12 +- 13 files changed, 56 insertions(+), 290 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java diff --git a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index 4d62f9b664fc3..c033ad7ff2737 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -22,7 +22,6 @@ import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; @@ -41,18 +40,9 @@ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class SpecificMasterNodesIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - protected final Settings.Builder settingsBuilder() { - return Settings.builder().put("discovery.type", "zen"); - } - public void testSimpleOnlyMasterNodeElection() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -60,7 +50,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException { // all is well, no master elected } logger.info("--> start master node"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -75,14 +65,14 @@ public void testSimpleOnlyMasterNodeElection() throws IOException { } logger.info("--> start master node"); - final String nextMasterEligibleNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligibleNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); } public void testElectOnlyBetweenMasterNodes() throws IOException { logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); try { assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().getMasterNodeId(), nullValue()); fail("should not be able to find master"); @@ -90,12 +80,12 @@ public void testElectOnlyBetweenMasterNodes() throws IOException { // all is well, no master elected } logger.info("--> start master node (1)"); - final String masterNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String masterNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); logger.info("--> start master node (2)"); - final String nextMasterEligableNodeName = internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + final String nextMasterEligableNodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(masterNodeName)); @@ -112,10 +102,10 @@ public void testElectOnlyBetweenMasterNodes() throws IOException { */ public void testCustomDefaultMapping() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); createIndex("test"); assertAcked(client().admin().indices().preparePutMapping("test").setType("_default_").setSource("timestamp", "type=date")); @@ -134,10 +124,10 @@ public void testCustomDefaultMapping() throws Exception { public void testAliasFilterValidation() throws Exception { logger.info("--> start master node / non data"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), true)); logger.info("--> start data node / non master node"); - internalCluster().startNode(settingsBuilder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); + internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).put(Node.NODE_MASTER_SETTING.getKey(), false)); assertAcked(prepareCreate("test").addMapping("type1", "{\"type1\" : {\"properties\" : {\"table_a\" : { \"type\" : \"nested\", \"properties\" : {\"field_a\" : { \"type\" : \"keyword\" },\"field_b\" :{ \"type\" : \"keyword\" }}}}}}")); client().admin().indices().prepareAliases().addAlias("test", "a_test", QueryBuilders.nestedQuery("table_a", QueryBuilders.termQuery("table_a.field_b", "y"), ScoreMode.Avg)).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index a25e3abe65d50..111b84f981a07 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -68,17 +67,8 @@ protected Collection> nodePlugins() { return Arrays.asList(TestPlugin.class); } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - public void testAckedUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -151,10 +141,7 @@ public void onFailure(String source, Exception e) { } public void testAckedUpdateTaskSameClusterState() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -222,10 +209,7 @@ public void onFailure(String source, Exception e) { } public void testAckedUpdateTaskNoAckExpected() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -294,10 +278,7 @@ public void onFailure(String source, Exception e) { } public void testAckedUpdateTaskTimeoutZero() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); + internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); final AtomicBoolean allNodesAcked = new AtomicBoolean(false); @@ -371,11 +352,8 @@ public void onFailure(String source, Exception e) { @TestLogging("_root:debug,org.elasticsearch.action.admin.cluster.tasks:trace") public void testPendingUpdateTask() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); - String node_0 = internalCluster().startNode(settings); - internalCluster().startCoordinatingOnlyNode(settings); + String node_0 = internalCluster().startNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0); final CountDownLatch block1 = new CountDownLatch(1); @@ -507,7 +485,6 @@ public void onFailure(String source, Exception e) { public void testLocalNodeMasterListenerCallbacks() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 1) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") .put("discovery.initial_state_timeout", "500ms") diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index d336534a15899..c99187c7a931a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -24,9 +24,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -38,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; @@ -46,7 +43,6 @@ import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.TestCustomMetaData; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -55,7 +51,6 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; -import org.junit.Before; import java.io.IOException; import java.net.UnknownHostException; @@ -78,32 +73,10 @@ @TestLogging("_root:DEBUG") public class ZenDiscoveryIT extends ESIntegTestCase { - private Version previousMajorVersion; - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - - @Before - public void computePrevMajorVersion() { - Version previousMajor; - // find a GA build whose major version is statesFound = new ArrayList<>(); final CountDownLatch nodesStopped = new CountDownLatch(1); - clusterService.add(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - statesFound.add(event.state()); - try { - // block until both nodes have stopped to accumulate node failures - nodesStopped.await(); - } catch (InterruptedException e) { - //meh - } + clusterService.add(event -> { + statesFound.add(event.state()); + try { + // block until both nodes have stopped to accumulate node failures + nodesStopped.await(); + } catch (InterruptedException e) { + //meh } }); @@ -189,10 +158,7 @@ public void clusterChanged(ClusterChangedEvent event) { } public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "zen") - .build(); - List nodeNames = internalCluster().startNodesAsync(2, settings).get(); + List nodeNames = internalCluster().startNodesAsync(2).get(); client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); List nonMasterNodes = new ArrayList<>(nodeNames); @@ -303,10 +269,7 @@ public void testDiscoveryStats() throws IOException { " }\n" + "}"; - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - internalCluster().startNode(nodeSettings); + internalCluster().startNode(); logger.info("--> request node discovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get(); diff --git a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java deleted file mode 100644 index 91a18f9c053d7..0000000000000 --- a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index; - -import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.fd.FaultDetection; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static java.util.Collections.singleton; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.equalTo; - -/** - * Test failure when index replication actions fail mid-flight - */ -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) -public class TransportIndexFailuresIT extends ESIntegTestCase { - - private static final Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // <-- for hitting simulated network failures quickly - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.minimum_master_nodes", 1) - .build(); - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - @Override - protected int numberOfShards() { - return 1; - } - - @Override - protected int numberOfReplicas() { - return 1; - } - - public void testNetworkPartitionDuringReplicaIndexOp() throws Exception { - final String INDEX = "testidx"; - - List nodes = internalCluster().startNodesAsync(2, nodeSettings).get(); - - // Create index test with 1 shard, 1 replica and ensure it is green - createIndex(INDEX); - ensureGreen(INDEX); - - // Disable allocation so the replica cannot be reallocated when it fails - Settings s = Settings.builder().put("cluster.routing.allocation.enable", "none").build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(s).get(); - - // Determine which node holds the primary shard - ClusterState state = getNodeClusterState(nodes.get(0)); - IndexShardRoutingTable shard = state.getRoutingTable().index(INDEX).shard(0); - String primaryNode; - String replicaNode; - if (shard.getShards().get(0).primary()) { - primaryNode = nodes.get(0); - replicaNode = nodes.get(1); - } else { - primaryNode = nodes.get(1); - replicaNode = nodes.get(0); - } - logger.info("--> primary shard is on {}", primaryNode); - - // Index a document to make sure everything works well - IndexResponse resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "bar").get(); - assertThat("document exists on primary node", - internalCluster().client(primaryNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - assertThat("document exists on replica node", - internalCluster().client(replicaNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), - equalTo(true)); - - // Disrupt the network so indexing requests fail to replicate - logger.info("--> preventing index/replica operations"); - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, replicaNode), - singleton(IndexAction.NAME + "[r]") - ); - mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(TransportService.class, primaryNode), - singleton(IndexAction.NAME + "[r]") - ); - - logger.info("--> indexing into primary"); - // the replica shard should now be marked as failed because the replication operation will fail - resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "baz").get(); - // wait until the cluster reaches an exact yellow state, meaning replica has failed - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - } - }); - assertThat("document should still be indexed and available", - client().prepareGet(INDEX, "doc", resp.getId()).get().isExists(), equalTo(true)); - - state = getNodeClusterState(randomFrom(nodes.toArray(Strings.EMPTY_ARRAY))); - RoutingNodes rn = state.getRoutingNodes(); - logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shards(input -> true).size(), - rn.shardsWithState(UNASSIGNED).size(), - rn.shardsWithState(INITIALIZING).size(), - rn.shardsWithState(RELOCATING).size(), - rn.shardsWithState(STARTED).size()); - logger.info("--> unassigned: {}, initializing: {}, relocating: {}, started: {}", - rn.shardsWithState(UNASSIGNED), - rn.shardsWithState(INITIALIZING), - rn.shardsWithState(RELOCATING), - rn.shardsWithState(STARTED)); - - assertThat("only a single shard is now active (replica should be failed and not reallocated)", - rn.shardsWithState(STARTED).size(), equalTo(1)); - } - - private ClusterState getNodeClusterState(String node) { - return internalCluster().client(node).admin().cluster().prepareState().setLocal(true).get().getState(); - } -} diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index fd57a1198e037..0e777cbd97ad7 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.index.store.IndexStore; @@ -59,7 +58,6 @@ import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; @@ -93,13 +91,6 @@ @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // TODO only restorePersistentSettingsTest needs this maybe factor out? - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - @Override protected Collection> nodePlugins() { return Arrays.asList(MockRepository.Plugin.class); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 200ec6ac4b14a..5be1a117cb7e6 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -57,9 +57,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java index 014014b432cdb..8251a3b7985ec 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.azure.AzureRepository.Repository; @@ -79,7 +78,8 @@ private static String getContainerName() { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)) // In snapshot tests, we explicitly disable cloud discovery - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") + // nocommit - figure out what this means + // .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .build(); } diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index 7ca0aa57d7a7f..fd54c5fadbeb9 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -26,22 +26,23 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; import java.nio.file.Path; -import java.util.Collections; +import java.util.Arrays; +import java.util.List; import static org.hamcrest.CoreMatchers.either; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,25 +64,25 @@ public static void createTribes() throws NodeValidationException { Settings baseSettings = Settings.builder() .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .build(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); tribe1 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe1") .put("node.name", "tribe1_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); tribe2 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe2") .put("node.name", "tribe2_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start(); + .build(), mockPlugins).start(); } @AfterClass @@ -106,11 +107,10 @@ private static void assertTribeNodeSuccessfullyCreated(Settings extraSettings) t .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).put("discovery.type", "local") .put("tribe.t1.transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put("tribe.t2.transport.type",MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put("tribe.t1.discovery.type", "local").put("tribe.t2.discovery.type", "local") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(extraSettings).build(); - try (Node node = new MockNode(settings, Collections.singleton(MockTcpTransportPlugin.class)).start()) { + try (Node node = new MockNode(settings, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { try (Client client = node.client()) { assertBusy(() -> { ClusterState state = client.admin().cluster().prepareState().clear().setNodes(true).get().getState(); diff --git a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java index cf565499a8d3b..ea92473476532 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java +++ b/test/framework/src/main/java/org/elasticsearch/node/NodeTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.transport.MockTcpTransportPlugin; import java.io.IOException; import java.nio.file.Path; @@ -46,13 +47,12 @@ public void testNodeName() throws IOException { .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", randomLong())) .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") - .put("transport.type", "local") + .put("transport.type", "mock-socket-network") .put(Node.NODE_DATA_SETTING.getKey(), true); if (name != null) { settings.put(Node.NODE_NAME_SETTING.getKey(), name); } - try (Node node = new MockNode(settings.build(), Collections.emptyList())) { + try (Node node = new MockNode(settings.build(), Collections.singleton(MockTcpTransportPlugin.class))) { final Settings nodeSettings = randomBoolean() ? node.settings() : node.getEnvironment().settings(); if (name == null) { assertThat(Node.NODE_NAME_SETTING.get(nodeSettings), equalTo(node.getNodeEnvironment().nodeId().substring(0, 7))); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 16647b04a4738..0ece6fad3931d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -29,8 +29,6 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.listeners.LoggingListener; @@ -206,6 +204,11 @@ private Settings addLoggerSettings(Settings externalNodesSettings) { return finalSettings.build(); } + @Override + protected boolean addMockZenPings() { + return false; + } + protected int minExternalNodes() { return 1; } protected int maxExternalNodes() { @@ -243,7 +246,6 @@ public void assertAllShardsOnNodes(String index, String pattern) { protected Settings commonNodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? "netty3" : "netty4"); // run same transport / disco as external - builder.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen"); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 2752e0f2ecadc..f096e662f4ad4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.junit.After; @@ -181,7 +182,6 @@ private Node newNode() { .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put("discovery.type", "local") .put("transport.type", MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) .put(Node.NODE_DATA_SETTING.getKey(), true) .put(nodeSettings()) // allow test cases to provide their own settings or override these @@ -191,6 +191,10 @@ private Node newNode() { plugins = new ArrayList<>(plugins); plugins.add(MockTcpTransportPlugin.class); } + if (plugins.contains(MockZenPing.TestPlugin.class) == false) { + plugins = new ArrayList<>(plugins); + plugins.add(MockZenPing.TestPlugin.class); + } Node build = new MockNode(settings, plugins); try { build.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index b393498ec89f6..3fd2b024a1d9b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; @@ -39,7 +38,7 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { - static Settings DEFAULT_NODE_SETTINGS = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); + static Settings DEFAULT_NODE_SETTINGS = Settings.EMPTY; private static final String IP_ADDR = "127.0.0.1"; final int numOfNodes; diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index e5b704b2f2ce1..5602361bb751c 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -30,10 +30,11 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; -import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.TransportSettings; @@ -141,7 +142,6 @@ public Settings nodeSettings(int nodeOrdinal) { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 * ((masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes)) .put(NetworkModule.HTTP_ENABLED.getKey(), false) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } @@ -156,12 +156,13 @@ public Settings transportClientSettings() { String nodePrefix = "foobar"; Path baseDir = createTempDir(); + final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); assertClusters(cluster0, cluster1, false); long seed = randomLong(); @@ -288,7 +289,6 @@ public Settings nodeSettings(int nodeOrdinal) { .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numNodes) .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0).build(); } @@ -297,7 +297,7 @@ public Settings transportClientSettings() { return Settings.builder() .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); } - }, 0, randomBoolean(), "", Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + }, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), Function.identity()); cluster.beforeTest(random(), 0.0); try { Map> pathsPerRole = new HashMap<>(); From 31e5aea83508f3ed8e1e2d2d634bd19947830382 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 16 Oct 2016 18:50:34 +0200 Subject: [PATCH 06/13] remove local discovery and use a much simpler NoneDiscovery for tribes --- .../elasticsearch/cluster/ClusterState.java | 4 +- .../discovery/DiscoveryModule.java | 5 +- .../discovery/NoneDiscovery.java | 98 ++++ .../discovery/local/LocalDiscovery.java | 422 ------------------ .../org/elasticsearch/tribe/TribeService.java | 2 +- 5 files changed, 102 insertions(+), 429 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java delete mode 100644 core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index e592b5092b7bb..7fc0a4d09b019 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import java.io.IOException; @@ -72,8 +71,7 @@ * single thread and controlled by the {@link ClusterService}. After every update the * {@link Discovery#publish} method publishes new version of the cluster state to all other nodes in the * cluster. The actual publishing mechanism is delegated to the {@link Discovery#publish} method and depends on - * the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish} - * method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The + * the type of discovery. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The * publishing mechanism can be overridden by other discovery. *

* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 42dfeefb46fad..1e41204d64acf 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -59,7 +58,7 @@ public class DiscoveryModule extends AbstractModule { public DiscoveryModule(Settings settings) { this.settings = settings; - addDiscoveryType("local", LocalDiscovery.class); + addDiscoveryType("none", NoneDiscovery.class); addDiscoveryType("zen", ZenDiscovery.class); addElectMasterService("zen", ElectMasterService.class); } @@ -111,7 +110,7 @@ protected void configure() { throw new IllegalArgumentException("Unknown Discovery type [" + discoveryType + "]"); } - if (discoveryType.equals("local") == false) { + if (discoveryType.equals("none") == false) { String masterServiceTypeKey = ZEN_MASTER_SERVICE_TYPE_SETTING.get(settings); final Class masterService = masterServiceType.get(masterServiceTypeKey); if (masterService == null) { diff --git a/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java new file mode 100644 index 0000000000000..87506b7ed8d58 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ElectMasterService; + +public class NoneDiscovery extends AbstractLifecycleComponent implements Discovery { + + + private final ClusterService clusterService; + private final DiscoverySettings discoverySettings; + + @Inject + public NoneDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { + super(settings); + this.clusterService = clusterService; + this.discoverySettings = new DiscoverySettings(settings, clusterSettings); + } + + @Override + public DiscoveryNode localNode() { + return clusterService.localNode(); + } + + @Override + public String nodeDescription() { + return clusterService.getClusterName().value() + "/" + clusterService.localNode().getId(); + } + + @Override + public void setAllocationService(AllocationService allocationService) { + + } + + @Override + public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { + throw new UnsupportedOperationException(); + } + + @Override + public DiscoveryStats stats() { + return null; + } + + @Override + public DiscoverySettings getDiscoverySettings() { + return discoverySettings; + } + + @Override + public void startInitialJoin() { + + } + + @Override + public int getMinimumMasterNodes() { + return ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } +} diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java deleted file mode 100644 index 389f5ee03bb06..0000000000000 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.local; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.Diff; -import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats; - -import java.util.HashSet; -import java.util.Optional; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -import static org.elasticsearch.cluster.ClusterState.Builder; - -public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { - - private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - - private final ClusterService clusterService; - private AllocationService allocationService; - private final ClusterName clusterName; - - private final DiscoverySettings discoverySettings; - - private volatile boolean master = false; - - private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap(); - - private volatile ClusterState lastProcessedClusterState; - - @Inject - public LocalDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) { - super(settings); - this.clusterName = clusterService.getClusterName(); - this.clusterService = clusterService; - this.discoverySettings = new DiscoverySettings(settings, clusterSettings); - } - - @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; - } - - @Override - protected void doStart() { - - } - - @Override - public void startInitialJoin() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - clusterGroup = new ClusterGroup(); - clusterGroups.put(clusterName, clusterGroup); - } - logger.debug("Connected to cluster [{}]", clusterName); - - Optional current = clusterGroup.members().stream().filter(other -> ( - other.localNode().equals(this.localNode()) || other.localNode().getId().equals(this.localNode().getId()) - )).findFirst(); - if (current.isPresent()) { - throw new IllegalStateException("current cluster group already contains a node with the same id. current " - + current.get().localNode() + ", this node " + localNode()); - } - - clusterGroup.members().add(this); - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null && firstMaster.equals(this)) { - // we are the first master (and the master) - master = true; - final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ClusterStateUpdateTask() { - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - // remove the NO_MASTER block in this case - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()); - return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } else if (firstMaster != null) { - // tell the master to send the fact that we are here - final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.add(discovery.localNode()); - } - nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build(); - return master.allocationService.reroute(currentState, "node_add"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - - }); - } - } // else, no master node, the next node that will start will fill things in... - } - - @Override - protected void doStop() { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least..."); - return; - } - clusterGroup.members().remove(this); - if (clusterGroup.members().isEmpty()) { - // no more members, remove and return - clusterGroups.remove(clusterName); - return; - } - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().isMasterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null) { - // if the removed node is the master, make the next one as the master - if (master) { - firstMaster.master = true; - } - - final Set newMembers = new HashSet<>(); - for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode().getId()); - } - - final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().getId()); - DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); - if (delta.added()) { - logger.warn("No new nodes should be created when a new discovery view is accepted"); - } - // reroute here, so we eagerly remove dead nodes from the routing - ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - return master.allocationService.deassociateDeadNodes(updatedState, true, "node stopped"); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - } - }); - } - } - } - - @Override - protected void doClose() { - } - - @Override - public DiscoveryNode localNode() { - return clusterService.localNode(); - } - - @Override - public String nodeDescription() { - return clusterName.value() + "/" + localNode().getId(); - } - - @Override - public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) { - if (!master) { - throw new IllegalStateException("Shouldn't publish state when not master"); - } - LocalDiscovery[] members = members(); - if (members.length > 0) { - Set nodesToPublishTo = new HashSet<>(members.length); - for (LocalDiscovery localDiscovery : members) { - if (localDiscovery.master) { - continue; - } - nodesToPublishTo.add(localDiscovery.localNode()); - } - publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener)); - } - } - - @Override - public DiscoveryStats stats() { - return new DiscoveryStats((PendingClusterStateStats)null); - } - - @Override - public DiscoverySettings getDiscoverySettings() { - return discoverySettings; - } - - @Override - public int getMinimumMasterNodes() { - return -1; - } - - private LocalDiscovery[] members() { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - return NO_MEMBERS; - } - Queue members = clusterGroup.members(); - return members.toArray(new LocalDiscovery[members.size()]); - } - - private void publish(LocalDiscovery[] members, ClusterChangedEvent clusterChangedEvent, final BlockingClusterStatePublishResponseHandler publishResponseHandler) { - - try { - // we do the marshaling intentionally, to check it works well... - byte[] clusterStateBytes = null; - byte[] clusterStateDiffBytes = null; - - ClusterState clusterState = clusterChangedEvent.state(); - for (final LocalDiscovery discovery : members) { - if (discovery.master) { - continue; - } - ClusterState newNodeSpecificClusterState = null; - synchronized (this) { - // we do the marshaling intentionally, to check it works well... - // check if we published cluster state at least once and node was in the cluster when we published cluster state the last time - if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode())) { - // both conditions are true - which means we can try sending cluster state as diffs - if (clusterStateDiffBytes == null) { - Diff diff = clusterState.diff(clusterChangedEvent.previousState()); - BytesStreamOutput os = new BytesStreamOutput(); - diff.writeTo(os); - clusterStateDiffBytes = BytesReference.toBytes(os.bytes()); - } - try { - newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState); - logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName()); - } catch (IncompatibleClusterStateVersionException ex) { - logger.warn((Supplier) () -> new ParameterizedMessage("incompatible cluster state version [{}] - resending complete cluster state", clusterState.version()), ex); - } - } - if (newNodeSpecificClusterState == null) { - if (clusterStateBytes == null) { - clusterStateBytes = Builder.toBytes(clusterState); - } - newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode()); - } - discovery.lastProcessedClusterState = newNodeSpecificClusterState; - } - final ClusterState nodeSpecificClusterState = newNodeSpecificClusterState; - - nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); - // ignore cluster state messages that do not include "me", not in the game yet... - if (nodeSpecificClusterState.nodes().getLocalNode() != null) { - assert nodeSpecificClusterState.nodes().getMasterNode() != null : "received a cluster state without a master"; - assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - if (currentState.supersedes(nodeSpecificClusterState)) { - return currentState; - } - - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().getMasterNodeId()); - return nodeSpecificClusterState; - } - - ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); - // if the routing table did not change, use the original one - if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } - - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e); - publishResponseHandler.onFailure(discovery.localNode(), e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - publishResponseHandler.onResponse(discovery.localNode()); - } - }); - } else { - publishResponseHandler.onResponse(discovery.localNode()); - } - } - - TimeValue publishTimeout = discoverySettings.getPublishTimeout(); - if (publishTimeout.millis() > 0) { - try { - boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); - if (!awaited) { - DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes(); - // everyone may have just responded - if (pendingNodes.length > 0) { - logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes); - } - } - } catch (InterruptedException e) { - // ignore & restore interrupt - Thread.currentThread().interrupt(); - } - } - - - } catch (Exception e) { - // failure to marshal or un-marshal - throw new IllegalStateException("Cluster state failed to serialize", e); - } - } - - private class ClusterGroup { - - private Queue members = ConcurrentCollections.newQueue(); - - Queue members() { - return members; - } - } -} diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index fd697340cd7be..7871a0a6f3954 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -124,7 +124,7 @@ public static Settings processSettings(Settings settings) { if (!NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.exists(settings)) { sb.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), nodesSettings.size()); } - sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery + sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "none"); // a tribe node should not use zen discovery // nothing is going to be discovered, since no master will be elected sb.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); if (sb.get("cluster.name") == null) { From 7e954834fcd6660b3f2be488eb949c3e2e35b856 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 08:06:14 +0200 Subject: [PATCH 07/13] don't increase the maximum allocation retries in SharedClusterSnapshotRestoreIT#testDataFileCorruptionDuringRestore If there are many shards, it can cause starvation of allocations --- .../elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 5be1a117cb7e6..bf69f6016de74 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -693,7 +693,7 @@ public void testDataFileCorruptionDuringRestore() throws Exception { assertAcked(client.admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", repositoryLocation))); - prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get(); + createIndex("test-idx"); ensureGreen(); logger.info("--> indexing some data"); From bd4e447e0c724aa57ba372b1994cd65009cdb793 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 09:29:57 +0200 Subject: [PATCH 08/13] add MockZenPing where MockNode is used directly --- .../elasticsearch/client/transport/TransportClientIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index 93c5e29208c83..dbb066dcb1b6b 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -32,12 +32,13 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; +import java.util.Arrays; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -65,7 +66,7 @@ public void testNodeVersionIsUpdated() throws IOException, NodeValidationExcepti .put(NetworkModule.HTTP_ENABLED.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false) .put("cluster.name", "foobar") - .build(), Collections.singleton(MockTcpTransportPlugin.class)).start()) { + .build(), Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) { TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); client.addTransportAddress(transportAddress); // since we force transport clients there has to be one node started that we connect to. From 571d2a9065e07cd17b26eef879b8e94f59b60bac Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 09:36:15 +0200 Subject: [PATCH 09/13] Java docs --- .../java/org/elasticsearch/discovery/NoneDiscovery.java | 6 +++++- .../java/org/elasticsearch/test/discovery/MockZenPing.java | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java index 87506b7ed8d58..91b04ce396ba6 100644 --- a/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/NoneDiscovery.java @@ -28,9 +28,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.ElectMasterService; +/** + * A {@link Discovery} implementation that is used by {@link org.elasticsearch.tribe.TribeService}. This implementation + * doesn't support any clustering features. Most notably {@link #startInitialJoin()} does nothing and + * {@link #publish(ClusterChangedEvent, AckListener)} is not supported. + */ public class NoneDiscovery extends AbstractLifecycleComponent implements Discovery { - private final ClusterService clusterService; private final DiscoverySettings discoverySettings; diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index 370d1b5347ac8..bac4c92c5fdd4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -36,6 +36,10 @@ import java.util.Set; import java.util.stream.Collectors; +/** + * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging + * to be immediate and can be used to speed up tests. + */ public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); From 5666d6203ec5a848943fe6a13d37fd5397afd5af Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 09:39:46 +0200 Subject: [PATCH 10/13] improve decision messages in MaxRetryAllocationDecider#canAllocate --- .../decider/MaxRetryAllocationDecider.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java index 395d347232939..0e7159c857baa 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java @@ -54,7 +54,8 @@ public MaxRetryAllocationDecider(Settings settings) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { - UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); + final Decision decision; if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); final int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexMetaData.getSettings()); @@ -62,16 +63,21 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat // if we are called via the _reroute API we ignore the failure counter and try to allocate // this improves the usability since people don't need to raise the limits to issue retries since a simple _reroute call is // enough to manually retry. - return allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - retrying once on manual allocation"); } else if (unassignedInfo.getNumFailedAllocations() >= maxRetry) { - return allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + decision = allocation.decision(Decision.NO, NAME, "shard has already failed allocating [" + unassignedInfo.getNumFailedAllocations() + "] times vs. [" + maxRetry + "] retries allowed " + unassignedInfo.toString() + " - manually call [/_cluster/reroute?retry_failed=true] to retry"); + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has already failed allocating [" + + unassignedInfo.getNumFailedAllocations() + "] times but [" + maxRetry + "] retries are allowed"); } + } else { + decision = allocation.decision(Decision.YES, NAME, "shard has no previous failures"); } - return allocation.decision(Decision.YES, NAME, "shard has no previous failures"); + return decision; } @Override From 53a8967e73a2fcdb416da315043664c243767864 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 12:02:19 +0200 Subject: [PATCH 11/13] remove no commit in AzureSnapshotRestoreTests --- .../repositories/azure/AzureSnapshotRestoreTests.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java index 8251a3b7985ec..d1050e80adc62 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTests.java @@ -74,15 +74,6 @@ private static String getContainerName() { return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName; } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - // In snapshot tests, we explicitly disable cloud discovery - // nocommit - figure out what this means - // .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") - .build(); - } - @Override public Settings indexSettings() { // During restore we frequently restore index to exactly the same state it was before, that might cause the same From fb8d5bf42c2682785c24f0f41cfee4bbaf3cc520 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 17 Oct 2016 16:40:21 +0200 Subject: [PATCH 12/13] remove left over "local" discovery in testDataFolderAssignmentAndCleaning --- .../elasticsearch/test/test/InternalTestClusterTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 5602361bb751c..7c001f910d786 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.Plugin; @@ -206,7 +205,6 @@ public Settings nodeSettings(int nodeOrdinal) { NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2 + (masterNodes ? InternalTestCluster.DEFAULT_HIGH_NUM_MASTER_NODES : 0) + maxNumDataNodes + numClientNodes) .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local") .build(); } @Override @@ -220,7 +218,8 @@ public Settings transportClientSettings() { Path baseDir = createTempDir(); InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - enableHttpPipelining, nodePrefix, Collections.singleton(MockTcpTransportPlugin.class), Function.identity()); + enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), + Function.identity()); try { cluster.beforeTest(random(), 0.0); final Map shardNodePaths = new HashMap<>(); From dca8abd1ba4a684c882af592c6e6af51097801f6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 18 Oct 2016 19:24:24 +0200 Subject: [PATCH 13/13] roll back DiscoveryPlugin changes --- core/src/main/java/org/elasticsearch/node/Node.java | 7 +------ .../java/org/elasticsearch/plugins/DiscoveryPlugin.java | 6 ------ .../java/org/elasticsearch/test/discovery/MockZenPing.java | 7 +++---- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 7c3303b299c48..34eece0e7c9d7 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -152,7 +152,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -332,11 +331,7 @@ protected Node(final Environment environment, Collection } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool); modules.add(new NodeModule(this, monitorService)); - final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings); - pluginsService.filterPlugins(DiscoveryPlugin.class).stream().map(p -> p.getZenPings(this.settings)) - .flatMap(Set::stream).forEach(discoveryModule::addZenPing); - - modules.add(discoveryModule); + modules.add(new DiscoveryModule(this.settings)); ClusterModule clusterModule = new ClusterModule(settings, clusterService, pluginsService.filterPlugins(ClusterPlugin.class)); modules.add(clusterModule); diff --git a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java index 2d41f75e12228..f6174c08d124f 100644 --- a/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java @@ -21,10 +21,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ping.ZenPing; - -import java.util.Collections; -import java.util.Set; /** * An additional extension point for {@link Plugin}s that extends Elasticsearch's discovery functionality. To add an additional @@ -56,6 +52,4 @@ public interface DiscoveryPlugin { default NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) { return null; } - - default Set> getZenPings(Settings settings) { return Collections.emptySet(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index bac4c92c5fdd4..d1e38e061bfb3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -25,12 +25,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -100,9 +100,8 @@ protected void doClose() { public static class TestPlugin extends Plugin implements DiscoveryPlugin { - @Override - public Set> getZenPings(Settings settings) { - return Collections.singleton(MockZenPing.class); + public void onModule(DiscoveryModule discoveryModule) { + discoveryModule.addZenPing(MockZenPing.class); } } }