Skip to content

Commit

Permalink
Allow proxy mode server name to be configured (#50951)
Browse files Browse the repository at this point in the history
Currently, proxy mode allows a remote cluster connection to be setup by
expecting all open connections to be routed through an intermediate
proxy. The proxy must use some logic to ensure that the connections end
up on the correct remote cluster. One mechanism provided is that the
default distribution TLS implementations will forward the host component
of the configured address to the remote connection using the SNI
extension. This is limiting as it requires that the proxy be configured
in a way that always uses a valid hostname as the proxy address.

Instead, this commit adds an additional setting to allow the server_name
to be configured independently. This allows the proxy address to be
specified as a IP literal, but the server_name specified as an arbitrary
string which still must be a valid hostname. It also decouples the
server_name from the requirement of being a DNS resolvable domain.
  • Loading branch information
Tim-Brooks committed Jan 14, 2020
1 parent 1fe2d76 commit 6e7478b
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.INCLUDE_SERVER_NAME,
ProxyConnectionStrategy.SERVER_NAME,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
ProxyConnectionStrategy.SERVER_NAME,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,13 @@

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.intSetting;

public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
Expand All @@ -75,12 +72,12 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
Setting.Property.Dynamic, Setting.Property.NodeScope));

/**
* Whether to include the hostname as a server_name attribute
* A configurable server_name attribute
*/
public static final Setting.AffixSetting<Boolean> INCLUDE_SERVER_NAME = Setting.affixKeySetting(
public static final Setting.AffixSetting<String> SERVER_NAME = Setting.affixKeySetting(
"cluster.remote.",
"include_server_name",
(ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
"server_name",
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
Setting.Property.Dynamic, Setting.Property.NodeScope));

static final int CHANNELS_PER_CONNECTION = 1;
Expand All @@ -89,9 +86,8 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class);

private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final String configuredAddress;
private final boolean includeServerName;
private final String configuredServerName;
private final Supplier<TransportAddress> address;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionManager.ConnectionValidator clusterNameValidator;
Expand All @@ -104,28 +100,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), false);
() -> resolveAddress(configuredAddress), null);
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, boolean includeServerName) {
int maxNumConnections, String configuredAddress, String configuredServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), includeServerName);
() -> resolveAddress(configuredAddress), configuredServerName);
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
boolean includeServerName) {
String configuredServerName) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddress = configuredAddress;
this.includeServerName = includeServerName;
this.configuredServerName = configuredServerName;
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
this.address = address;
this.clusterNameValidator = (newConnection, actualProfile, listener) ->
Expand Down Expand Up @@ -217,10 +213,10 @@ public void onFailure(Exception e) {
for (int i = 0; i < remaining; ++i) {
String id = clusterAlias + "#" + resolved;
Map<String, String> attributes;
if (includeServerName) {
attributes = Collections.singletonMap("server_name", resolved.address().getHostString());
} else {
if (Strings.isNullOrEmpty(configuredServerName)) {
attributes = Collections.emptyMap();
} else {
attributes = Collections.singletonMap("server_name", configuredServerName);
}
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
Expand Down Expand Up @@ -252,12 +248,6 @@ public void onFailure(Exception e) {
}
}

private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
long curr;
while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ;
return resolvedAddresses.get(Math.toIntExact(Math.floorMod(curr, (long) resolvedAddresses.size())));
}

private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseConfiguredAddress(address));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.INCLUDE_SERVER_NAME);
ProxyConnectionStrategy.SERVER_NAME);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -132,12 +131,11 @@ public void writeTo(StreamOutput out) throws IOException {
.stream()
.map(
s -> {
final Tuple<String, Integer> hostPort = RemoteConnectionStrategy.parseHostPort(s);
assert hostPort.v2() != null : s;
final String host = RemoteConnectionStrategy.parseHost(s);
final int port = RemoteConnectionStrategy.parsePort(s);
try {
return new TransportAddress(
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
hostPort.v2());
InetAddress.getByAddress(host, TransportAddress.META_ADDRESS.getAddress()), port);
} catch (final UnknownHostException e) {
throw new AssertionError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -170,11 +169,9 @@ private static <T> Stream<String> getClusterAlias(Settings settings, Setting.Aff
return allConcreteSettings.map(affixSetting::getNamespace);
}

static InetSocketAddress parseConfiguredAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
static InetSocketAddress parseConfiguredAddress(String configuredAddress) {
final String host = parseHost(configuredAddress);
final int port = parsePort(configuredAddress);
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
Expand All @@ -184,10 +181,8 @@ static InetSocketAddress parseConfiguredAddress(String remoteHost) {
return new InetSocketAddress(hostAddress, port);
}

static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
static String parseHost(final String configuredAddress) {
return configuredAddress.substring(0, indexOfPortSeparator(configuredAddress));
}

static int parsePort(String remoteHost) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ private static DiscoveryNode resolveSeedNode(String clusterAlias, String address
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress));
String hostName = address.substring(0, indexOfPortSeparator(address));
String hostName = RemoteConnectionStrategy.parseHost(proxyAddress);
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
Expand All @@ -498,14 +498,6 @@ static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
return DEFAULT_NODE_PREDICATE;
}

private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}

private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
return node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti

try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) {
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));

Expand Down Expand Up @@ -206,7 +206,7 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro

try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) {
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));

Expand Down Expand Up @@ -255,7 +255,7 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address.toString(), addressSupplier, false)) {
numOfConnections, address.toString(), addressSupplier, null)) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
Expand Down Expand Up @@ -357,13 +357,13 @@ public void testServerNameAttributes() {
localService.start();
localService.acceptIncomingRequests();

String serverName = "localhost:" + address1.getPort();
String address = "localhost:" + address1.getPort();

ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, serverName, true)) {
numOfConnections, address, "localhost")) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));

PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
Expand Down

0 comments on commit 6e7478b

Please sign in to comment.