Skip to content

Commit

Permalink
Tcp Transport: Connection pool between nodes and different connection…
Browse files Browse the repository at this point in the history
… types, closes #564.
  • Loading branch information
kimchy committed Dec 15, 2010
1 parent 085066e commit d6bab1a
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 46 deletions.
Expand Up @@ -291,8 +291,7 @@ class TransportHandler extends BaseTransportRequestHandler<BulkRequest> {
}

@Override public boolean spawn() {
// no need to spawn, since in the doExecute we always execute with threaded operation set to true
return false;
return true; // spawn, we do some work here...
}
}
}
Expand Up @@ -74,7 +74,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}

@Override protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.options().withCompress(true);
// low type since we don't want the large bulk requests to cause high latency on typical requests
return TransportRequestOptions.options().withCompress(true).withLowType();
}

@Override protected BulkShardRequest newRequestInstance() {
Expand Down
Expand Up @@ -42,6 +42,6 @@ public class ClientTransportBulkAction extends BaseClientTransportAction<BulkReq
}

@Override protected TransportRequestOptions options() {
return TransportRequestOptions.options().withCompress(true);
return TransportRequestOptions.options().withLowType().withCompress(true);
}
}
Expand Up @@ -258,7 +258,7 @@ public void stop() {
threadPool.schedule(MasterPinger.this, pingInterval);
return;
}
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout),
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
Expand Down Expand Up @@ -296,7 +296,7 @@ public void stop() {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}
Expand Down
Expand Up @@ -198,7 +198,7 @@ private SendPingRequest(DiscoveryNode node) {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout),
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override public PingResponse newInstance() {
return new PingResponse();
Expand Down Expand Up @@ -232,7 +232,8 @@ private SendPingRequest(DiscoveryNode node) {
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ private RecoveryResponse recover(final StartRecoveryRequest request) {
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}
indexInput.close();
Expand Down Expand Up @@ -258,15 +258,15 @@ private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchExcepti
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
operations = Lists.newArrayList();
}
}
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}
Expand Down
Expand Up @@ -32,10 +32,18 @@ public static TransportRequestOptions options() {
return new TransportRequestOptions();
}

public static enum Type {
LOW,
MED,
HIGH
}

private TimeValue timeout;

private boolean compress;

private Type type = Type.MED;

public TransportRequestOptions withTimeout(long timeout) {
return withTimeout(TimeValue.timeValueMillis(timeout));
}
Expand All @@ -50,11 +58,44 @@ public TransportRequestOptions withCompress(boolean compress) {
return this;
}

public TransportRequestOptions withType(Type type) {
this.type = type;
return this;
}

/**
* A request that requires very low latency. Usually reserved for ping requests with very small payload.
*/
public TransportRequestOptions withHighType() {
this.type = Type.HIGH;
return this;
}

/**
* The typical requests flows go through this one.
*/
public TransportRequestOptions withMedType() {
this.type = Type.MED;
return this;
}

/**
* Batch oriented (big payload) based requests use this one.
*/
public TransportRequestOptions withLowType() {
this.type = Type.LOW;
return this;
}

public TimeValue timeout() {
return this.timeout;
}

public boolean compress() {
return this.compress;
}

public Type type() {
return this.type;
}
}
Expand Up @@ -62,6 +62,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
Expand All @@ -72,6 +73,10 @@
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;

/**
* There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or
* batch) with high payload that will cause regular request. (like search or single index) to take
* longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD).
*
* @author kimchy (shay.banon)
*/
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
Expand Down Expand Up @@ -112,6 +117,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem

final ByteSizeValue tcpReceiveBufferSize;

final int connectionsPerNodeLow;
final int connectionsPerNodeMed;
final int connectionsPerNodeHigh;

private final ThreadPool threadPool;

private volatile OpenChannelsHandler serverOpenChannels;
Expand All @@ -121,7 +130,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile ServerBootstrap serverBootstrap;

// node id to actual channel
final ConcurrentMap<DiscoveryNode, Channel> connectedNodes = newConcurrentMap();
final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();


private volatile Channel serverChannel;
Expand Down Expand Up @@ -156,6 +165,9 @@ public NettyTransport(Settings settings, ThreadPool threadPool) {
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", 2);
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.low", 7);

This comment has been minimized.

Copy link
@lukas-vlcek

lukas-vlcek Dec 15, 2010

Contributor

connections_per_node.med ?

this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.low", 1);

This comment has been minimized.

Copy link
@lukas-vlcek

lukas-vlcek Dec 15, 2010

Contributor

connections_per_node.high ?

}

public Settings settings() {
Expand Down Expand Up @@ -309,10 +321,10 @@ ThreadPool threadPool() {
serverBootstrap = null;
}

for (Iterator<Channel> it = connectedNodes.values().iterator(); it.hasNext();) {
Channel channel = it.next();
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeChannels nodeChannels = it.next();
it.remove();
closeChannel(channel);
nodeChannels.close();
}

if (clientBootstrap != null) {
Expand Down Expand Up @@ -369,8 +381,8 @@ void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Excepti
if (isCloseConnectionException(e.getCause())) {
// disconnect the node
Channel channel = ctx.getChannel();
for (Map.Entry<DiscoveryNode, Channel> entry : connectedNodes.entrySet()) {
if (entry.getValue().equals(channel)) {
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
if (entry.getValue().hasChannel(channel)) {
disconnectFromNode(entry.getKey());
}
}
Expand All @@ -388,7 +400,7 @@ TransportAddress wrapAddress(SocketAddress socketAddress) {
}

@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node);
Channel targetChannel = nodeChannel(node, options);

if (compress) {
options.withCompress(true);
Expand Down Expand Up @@ -420,30 +432,32 @@ TransportAddress wrapAddress(SocketAddress socketAddress) {
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport");
}
if (node == null) {
throw new ConnectTransportException(node, "Can't connect to a null node");
}
try {
if (node == null) {
throw new ConnectTransportException(node, "Can't connect to a null node");
}
Channel channel = connectedNodes.get(node);
if (channel != null) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
synchronized (this) {
// recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
channel = connectedNodes.get(node);
if (channel != null) {
nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}

InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
try {
connectToChannels(nodeChannels.high, node);
connectToChannels(nodeChannels.med, node);
connectToChannels(nodeChannels.low, node);
} catch (Exception e) {
nodeChannels.close();
throw e;
}
channel = connectFuture.getChannel();
channel.getCloseFuture().addListener(new ChannelCloseListener(node));
connectedNodes.put(node, channel);

connectedNodes.put(node, nodeChannels);

if (logger.isDebugEnabled()) {
logger.debug("Connected to node [{}]", node);
Expand All @@ -455,30 +469,37 @@ TransportAddress wrapAddress(SocketAddress socketAddress) {
}
}

private void connectToChannels(Channel[] channels, DiscoveryNode node) {
for (int i = 0; i < channels.length; i++) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
}
channels[i] = connectFuture.getChannel();
channels[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
}

@Override public void disconnectFromNode(DiscoveryNode node) {
Channel channel = connectedNodes.remove(node);
if (channel != null) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
closeChannel(channel);
nodeChannels.close();
} finally {
logger.debug("Disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
}

private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException {
Channel channel = connectedNodes.get(node);
if (channel == null) {
private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return channel;
}

private void closeChannel(Channel channel) {
if (channel.isOpen()) {
channel.close().awaitUninterruptibly();
}
return nodeChannels.channel(options.type());
}

private class ChannelCloseListener implements ChannelFutureListener {
Expand All @@ -493,4 +514,61 @@ private ChannelCloseListener(DiscoveryNode node) {
disconnectFromNode(node);
}
}

public static class NodeChannels {

private Channel[] low;
private final AtomicInteger lowCounter = new AtomicInteger();
private Channel[] med;
private final AtomicInteger medCounter = new AtomicInteger();
private Channel[] high;
private final AtomicInteger highCounter = new AtomicInteger();

public NodeChannels(Channel[] low, Channel[] med, Channel[] high) {
this.low = low;
this.med = med;
this.high = high;
}

public boolean hasChannel(Channel channel) {
return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high);
}

private boolean hasChannel(Channel channel, Channel[] channels) {
for (Channel channel1 : channels) {
if (channel.equals(channel1)) {
return true;
}
}
return false;
}

public Channel channel(TransportRequestOptions.Type type) {
if (type == TransportRequestOptions.Type.MED) {
return med[Math.abs(medCounter.incrementAndGet()) % med.length];
} else if (type == TransportRequestOptions.Type.HIGH) {
return high[Math.abs(highCounter.incrementAndGet()) % high.length];
} else {
return low[Math.abs(lowCounter.incrementAndGet()) % low.length];
}
}

public void close() {
closeChannels(low);
closeChannels(med);
closeChannels(high);
}

private void closeChannels(Channel[] channels) {
for (Channel channel : channels) {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
//ignore
}
}
}
}
}

0 comments on commit d6bab1a

Please sign in to comment.