Skip to content

Commit

Permalink
Allow multiple unicast host providers (#31509)
Browse files Browse the repository at this point in the history
Introduces support for multiple host providers, which allows the settings based hosts resolver to be
treated just as any other UnicastHostsProvider. Also introduces the notion of a HostsResolver so
that plugins such as FileBasedDiscovery do not need to create their own thread pool for resolving
hosts, making it easier to add new similar kind of plugins.
  • Loading branch information
ywelsch committed Jun 22, 2018
1 parent 8ae2049 commit f22f91c
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
if (refreshInterval.millis() != 0) {
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}

@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
return dynamicHosts.getOrRefresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int no
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
Expand Down Expand Up @@ -307,7 +307,7 @@ protected List<TransportAddress> fetchDynamicNodes() {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(3));
}
Expand All @@ -324,12 +324,12 @@ protected List<TransportAddress> fetchDynamicNodes() {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,17 @@

package org.elasticsearch.discovery.file;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -57,47 +39,19 @@
*/
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {

private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);

private final Settings settings;
private final Path configPath;
private ExecutorService fileBasedDiscoveryExecutorService;

public FileBasedDiscoveryPlugin(Settings settings, Path configPath) {
this.settings = settings;
this.configPath = configPath;
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
0,
concurrentConnects,
60,
TimeUnit.SECONDS,
threadFactory,
threadPool.getThreadContext());

return Collections.emptyList();
}

@Override
public void close() throws IOException {
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
}

@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(
"file",
() -> new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService));
() -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,19 @@
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;

/**
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
* from {@link #UNICAST_HOSTS_FILE}.
Expand All @@ -59,23 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast

static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

private final TransportService transportService;
private final ExecutorService executorService;

private final Path unicastHostsFilePath;

private final TimeValue resolveTimeout;

FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) {
FileBasedUnicastHostsProvider(Environment environment) {
super(environment.settings());
this.transportService = transportService;
this.executorService = executorService;
this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}

@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
List<String> hostsList;
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
Expand All @@ -90,21 +75,8 @@ public List<TransportAddress> buildDynamicHosts() {
hostsList = Collections.emptyList();
}

final List<TransportAddress> dynamicHosts = new ArrayList<>();
try {
dynamicHosts.addAll(resolveHostsLists(
executorService,
logger,
hostsList,
1,
transportService,
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

final List<TransportAddress> dynamicHosts = hostsResolver.resolveHosts(hostsList, 1);
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);

return dynamicHosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand Down Expand Up @@ -123,8 +125,10 @@ public void testUnicastHostsDoesNotExist() throws Exception {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
final Environment environment = TestEnvironment.newEnvironment(settings);
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
final List<TransportAddress> addresses = provider.buildDynamicHosts();
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment);
final List<TransportAddress> addresses = provider.buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
assertEquals(0, addresses.size());
}

Expand Down Expand Up @@ -163,6 +167,8 @@ private List<TransportAddress> setupAndRunHostProvider(final List<String> hostEn
}

return new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstanc
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
// We check that needed properties have been set
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected List<TransportAddress> buildDynamicNodes(GceInstancesServiceImpl gceIn
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
transportService, new NetworkService(Collections.emptyList()));

List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.info("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -357,7 +358,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
Expand All @@ -42,13 +44,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* A module for loading classes for node discovery.
Expand All @@ -57,18 +61,18 @@ public class DiscoveryModule {

public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);

private final Discovery discovery;

public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService) {
final UnicastHostsProvider hostsProvider;
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
Expand All @@ -80,17 +84,32 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
joinValidators.add(joinValidator);
}
}
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
if (hostsProviderName.isPresent()) {
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
if (hostsProviderSupplier == null) {
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
}
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
} else {
hostsProvider = Collections::emptyList;
List<String> hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
// for bwc purposes, add settings provider even if not explicitly specified
if (hostsProviderNames.contains("settings") == false) {
List<String> extendedHostsProviderNames = new ArrayList<>();
extendedHostsProviderNames.add("settings");
extendedHostsProviderNames.addAll(hostsProviderNames);
hostsProviderNames = extendedHostsProviderNames;
}

final Set<String> missingProviderNames = new HashSet<>(hostsProviderNames);
missingProviderNames.removeAll(hostProviders.keySet());
if (missingProviderNames.isEmpty() == false) {
throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames);
}

List<UnicastHostsProvider> filteredHostsProviders = hostsProviderNames.stream()
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());

final UnicastHostsProvider hostsProvider = hostsResolver -> {
final List<TransportAddress> addresses = new ArrayList<>();
for (UnicastHostsProvider provider : filteredHostsProviders) {
addresses.addAll(provider.buildDynamicHosts(hostsResolver));
}
return Collections.unmodifiableList(addresses);
};

Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
Expand Down
Loading

0 comments on commit f22f91c

Please sign in to comment.