Skip to content

Commit

Permalink
Remove seeds depedency for remote cluster settings (#52829)
Browse files Browse the repository at this point in the history
Currently 3 remote cluster settings (ping interval, skip unavailable,
and compression) have a dependency on the seeds setting being
comfigured. With proxy mode, it is now possible that these settings the
seeds setting has not been configured. This commit removes this
dependency and adds new validation for these settings.
  • Loading branch information
Tim-Brooks committed Feb 26, 2020
1 parent 55a830f commit 767f9ad
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
containsString("Cannot configure setting [cluster.remote.remote1.skip_unavailable] if remote cluster is " +
"not enabled."));
}

Map<String, Object> settingsMap = new HashMap<>();
Expand All @@ -264,8 +264,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
assertThat(responseException.getMessage(), containsString("Cannot configure setting " +
"[cluster.remote.remote1.skip_unavailable] if remote cluster is not enabled."));
}

if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,12 @@ public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallback
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}

public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallbackSetting, Validator<Boolean> validator,
Property... properties) {
return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, b -> parseBoolean(b, key,
isFiltered(properties)), validator, properties);
}

public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
}
Expand Down Expand Up @@ -1629,6 +1635,12 @@ public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fall
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
}

public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fallBackSetting, Validator<TimeValue> validator,
Property... properties) {
return new Setting<>(new SimpleKey(key), fallBackSetting, fallBackSetting::getRaw, (s) -> TimeValue.parseTimeValue(s, key),
validator, properties);
}

public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
/**
* The remote address for the proxy. The connections will be opened to the configured address.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
public static final Setting.AffixSetting<String> PROXY_ADDRESS = Setting.affixKeySetting(
"cluster.remote.",
"proxy_address",
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
Expand Down Expand Up @@ -99,7 +99,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}

Expand Down Expand Up @@ -141,7 +141,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
}

static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
return Stream.of(ProxyConnectionStrategy.PROXY_ADDRESS);
}

static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
Expand All @@ -155,7 +155,7 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String address = PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -120,8 +122,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Setting.affixKeySetting(
"search.remote.",
"skip_unavailable",
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope));

public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {

Expand All @@ -141,27 +142,27 @@ public String getKey(final String key) {
Setting.affixKeySetting(
"cluster.remote.",
"skip_unavailable",
key -> boolSetting(
(ns, key) -> boolSetting(
key,
// the default needs to be false when fallback is removed
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
new RemoteConnectionEnabled<>(ns, key),
Setting.Property.Dynamic,
Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
Setting.Property.NodeScope));

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
(ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic,
Setting.Property.NodeScope));

public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
"cluster.remote.",
"transport.compress",
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
(ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
Expand Down Expand Up @@ -436,4 +437,38 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias)
Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}

private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {

private final String clusterAlias;
private final String key;

private RemoteConnectionEnabled(String clusterAlias, String key) {
this.clusterAlias = clusterAlias;
this.key = key;
}

@Override
public void validate(T value) {
}

@Override
public void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
if (isPresent && RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings) == false) {
throw new IllegalArgumentException("Cannot configure setting [" + key + "] if remote cluster is not enabled.");
}
}

@Override
public Iterator<Setting<?>> settings() {
return Stream.concat(Stream.of(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias)),
settingsStream()).iterator();
}

private Stream<Setting<?>> settingsStream() {
return Arrays.stream(RemoteConnectionStrategy.ConnectionStrategy.values())
.flatMap(strategy -> strategy.getEnablementSettings().get())
.map(as -> as.getConcreteSettingForNamespace(clusterAlias));
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public int getNumberOfChannels() {
return numberOfChannels;
}

public Supplier<Stream<Setting.AffixSetting<?>>> getEnablementSettings() {
return enablementSettings;
}

public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
return reader.get();
}
Expand Down Expand Up @@ -149,7 +153,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ

static Set<String> getRemoteClusters(Settings settings) {
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
.flatMap(strategy -> strategy.enablementSettings.get());
.flatMap(strategy -> strategy.getEnablementSettings().get());
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}

Expand All @@ -159,7 +163,21 @@ public static boolean isConnectionEnabled(String clusterAlias, Settings settings
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
String address = ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return Strings.isEmpty(address) == false;
}
}

@SuppressWarnings("unchecked")
public static boolean isConnectionEnabled(String clusterAlias, Map<Setting<?>, Object> settings) {
ConnectionStrategy mode = (ConnectionStrategy) settings.get(REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias));
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = (List<String>) settings.get(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
.getConcreteSettingForNamespace(clusterAlias));
return seeds.isEmpty() == false;
} else {
String address = (String) settings.get(ProxyConnectionStrategy.PROXY_ADDRESS
.getConcreteSettingForNamespace(clusterAlias));
return Strings.isEmpty(address) == false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(

Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
Setting<?> addressesSetting = ProxyConnectionStrategy.PROXY_ADDRESS
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(

public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
new Tuple<>(ProxyConnectionStrategy.PROXY_ADDRESS, "192.168.0.1:8080"),
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));

RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ private Settings buildRandomSettings(String clusterAlias, List<String> addresses

private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
Settings.Builder builder = Settings.builder();
builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
builder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).getKey(),
addresses.get(0));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
return builder.build();
Expand Down

0 comments on commit 767f9ad

Please sign in to comment.