Skip to content

Commit

Permalink
Reduce channels in AbstractSimpleTransportTestCase (elastic#34863)
Browse files Browse the repository at this point in the history
This is related to elastic#30876. The AbstractSimpleTransportTestCase initiates
many tcp connections. There are normally over 1,000 connections in
TIME_WAIT at the end of the test. This is because every test opens at
least two different transports that connect to each other with 13
channel connection profiles. This commit modifies the default
connection profile used by this test to 6. One connection for each
type, except for REG which gets 2 connections.
  • Loading branch information
Tim-Brooks committed Oct 25, 2018
1 parent 0360d5d commit f352f59
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable {
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
Expand Down Expand Up @@ -323,23 +323,4 @@ public void onConnectionClosed(Transport.Connection connection) {
}
}
}

public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
Expand Down Expand Up @@ -91,6 +93,31 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro
}
}

/**
* Builds a default connection profile based on the provided settings.
*
* @param settings to build the connection profile from
* @return the connection profile
*/
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
Builder builder = new Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}

/**
* A builder to build a new {@link ConnectionProfile}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void stopThreadPool() {
}

public void testConnectionProfileResolve() {
final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));

final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Expand Down Expand Up @@ -96,31 +96,31 @@ public void testConnectionProfileResolve() {
}

public void testDefaultConnectionProfile() {
ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(13, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
assertEquals(12, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
assertEquals(11, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
.put("node.master", false).build());
assertEquals(10, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);

protected int channelsPerNodeConnection() {
return 13;
// This is a customized profile for this test case.
return 6;
}

@Override
Expand All @@ -122,9 +123,17 @@ public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
Settings connectionSettings = Settings.builder()
.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2)
.put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1)
.build();

serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates
nodeA = serviceA.getLocalNode();
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
nodeB = serviceB.getLocalNode();
// wait till all nodes are properly connected and the event has been sent, so tests in this class
// will not get this callback called on the connections done in this setup
Expand Down Expand Up @@ -171,7 +180,12 @@ private MockTransportService buildService(final String name, final Version versi
}

protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
return buildService(name, version, clusterSettings, Settings.EMPTY);
}

protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings) {
return buildService(name, version, clusterSettings, settings, true, true);
}

@Override
Expand Down Expand Up @@ -2004,7 +2018,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea
assertEquals("handshake failed", exception.getCause().getMessage());
}

ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, Time
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -111,7 +110,7 @@ public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();

ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected Version getCurrentVersion() {

};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
Collections.emptySet());
mockTransportService.start();
return mockTransportService;
Expand Down

0 comments on commit f352f59

Please sign in to comment.