Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule transport ping interval #10189

Merged
merged 1 commit into from Mar 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -152,7 +153,9 @@ public TransportClient(Settings.Builder settings, boolean loadConfigSettings) th
*/
public TransportClient(Settings pSettings, boolean loadConfigSettings) throws ElasticsearchException {
Tuple<Settings, Environment> tuple = InternalSettingsPreparer.prepareSettings(pSettings, loadConfigSettings);
Settings settings = settingsBuilder().put(tuple.v1())
Settings settings = settingsBuilder()
.put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval
.put(tuple.v1())
.put("network.server", false)
.put("node.client", true)
.put(CLIENT_TYPE_SETTING, CLIENT_TYPE)
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/elasticsearch/transport/netty/NettyHeader.java
Expand Up @@ -21,13 +21,34 @@

import org.elasticsearch.Version;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

/**
*/
public class NettyHeader {

public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4;

/**
* The magic number (must be lower than 0) for a ping message. This is handled
* specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}.
*/
public static final int PING_DATA_SIZE = -1;
private final static ChannelBuffer pingHeader;
static {
pingHeader = ChannelBuffers.buffer(6);
pingHeader.writeByte('E');
pingHeader.writeByte('S');
pingHeader.writeInt(PING_DATA_SIZE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment that says why we use -1, i.e. to skip this message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}

/**
 * Returns the header used for a transport ping message.
 * <p>
 * The underlying buffer is built once in the static initializer: the two marker bytes
 * {@code 'E'} and {@code 'S'} followed by {@link #PING_DATA_SIZE} ({@code -1}) written as the
 * message size. A duplicate of that shared buffer is handed out on every call instead of the
 * shared instance itself.
 *
 * @return a duplicate of the shared ping header buffer
 */
public static ChannelBuffer pingHeader() {
    final ChannelBuffer header = pingHeader.duplicate();
    return header;
}

public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
int index = buffer.readerIndex();
buffer.setByte(index, 'E');
Expand Down
80 changes: 64 additions & 16 deletions src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -110,6 +112,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg";
public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state";
public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping";
public static final String PING_SCHEDULE = "transport.ping_schedule"; // the scheduled internal ping interval setting
public static final TimeValue DEFAULT_PING_SCHEDULE = TimeValue.timeValueMillis(-1); // the default ping schedule, defaults to disabled (-1)
public static final String DEFAULT_PORT_RANGE = "9300-9400";
public static final String DEFAULT_PROFILE = "default";

Expand All @@ -132,6 +136,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
protected final int connectionsPerNodeState;
protected final int connectionsPerNodePing;

private final TimeValue pingSchedule;

protected final BigArrays bigArrays;
protected final ThreadPool threadPool;
protected volatile OpenChannelsHandler serverOpenChannels;
Expand All @@ -149,6 +155,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();

// package visibility for tests
final ScheduledPing scheduledPing;

@Inject
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings);
Expand Down Expand Up @@ -200,6 +209,12 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n
} else {
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}

this.scheduledPing = new ScheduledPing();
this.pingSchedule = settings.getAsTime(PING_SCHEDULE, DEFAULT_PING_SCHEDULE);
if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing);
}
}

public Settings settings() {
Expand Down Expand Up @@ -752,6 +767,7 @@ public void connectToNode(DiscoveryNode node, boolean light) {
}
}
// we acquire a connection lock, so no way there is an existing connection
nodeChannels.start();
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
Expand Down Expand Up @@ -1047,6 +1063,7 @@ public void run() {

public static class NodeChannels {

ImmutableList<Channel> allChannels = ImmutableList.of();
private Channel[] recovery;
private final AtomicInteger recoveryCounter = new AtomicInteger();
private Channel[] bulk;
Expand All @@ -1066,12 +1083,12 @@ public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[]
this.ping = ping;
}

public boolean hasChannel(Channel channel) {
return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping);
public void start() {
this.allChannels = ImmutableList.<Channel>builder().add(recovery).add(bulk).add(reg).add(state).add(ping).build();
}

private boolean hasChannel(Channel channel, Channel[] channels) {
for (Channel channel1 : channels) {
public boolean hasChannel(Channel channel) {
for (Channel channel1 : allChannels) {
if (channel.equals(channel1)) {
return true;
}
Expand All @@ -1097,18 +1114,7 @@ public Channel channel(TransportRequestOptions.Type type) {

public synchronized void close() {
List<ChannelFuture> futures = new ArrayList<>();
closeChannelsAndWait(recovery, futures);
closeChannelsAndWait(bulk, futures);
closeChannelsAndWait(reg, futures);
closeChannelsAndWait(state, futures);
closeChannelsAndWait(ping, futures);
for (ChannelFuture future : futures) {
future.awaitUninterruptibly();
}
}

private void closeChannelsAndWait(Channel[] channels, List<ChannelFuture> futures) {
for (Channel channel : channels) {
for (Channel channel : allChannels) {
try {
if (channel != null && channel.isOpen()) {
futures.add(channel.close());
Expand All @@ -1117,6 +1123,48 @@ private void closeChannelsAndWait(Channel[] channels, List<ChannelFuture> future
//ignore
}
}
for (ChannelFuture future : futures) {
future.awaitUninterruptibly();
}
}
}

class ScheduledPing implements Runnable {

final CounterMetric successfulPings = new CounterMetric();
final CounterMetric failedPings = new CounterMetric();

@Override
public void run() {
if (lifecycle.stoppedOrClosed()) {
return;
}
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you assign the key and the value here before we use it? it's way easier to read

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

DiscoveryNode node = entry.getKey();
NodeChannels channels = entry.getValue();
// we only support the ping message format since 1.6
if (node.version().onOrAfter(Version.V_1_6_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make sure you remove the conditional on master

for (Channel channel : channels.allChannels) {
try {
ChannelFuture future = channel.write(NettyHeader.pingHeader());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
successfulPings.inc();
}
});
} catch (Throwable t) {
if (channel.isOpen()) {
logger.debug("[{}] failed to send ping transport message", t, node);
failedPings.inc();
} else {
logger.trace("[{}] failed to send ping transport message (channel closed)", t, node);
}
}
}
}
}
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
}
}
}
Expand Up @@ -68,6 +68,12 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe
}

int dataLen = buffer.getInt(buffer.readerIndex() + 2);
if (dataLen == NettyHeader.PING_DATA_SIZE) {
// discard the messages we read and continue, this is achieved by skipping the bytes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert that the first 2 bytes are actually E and S

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a check for that in the preceding if statement, which throws a stream-corrupted exception if they are not, and I believe there is already a test for it.

// and returning null
buffer.skipBytes(6);
return null;
}
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
Expand Down
Expand Up @@ -114,6 +114,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.transport.netty.NettyTransport;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeZone;
import org.junit.*;
Expand Down Expand Up @@ -465,6 +466,9 @@ private static ImmutableSettings.Builder setRandomSettings(Random random, Immuta
// see #7210
builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false);
}
if (random.nextBoolean()) {
builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms");
}
return builder;
}

Expand Down
@@ -0,0 +1,140 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

/**
 * Verifies the scheduled transport ping ({@link NettyTransport#PING_SCHEDULE}): two transports
 * connected to each other must keep sending pings successfully, must never record a failed ping,
 * and regular request/response traffic must keep working while the pings are going around.
 */
public class NettyScheduledPingTests extends ElasticsearchTestCase {

    @Test
    public void testScheduledPing() throws Exception {
        final ThreadPool threadPool = new ThreadPool(getClass().getName());
        // Everything below runs inside try/finally so that the thread pool and both transport
        // services are released even when an assertion fails or an exception is thrown.
        try {
            int startPort = 11000 + randomIntBetween(0, 255);
            int endPort = startPort + 10;
            // ping every 5ms so that the counters grow quickly
            Settings settings = ImmutableSettings.builder()
                    .put(NettyTransport.PING_SCHEDULE, "5ms")
                    .put("transport.tcp.port", startPort + "-" + endPort)
                    .build();

            final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
            final MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
            final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
            final MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
            try {
                // start A before B, as before, so the port binding order is unchanged
                serviceA.start();
                serviceB.start();

                DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), Version.CURRENT);
                DiscoveryNode nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), Version.CURRENT);

                serviceA.connectToNode(nodeB);
                serviceB.connectToNode(nodeA);

                // wait until both sides have sent a fair number of pings
                assertBusy(new Runnable() {
                    @Override
                    public void run() {
                        assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100L));
                        assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100L));
                    }
                });
                assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
                assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));

                serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<TransportRequest.Empty>() {
                    @Override
                    public TransportRequest.Empty newInstance() {
                        return TransportRequest.Empty.INSTANCE;
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.GENERIC;
                    }

                    @Override
                    public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
                        try {
                            channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options());
                        } catch (IOException e) {
                            e.printStackTrace();
                            // always fails: sending the response must not throw
                            assertThat(e.getMessage(), false, equalTo(true));
                        }
                    }
                });

                // send some messages while ping requests are going around
                int rounds = scaledRandomIntBetween(100, 5000);
                for (int i = 0; i < rounds; i++) {
                    serviceB.submitRequest(nodeA, "sayHello",
                            TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler<TransportResponse.Empty>() {
                                @Override
                                public TransportResponse.Empty newInstance() {
                                    return TransportResponse.Empty.INSTANCE;
                                }

                                @Override
                                public String executor() {
                                    return ThreadPool.Names.GENERIC;
                                }

                                @Override
                                public void handleResponse(TransportResponse.Empty response) {
                                }

                                @Override
                                public void handleException(TransportException exp) {
                                    exp.printStackTrace();
                                    // always fails: a response was expected, not an exception
                                    assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
                                }
                            }).txGet();
                }

                // pings must have kept flowing while the regular requests were being served
                assertBusy(new Runnable() {
                    @Override
                    public void run() {
                        assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200L));
                        assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200L));
                    }
                });
                assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
                assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
            } finally {
                Releasables.close(serviceA, serviceB);
            }
        } finally {
            terminate(threadPool);
        }
    }
}