Skip to content

Commit

Permalink
Transport: Add a dedicated ping channel
Browse files Browse the repository at this point in the history
Today, we have low/med/high channel groups in our transport layer. High is used to publish cluster state and send ping requests. Sometimes, the overhead of publishing large cluster states can interfere with ping requests.

Introduce a new, dedicated ping channel (with a default size of 1) that handles only ping requests.
closes elastic#3362
  • Loading branch information
kimchy committed Jul 22, 2013
1 parent ba6c18e commit 3771dba
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
Expand Up @@ -266,7 +266,7 @@ public void run() {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout),
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
Expand Down Expand Up @@ -324,7 +324,7 @@ public void handleException(TransportException exp) {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), this);
}
}
}
Expand Down
Expand Up @@ -200,7 +200,7 @@ public void run() {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withPingType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
Expand Down Expand Up @@ -248,7 +248,7 @@ public void handleException(TransportException exp) {
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
options().withPingType().withTimeout(pingRetryTimeout), this);
}
}
}
Expand Down
Expand Up @@ -36,7 +36,8 @@ public static TransportRequestOptions options() {
public static enum Type {
LOW,
MED,
HIGH;
HIGH,
PING;

public static Type fromString(String type) {
if ("low".equalsIgnoreCase(type)) {
Expand All @@ -45,6 +46,8 @@ public static Type fromString(String type) {
return MED;
} else if ("high".equalsIgnoreCase(type)) {
return HIGH;
} else if ("ping".equalsIgnoreCase(type)) {
return PING;
} else {
throw new ElasticSearchIllegalArgumentException("failed to match transport type for [" + type + "]");
}
Expand Down Expand Up @@ -77,7 +80,16 @@ public TransportRequestOptions withType(Type type) {
}

/**
* A request that requires very low latency. Usually reserved for ping requests with very small payload.
* A request that requires very low latency.
*/
public TransportRequestOptions withPingType() {
    // Route this request over the dedicated ping channel so it is not
    // delayed behind large payloads (e.g. cluster state) on other channels.
    type = Type.PING;
    return this;
}


/**
* A channel reserved for high prio requests.
*/
public TransportRequestOptions withHighType() {
this.type = Type.HIGH;
Expand Down
Expand Up @@ -82,9 +82,10 @@
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or
 * There are 4 types of connections per node, low/med/high/ping. Low is for batch-oriented APIs (like recovery or
 * batch) with high payloads that would cause regular requests (like search or single doc index) to take
* longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD).
 * longer. Med is for the typical search / single doc index. And High is for things like cluster state. Ping is reserved for
* sending out ping requests to other nodes.
*/
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {

Expand Down Expand Up @@ -124,6 +125,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final int connectionsPerNodeLow;
final int connectionsPerNodeMed;
final int connectionsPerNodeHigh;
final int connectionsPerNodePing;

final ByteSizeValue maxCumulationBufferCapacity;
final int maxCompositeBufferComponents;
Expand Down Expand Up @@ -191,6 +193,7 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", settings.getAsInt("transport.connections_per_node.low", 2));
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6));
this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1));
this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));

this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
Expand All @@ -211,8 +214,8 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}

logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, receivePredictorMin, receivePredictorMax);
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
}

public Settings settings() {
Expand Down Expand Up @@ -605,7 +608,7 @@ public void connectToNode(DiscoveryNode node, boolean light) {
if (light) {
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Exception e) {
Expand Down Expand Up @@ -646,13 +649,14 @@ private NodeChannels connectToChannelsLight(DiscoveryNode node) {
Channel[] channels = new Channel[1];
channels[0] = connect.getChannel();
channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels);
return new NodeChannels(channels, channels, channels, channels);
}

private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length];
ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length];
ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
for (int i = 0; i < connectLow.length; i++) {
connectLow[i] = clientBootstrap.connect(address);
Expand All @@ -663,6 +667,9 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
for (int i = 0; i < connectHigh.length; i++) {
connectHigh[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}

try {
for (int i = 0; i < connectLow.length; i++) {
Expand Down Expand Up @@ -692,6 +699,15 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}

for (int i = 0; i < connectPing.length; i++) {
connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectPing[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
}
nodeChannels.ping[i] = connectPing[i].getChannel();
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}

if (nodeChannels.low.length == 0) {
if (nodeChannels.med.length > 0) {
nodeChannels.low = nodeChannels.med;
Expand All @@ -713,6 +729,13 @@ private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
nodeChannels.high = nodeChannels.low;
}
}
if (nodeChannels.ping.length == 0) {
if (nodeChannels.high.length > 0) {
nodeChannels.ping = nodeChannels.high;
} else {
nodeChannels.ping = nodeChannels.med;
}
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectLow).add(connectMed).add(connectHigh).build()) {
Expand Down Expand Up @@ -821,15 +844,18 @@ public static class NodeChannels {
private final AtomicInteger medCounter = new AtomicInteger();
private Channel[] high;
private final AtomicInteger highCounter = new AtomicInteger();
private Channel[] ping;
private final AtomicInteger pingCounter = new AtomicInteger();

public NodeChannels(Channel[] low, Channel[] med, Channel[] high) {
public NodeChannels(Channel[] low, Channel[] med, Channel[] high, Channel[] ping) {
    // Keep a reference to each connection group; callers pick a group
    // based on the request's TransportRequestOptions.Type.
    this.ping = ping;
    this.high = high;
    this.med = med;
    this.low = low;
}

public boolean hasChannel(Channel channel) {
    // A channel belongs to this node if it appears in any of the four groups.
    for (Channel[] group : new Channel[][]{low, med, high, ping}) {
        if (hasChannel(channel, group)) {
            return true;
        }
    }
    return false;
}

private boolean hasChannel(Channel channel, Channel[] channels) {
Expand All @@ -846,6 +872,8 @@ public Channel channel(TransportRequestOptions.Type type) {
return med[Math.abs(medCounter.incrementAndGet()) % med.length];
} else if (type == TransportRequestOptions.Type.HIGH) {
return high[Math.abs(highCounter.incrementAndGet()) % high.length];
} else if (type == TransportRequestOptions.Type.PING) {
return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length];
} else {
return low[Math.abs(lowCounter.incrementAndGet()) % low.length];
}
Expand All @@ -856,6 +884,7 @@ public synchronized void close() {
closeChannelsAndWait(low, futures);
closeChannelsAndWait(med, futures);
closeChannelsAndWait(high, futures);
closeChannelsAndWait(ping, futures);
for (ChannelFuture future : futures) {
future.awaitUninterruptibly();
}
Expand Down

0 comments on commit 3771dba

Please sign in to comment.