Skip to content

Commit

Permalink
Support Sniff mode for RCS (#93893)
Browse files Browse the repository at this point in the history
This PR adds sniff mode support for RCS. It's achieved by having
alternative handshake action and nodes action to return DiscoveryNode
with remote cluster server address instead of the main transport
address. The changes are only effective to the new model and how
existing remote cluster model works is fully maintained. REST tests are
updated to random between proxy and sniff mode.

An additional benefit of using the alternative handshake action is that
we can remove the special handling of handshake action on the FC side to
ensure all actions must be allowed by the remote access API key. This
will be addressed in a separate PR.
  • Loading branch information
ywangd committed Feb 28, 2023
1 parent a1ef513 commit 9d6c443
Show file tree
Hide file tree
Showing 20 changed files with 825 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private static void configureRemoteCluster() throws IOException {
"name": "remote_access_key",
"role_descriptors": {
"role": {
"cluster": ["cluster:monitor/state"],
"cluster": ["cluster:internal/remote_cluster/handshake", "cluster:internal/remote_cluster/nodes"],
"index": [
{
"names": ["*"],
Expand All @@ -220,11 +220,16 @@ private static void configureRemoteCluster() throws IOException {
final Map<String, Object> apiKeyMap = responseAsMap(createApiKeyResponse);
final String encodedRemoteAccessApiKey = (String) apiKeyMap.get("encoded");

final Settings remoteClusterSettings = Settings.builder()
.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".mode", "proxy")
.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".authorization", encodedRemoteAccessApiKey)
.build();
final Settings.Builder builder = Settings.builder()
.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".authorization", encodedRemoteAccessApiKey);
if (randomBoolean()) {
builder.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".mode", "proxy")
.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0));
} else {
builder.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".mode", "sniff")
.putList("cluster.remote." + REMOTE_CLUSTER_NAME + ".seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0));
}
final Settings remoteClusterSettings = builder.build();

final Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("{ \"persistent\":" + Strings.toString(remoteClusterSettings) + "}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
Expand Down Expand Up @@ -578,6 +579,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MainAction.INSTANCE, TransportMainAction.class);
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(RemoteClusterNodesAction.INSTANCE, RemoteClusterNodesAction.TransportAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.remote;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportInfo;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;

public class RemoteClusterNodesAction extends ActionType<RemoteClusterNodesAction.Response> {

public static final RemoteClusterNodesAction INSTANCE = new RemoteClusterNodesAction();
public static final String NAME = "cluster:internal/remote_cluster/nodes";

public RemoteClusterNodesAction() {
super(NAME, RemoteClusterNodesAction.Response::new);
}

public static class Request extends ActionRequest {

public static final Request INSTANCE = new Request();

public Request() {}

public Request(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}

public static class Response extends ActionResponse {

private final List<DiscoveryNode> nodes;

public Response(List<DiscoveryNode> nodes) {
this.nodes = nodes;
}

public Response(StreamInput in) throws IOException {
super(in);
this.nodes = in.readList(DiscoveryNode::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(nodes);
}

public List<DiscoveryNode> getNodes() {
return nodes;
}
}

public static class TransportAction extends HandledTransportAction<Request, Response> {

private final TransportService transportService;

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters) {
super(RemoteClusterNodesAction.NAME, transportService, actionFilters, Request::new);
this.transportService = transportService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.addMetrics(NodesInfoRequest.Metric.SETTINGS.metricName(), NodesInfoRequest.Metric.TRANSPORT.metricName());
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
try (var ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
transportService.sendRequest(
transportService.getLocalNode(),
NodesInfoAction.NAME,
nodesInfoRequest,
new ActionListenerResponseHandler<>(ActionListener.wrap(response -> {
final List<DiscoveryNode> remoteClusterNodes = response.getNodes().stream().map(nodeInfo -> {
if (false == REMOTE_CLUSTER_SERVER_ENABLED.get(nodeInfo.getSettings())) {
return null;
}
final Map<String, BoundTransportAddress> profileAddresses = nodeInfo.getInfo(TransportInfo.class)
.getProfileAddresses();
final BoundTransportAddress remoteClusterServerAddress = profileAddresses.get(REMOTE_CLUSTER_PROFILE);
assert remoteClusterServerAddress != null
: "remote cluster server is enabled but corresponding transport profile is missing";
return nodeInfo.getNode().withTransportAddress(remoteClusterServerAddress.publishAddress());
}).filter(Objects::nonNull).toList();
listener.onResponse(new Response(remoteClusterNodes));
}, listener::onFailure), NodesInfoResponse::new)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public DiscoveryNode withTransportAddress(TransportAddress transportAddress) {
return new DiscoveryNode(
getName(),
getId(),
getEphemeralId(),
getHostName(),
getHostAddress(),
transportAddress,
getAttributes(),
getRoles(),
getVersion(),
getExternalId()
);
}

/**
* Deduplicate the given string that must be a node id or node name.
* This method accepts {@code null} input for which it returns {@code null} for convenience when used in deserialization code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,31 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
this.configuredServerName = configuredServerName;
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
this.address = address;
this.clusterNameValidator = (newConnection, actualProfile, listener) -> transportService.handshake(
RemoteConnectionManager.wrapConnectionWithClusterAlias(newConnection, clusterAlias),
actualProfile.getHandshakeTimeout(),
cn -> true,
listener.map(resp -> {
ClusterName remote = resp.getClusterName();
if (remoteClusterName.compareAndSet(null, remote)) {
return null;
} else {
if (remoteClusterName.get().equals(remote) == false) {
DiscoveryNode node = newConnection.getNode();
throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote);
this.clusterNameValidator = (newConnection, actualProfile, listener) -> {
assert actualProfile.getTransportProfile().equals(connectionManager.getConnectionProfile().getTransportProfile())
: "transport profile must be consistent between the connection manager and the actual profile";
transportService.handshake(
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
newConnection,
clusterAlias,
actualProfile.getTransportProfile()
),
actualProfile.getHandshakeTimeout(),
cn -> true,
listener.map(resp -> {
ClusterName remote = resp.getClusterName();
if (remoteClusterName.compareAndSet(null, remote)) {
return null;
} else {
if (remoteClusterName.get().equals(remote) == false) {
DiscoveryNode node = newConnection.getNode();
throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote);
}
return null;
}
return null;
}
})
);
})
);
};
}

static Stream<Setting.AffixSetting<?>> enablementSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
Expand All @@ -22,7 +23,11 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;

/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
Expand Down Expand Up @@ -110,24 +115,42 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we stash any context here since this is an internal execution and should not leak any existing context information
threadContext.markAsSystemContext();

final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
Transport.Connection connection = remoteConnectionManager.getAnyRemoteConnection();
transportService.sendRequest(
connection,
ClusterStateAction.NAME,
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
contextPreservingActionListener.map(response -> response.getState().nodes()::get),
ClusterStateResponse::new
)
);

// Use different action to collect nodes information depending on the connection model
if (REMOTE_CLUSTER_PROFILE.equals(remoteConnectionManager.getConnectionProfile().getTransportProfile())) {
transportService.sendRequest(
connection,
RemoteClusterNodesAction.NAME,
RemoteClusterNodesAction.Request.INSTANCE,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(contextPreservingActionListener.map(response -> {
final Map<String, DiscoveryNode> nodeLookup = response.getNodes()
.stream()
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, Function.identity()));
return nodeLookup::get;
}), RemoteClusterNodesAction.Response::new)
);
} else {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other

transportService.sendRequest(
connection,
ClusterStateAction.NAME,
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
contextPreservingActionListener.map(response -> response.getState().nodes()::get),
ClusterStateResponse::new
)
);
}
}
};

try {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Build;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.CountDownActionListener;
Expand Down Expand Up @@ -53,7 +54,7 @@
*/
public final class RemoteClusterService extends RemoteClusterAware implements Closeable {

private final Logger logger = LogManager.getLogger(RemoteClusterService.class);
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);

/**
* The initial connect timeout for remote cluster connections
Expand Down Expand Up @@ -125,6 +126,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
key -> Setting.simpleString(key, v -> {}, Setting.Property.Dynamic, Setting.Property.NodeScope, Setting.Property.Filtered)
);

public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake";

private final boolean enabled;

public boolean isEnabled() {
Expand All @@ -138,6 +141,10 @@ public boolean isEnabled() {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.transportService = transportService;

if (RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.get(settings)) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
}

public DiscoveryNode getLocalNode() {
Expand Down Expand Up @@ -450,6 +457,36 @@ Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}

static void registerRemoteClusterHandshakeRequestHandler(TransportService transportService) {
transportService.registerRequestHandler(
REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
ThreadPool.Names.SAME,
false,
false,
TransportService.HandshakeRequest::new,
(request, channel, task) -> {
if (false == RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE.equals(channel.getProfileName())) {
throw new IllegalArgumentException(
Strings.format(
"remote cluster handshake action requires channel profile to be [%s], but got [%s]",
RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE,
channel.getProfileName()
)
);
}
logger.trace("handling remote cluster handshake request");
channel.sendResponse(
new TransportService.HandshakeResponse(
transportService.getLocalNode().getVersion(),
Build.CURRENT.hash(),
transportService.getLocalNode().withTransportAddress(transportService.boundRemoteAccessAddress().publishAddress()),
transportService.clusterName
)
);
}
);
}

private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {

private final String clusterAlias;
Expand Down

0 comments on commit 9d6c443

Please sign in to comment.