Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move connection profile into connection manager #32858

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -147,7 +146,6 @@ private Bootstrap createBootstrap() {

bootstrap.handler(getClientChannelInitializer());

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));

Expand Down Expand Up @@ -175,14 +173,8 @@ private void createServerBootstrap(ProfileSettings profileSettings) {
String name = profileSettings.profileName;
if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
+ "receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
defaultConnectionProfile.getConnectTimeout(),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
receivePredictorMin, receivePredictorMax);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,13 @@ public void apply(Settings value, Settings current, Settings previous) {
TcpTransport.TCP_REUSE_ADDRESS_PROFILE,
TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG,
TcpTransport.CONNECTIONS_PER_NODE_STATE,
TcpTransport.CONNECTIONS_PER_NODE_PING,
TransportService.CONNECTIONS_PER_NODE_RECOVERY,
TransportService.CONNECTIONS_PER_NODE_BULK,
TransportService.CONNECTIONS_PER_NODE_REG,
TransportService.CONNECTIONS_PER_NODE_STATE,
TransportService.CONNECTIONS_PER_NODE_PING,
TransportService.TCP_CONNECT_TIMEOUT,
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_CONNECT_TIMEOUT,
NetworkService.NETWORK_SERVER,
TcpTransport.TCP_NO_DELAY,
TcpTransport.TCP_KEEP_ALIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,21 @@ public class ConnectionManager implements Closeable {
private final Transport transport;
private final ThreadPool threadPool;
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
this.logger = Loggers.getLogger(getClass(), settings);
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.defaultProfile = defaultProfile;
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
Expand All @@ -84,13 +90,18 @@ public void removeListener(TransportConnectionListener listener) {
this.connectionListener.listeners.remove(listener);
}

public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
}

/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
*/
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
Expand All @@ -104,8 +115,8 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
boolean success = false;
try {
connection = transport.openConnection(node, connectionProfile);
connectionValidator.accept(connection, connectionProfile);
connection = transport.openConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -279,4 +290,23 @@ public void onNodeConnected(DiscoveryNode node) {
}
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -61,14 +62,35 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout)
{
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
TimeValue handshakeTimeout) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
}

/**
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
*/
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
Objects.requireNonNull(fallbackProfile);
if (profile == null) {
return fallbackProfile;
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
return profile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
if (profile.getConnectTimeout() == null) {
builder.setConnectTimeout(fallbackProfile.getConnectTimeout());
}
if (profile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
}
return builder.build();
}
}

/**
* A builder to build a new {@link ConnectionProfile}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
builder.addConnections(0, // we don't want this to be used for anything else but search
TransportRequestOptions.Type.BULK,
Expand Down
72 changes: 8 additions & 64 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -135,30 +134,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE =
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_NO_DELAY =
boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_KEEP_ALIVE =
boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);


public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
Expand Down Expand Up @@ -213,7 +198,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final boolean compress;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;

private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();
Expand All @@ -237,7 +221,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
Expand All @@ -261,25 +244,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}

@Override
protected void doStart() {
}
Expand Down Expand Up @@ -456,41 +420,21 @@ public void sendRequest(long requestId, String action, TransportRequest request,
}
}

/**
* takes a {@link ConnectionProfile} that have been passed as a parameter to the public methods
* and resolves it to a fully specified (i.e., no nulls) profile
*/
protected static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile connectionProfile,
ConnectionProfile defaultConnectionProfile) {
Objects.requireNonNull(defaultConnectionProfile);
if (connectionProfile == null) {
return defaultConnectionProfile;
} else if (connectionProfile.getConnectTimeout() != null && connectionProfile.getHandshakeTimeout() != null) {
return connectionProfile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(connectionProfile);
if (connectionProfile.getConnectTimeout() == null) {
builder.setConnectTimeout(defaultConnectionProfile.getConnectTimeout());
}
if (connectionProfile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(defaultConnectionProfile.getHandshakeTimeout());
}
return builder.build();
}
}

protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
return resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
// This allows transport implementations to potentially override specific connection profiles. This
// primarily exists for the test implementations.
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
return connectionProfile;
}

@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
boolean success = false;
NodeChannels nodeChannels = null;
connectionProfile = resolveConnectionProfile(connectionProfile);
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
Expand Down