Skip to content

Commit

Permalink
Add connectToNodeAsExtension in TransportService (opensearch-project#…
Browse files Browse the repository at this point in the history
…6866)

* Add connectToNodeAsExtension in TransportService

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Add to CHANGELOG

Signed-off-by: Craig Perkins <cwperx@amazon.com>

* Update java docstrings

Signed-off-by: Craig Perkins <cwperx@amazon.com>

---------

Signed-off-by: Craig Perkins <cwperx@amazon.com>
(cherry picked from commit b752555)
  • Loading branch information
cwperks committed May 1, 2023
1 parent b8a73f7 commit a5a2cff
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))
- Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464)
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void close() {}
* Build the service.
*
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(
Settings settings,
Expand Down Expand Up @@ -400,6 +400,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, (ConnectionProfile) null);
}

/**
* Connect to the specified node as an extension with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException {
connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId);
}

// We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's
public void connectToExtensionNode(final DiscoveryNode node) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null)));
Expand All @@ -415,6 +424,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}

/**
* Connect to the specified node with the given connection profile
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueIq the id of the extension
*/
public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) {
PlainActionFuture.get(
fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null))
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}
Expand Down Expand Up @@ -450,6 +472,33 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener);
}

/**
* Connect to the specified node as an extension with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueId the id of the extension
* @param listener the action listener to notify
*/
public void connectToNodeAsExtension(
final DiscoveryNode node,
ConnectionProfile connectionProfile,
String extensionUniqueId,
ActionListener<Void> listener
) {
if (isLocalNode(node)) {
listener.onResponse(null);
return;
}
connectionManager.connectToNode(
node,
connectionProfile,
connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId),
listener
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Void> listener) {
if (isLocalNode(node)) {
listener.onResponse(null);
Expand All @@ -473,6 +522,25 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n
};
}

public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode(
DiscoveryNode node,
String extensionUniqueId
) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId);
handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> {
final DiscoveryNode remote = resp.discoveryNode;

if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}

return null;
}));
};
}

public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
Expand Down

0 comments on commit a5a2cff

Please sign in to comment.