Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to enable pings for specific remote clusters #34753

Merged
merged 16 commits into from Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/reference/modules/remote-clusters.asciidoc
Expand Up @@ -152,6 +152,14 @@ PUT _cluster/settings
by default, but they can selectively be made optional by setting this setting
to `true`.

`cluster.remote.${cluster_alias}.transport.ping_schedule`::

Schedule a regular application-level ping message to ensure that transport
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to say that this setting sets the time between pings, otherwise it's not clear what values other than -1 mean. I also think that the "defaults to ... which defaults to ..." in the last sentence might cause confusion. I drafted an alternative:

Sets the time interval between regular application-level ping messages that are sent to ensure that transport connections to nodes belonging to remote clusters are kept alive. If set to -1, application-level ping messages to this remote cluster are not sent. If unset, application-level ping messages are sent according to the global transport.ping_schedule setting, which defaults to -1 meaning that pings are not sent.

I'm not sure this is correct, however. If we set

transport.ping_schedule: 5s
cluster.remote.foo.transport.ping_schedule: -1

Does this disable pings to the foo remote? Should it? I think it'd be useful to be able to do so. I haven't dug into the implementation but there's no test for this case as far as I can see.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it sounds great, I was hoping you would help out rephrasing the docs, thanks a lot for that. The behaviour should be what you describe with transport.ping_schedule as a fallback, but I will add a test for this specific case that you mention, it's a good point.

connections to nodes belonging to remote clusters are kept alive. Defaults
to the value that the global `transport.ping_schedule` setting is set to
(which defaults to `-1`, meaning pings are disabled).


[float]
[[retrieve-remote-clusters-info]]
=== Retrieving remote clusters info
Expand Down
6 changes: 3 additions & 3 deletions docs/reference/modules/transport.asciidoc
Expand Up @@ -46,9 +46,9 @@ between all nodes. Defaults to `false`.

|`transport.ping_schedule` | Schedule a regular application-level ping message
to ensure that transport connections between nodes are kept alive. Defaults to
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to
correctly configure TCP keep-alives instead of using this feature, because TCP
keep-alives apply to all kinds of long-lived connection and not just to
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable
to correctly configure TCP keep-alives instead of using this feature, because
TCP keep-alives apply to all kinds of long-lived connections and not just to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debates rage to this day on the internet about the use of singular or plural after "kinds of". I suspect that British English prefers the singular and US English the plural, and both are ok 😄 🇬🇧 (I'm ok with this change, just thought you'd like to know)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting :)

transport connections.

|=======================================================================
Expand Down
Expand Up @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
Expand Down
Expand Up @@ -18,8 +18,8 @@
*/
package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable {
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.defaultProfile = defaultProfile;
this.pingSchedule = pingSchedule;
this.defaultProfile = buildDefaultConnectionProfile(settings);
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
}
Expand Down Expand Up @@ -252,6 +251,10 @@ private void ensureOpen() {
}
}

TimeValue getPingSchedule() {
return pingSchedule;
}

private class ScheduledPing extends AbstractLifecycleRunnable {

private ScheduledPing() {
Expand Down
Expand Up @@ -18,8 +18,6 @@
*/
package org.elasticsearch.transport;

import java.net.InetSocketAddress;
import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -48,6 +46,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -64,6 +63,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -104,17 +104,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param connectionManager the connection manager to use for this remote connection
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
* @param proxyAddress the proxy address
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
}

RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode>
nodePredicate,
String proxyAddress) {
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
super(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
Expand Down
Expand Up @@ -60,6 +60,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.timeSetting;

/**
* Basic service for accessing remote clusters via gateway nodes
Expand Down Expand Up @@ -166,6 +167,12 @@ public String getKey(final String key) {
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());

Expand Down Expand Up @@ -211,10 +218,13 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Su
}

if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings), proxyAddress);
remoteClusters.put(entry.getKey(), remote);
String clusterAlias = entry.getKey();
TimeValue pingSchedule = REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings);
ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport,
transportService.threadPool, pingSchedule);
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, connectionManager,
numRemoteConnections, getNodePredicate(settings), proxyAddress);
remoteClusters.put(clusterAlias, remote);
}

// now update the seed nodes no matter if it's new or already existing
Expand Down Expand Up @@ -340,31 +350,27 @@ public void onFailure(Exception e) {
* @throws IllegalArgumentException if the remote cluster is unknown
*/
public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection(node);
return getRemoteClusterConnection(cluster).getConnection(node);
}

/**
* Ensures that the given cluster alias is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
}
remoteClusterConnection.ensureConnected(listener);
void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
}

public Transport.Connection getConnection(String cluster) {
return getRemoteClusterConnection(cluster).getConnection();
}

RemoteClusterConnection getRemoteClusterConnection(String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection();
return connection;
}

@Override
Expand All @@ -386,7 +392,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}


@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));
Expand Down