Permalink
Browse files

Split read based timeouts.

A Nifty client actually has to deal with two separate timeouts:

- request timeout - the maximum amount of time that the caller is willing to wait
  for an I/O request to complete
- receive timeout - the maximum amount of time that the client is willing for the
  server to make any progress

These times can be very different. e.g. a server might trickle a request sending a
packet every three seconds for a long time and the caller is willing to wait minutes
for a request to complete as long as it makes progress.

This patch fixes the name confusion around read, request and receive timeouts in the
nifty client code, uses consistent naming everywhere and adds a new timeout (requestTimeout)
while the existing receiveTimeout now does what the name suggests.

Existing code should still work fine as both timeouts are set to the same value by default.
  • Loading branch information...
1 parent 6085ee2 commit 413e7c23f846a972cd8d4e8fb8313a9876546488 @hgschmie hgschmie committed Nov 20, 2013
@@ -56,7 +56,13 @@
private final Channel nettyChannel;
private Duration sendTimeout = null;
+
+ // Timeout until the whole request must be complete
private Duration requestTimeout = null;
+
+ // Timeout for not receiving any data from the server
+ private Duration receiveTimeout = null;
+
private final Map<Integer, Request> requestMap = new HashMap<>();
private volatile TException channelError;
private final Timer timer;
@@ -118,12 +124,24 @@ public Duration getSendTimeout()
@Override
public void setReceiveTimeout(Duration receiveTimeout)
{
- this.requestTimeout = receiveTimeout;
+ this.receiveTimeout = receiveTimeout;
}
@Override
public Duration getReceiveTimeout()
{
+ return receiveTimeout;
+ }
+
+ @Override
+ public void setRequestTimeout(Duration requestTimeout)
+ {
+ this.requestTimeout = requestTimeout;
+ }
+
+ @Override
+ public Duration getRequestTimeout()
+ {
return this.requestTimeout;
}
@@ -146,11 +164,6 @@ public void executeInIoThread(Runnable runnable)
nioSocketChannel.getWorker().executeInIoThread(runnable, true);
}
- private boolean hasRequestTimeout()
- {
- return requestTimeout != null;
- }
-
@Override
public void sendAsynchronousRequest(final ChannelBuffer message,
final boolean oneway,
@@ -208,7 +221,7 @@ private void messageSent(ChannelFuture future, Request request, boolean oneway)
if (oneway) {
retireRequest(request);
} else {
- queueReceiveTimeout(request);
+ queueRequestAndReceiveTimeout(request);
}
} else {
TTransportException transportException =
@@ -252,7 +265,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event)
private Request makeRequest(int sequenceId, Listener listener)
{
- Request request = new Request(sequenceId, listener);
+ Request request = new Request(listener);
requestMap.put(sequenceId, request);
return request;
}
@@ -265,13 +278,18 @@ private void retireRequest(Request request)
private void cancelRequestTimeouts(Request request)
{
Timeout sendTimeout = request.getSendTimeout();
- if (sendTimeout != null) {
+ if (sendTimeout != null && !sendTimeout.isCancelled()) {
sendTimeout.cancel();
}
- Timeout responseTimeout = request.getReceiveTimeout();
- if (responseTimeout != null) {
- responseTimeout.cancel();
+ Timeout requestTimeout = request.getRequestTimeout();
+ if (requestTimeout != null && requestTimeout.isCancelled()) {
+ requestTimeout.cancel();
+ }
+
+ Timeout receiveTimeout = request.getReceiveTimeout();
+ if (receiveTimeout != null && receiveTimeout.isCancelled()) {
+ receiveTimeout.cancel();
}
}
@@ -361,35 +379,28 @@ private void fireChannelErrorCallback(Listener listener, Throwable throwable)
fireChannelErrorCallback(listener, wrapException(throwable));
}
- private void onSendTimeoutExpired(Request request)
+ private void onSendTimeoutReceived(Request request)
{
- Timeout expiredTimeout = request.getSendTimeout();
-
- if (!expiredTimeout.isCancelled()) {
- cancelAllTimeouts();
- WriteTimeoutException timeoutException =
- new WriteTimeoutException(
- "Timed out waiting " + getSendTimeout() + " to send request");
-
- fireChannelErrorCallback(request.getListener(), timeoutException);
- }
+ cancelAllTimeouts();
+ WriteTimeoutException timeoutException = new WriteTimeoutException("Timed out waiting " + getSendTimeout() + " to send data to server");
+ fireChannelErrorCallback(request.getListener(), timeoutException);
}
- private void onReceiveTimeoutExpired(Request request)
+ private void onRequestTimeoutReceived(Request request)
{
- Timeout expiredTimeout = request.getReceiveTimeout();
-
- if (!expiredTimeout.isCancelled()) {
- cancelAllTimeouts();
-
- ReadTimeoutException timeoutException =
- new ReadTimeoutException(
- "Timed out waiting " + getReceiveTimeout() + " to receive response");
+ cancelAllTimeouts();
+ ReadTimeoutException timeoutException = new ReadTimeoutException("Timed out waiting " + getRequestTimeout() + " to complete request");
+ fireChannelErrorCallback(request.getListener(), timeoutException);
+ }
- fireChannelErrorCallback(request.getListener(), timeoutException);
- }
+ private void onReceiveTimeoutReceived(Request request)
+ {
+ cancelAllTimeouts();
+ ReadTimeoutException timeoutException = new ReadTimeoutException("Timed out waiting " + getReceiveTimeout() + " to receive data from server");
+ fireChannelErrorCallback(request.getListener(), timeoutException);
}
+
private void queueSendTimeout(final Request request) throws TTransportException
{
if (this.sendTimeout != null) {
@@ -398,7 +409,7 @@ private void queueSendTimeout(final Request request) throws TTransportException
TimerTask sendTimeoutTask = new IoThreadBoundTimerTask(this, new TimerTask() {
@Override
public void run(Timeout timeout) {
- onSendTimeoutExpired(request);
+ onSendTimeoutReceived(request);
}
});
@@ -414,21 +425,37 @@ public void run(Timeout timeout) {
}
}
- private void queueReceiveTimeout(final Request request) throws TTransportException
+ private void queueRequestAndReceiveTimeout(final Request request) throws TTransportException
{
if (this.requestTimeout != null) {
long requestTimeoutMs = this.requestTimeout.toMillis();
if (requestTimeoutMs > 0) {
- TimerTask receiveTimeoutTask = new IoThreadBoundTimerTask(this, new TimerTask() {
+ TimerTask requestTimeoutTask = new IoThreadBoundTimerTask(this, new TimerTask() {
@Override
public void run(Timeout timeout) {
- onReceiveTimeoutExpired(request);
+ onRequestTimeoutReceived(request);
}
});
Timeout timeout;
try {
- timeout = timer.newTimeout(receiveTimeoutTask, requestTimeoutMs, TimeUnit.MILLISECONDS);
+ timeout = timer.newTimeout(requestTimeoutTask, requestTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+ catch (IllegalStateException e) {
+ throw new TTransportException("Unable to schedule request timeout");
+ }
+ request.setRequestTimeout(timeout);
+ }
+ }
+
+ if (this.receiveTimeout != null) {
+ long receiveTimeoutNanos = this.receiveTimeout.roundTo(TimeUnit.NANOSECONDS);
+ if (receiveTimeoutNanos > 0) {
+ TimerTask receiveTimeoutTask = new IoThreadBoundTimerTask(this, new ReceiveTimeoutTask(receiveTimeoutNanos, request));
+
+ Timeout timeout;
+ try {
+ timeout = timer.newTimeout(receiveTimeoutTask, receiveTimeoutNanos, TimeUnit.NANOSECONDS);
}
catch (IllegalStateException e) {
throw new TTransportException("Unable to schedule receive timeout");
@@ -438,6 +465,7 @@ public void run(Timeout timeout) {
}
}
+
/**
* Used to create TimerTasks that will fire
*/
@@ -476,25 +504,30 @@ public void run()
*/
private static class Request
{
- private final int sequenceId;
private final Listener listener;
private Timeout sendTimeout;
- private Timeout receiveTimeout;
+ private Timeout requestTimeout;
- public Request(int sequenceId, Listener listener)
+ private volatile Timeout receiveTimeout;
+
+ public Request(Listener listener)
{
- this.sequenceId = sequenceId;
this.listener = listener;
}
- public int getSequenceId()
+ public Listener getListener()
{
- return sequenceId;
+ return listener;
}
- public Listener getListener()
+ public Timeout getRequestTimeout()
{
- return listener;
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(Timeout requestTimeout)
+ {
+ this.requestTimeout = requestTimeout;
}
public Timeout getReceiveTimeout()
@@ -517,4 +550,45 @@ public void setSendTimeout(Timeout sendTimeout)
this.sendTimeout = sendTimeout;
}
}
+
+ private final class ReceiveTimeoutTask implements TimerTask
+ {
+ private final TimeoutHandler timeoutHandler;
+ private final long timeoutNanos;
+ private final Request request;
+
+ ReceiveTimeoutTask(long timeoutNanos, Request request)
+ {
+ this.timeoutHandler = TimeoutHandler.findTimeoutHandler(getNettyChannel().getPipeline());
+ this.timeoutNanos = timeoutNanos;
+ this.request = request;
+ }
+
+ public void run(Timeout timeout) throws Exception
+ {
+ if (timeoutHandler == null) {
+ return;
+ }
+
+ if (timeout.isCancelled()) {
+ return;
+ }
+
+ if (!getNettyChannel().isOpen()) {
+ return;
+ }
+
+ long currentTimeNanos = System.nanoTime();
+
+ long timePassed = currentTimeNanos - timeoutHandler.getLastMessageReceivedNanos();
+ long nextDelayNanos = timeoutNanos - timePassed;
+
+ if (nextDelayNanos <= 0) {
+ onReceiveTimeoutReceived(request);
+ }
+ else {
+ request.setReceiveTimeout(timer.newTimeout(this, nextDelayNanos, TimeUnit.NANOSECONDS));
+ }
+ }
+ }
}
@@ -17,6 +17,7 @@
import com.facebook.nifty.duplex.TDuplexProtocolFactory;
import com.google.common.net.HostAndPort;
+
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -68,7 +69,9 @@ public FramedClientConnector(HostAndPort address, TDuplexProtocolFactory protoco
public FramedClientChannel newThriftClientChannel(Channel nettyChannel, Timer timer)
{
FramedClientChannel channel = new FramedClientChannel(nettyChannel, timer, getProtocolFactory());
- channel.getNettyChannel().getPipeline().addLast("thriftHandler", channel);
+ ChannelPipeline cp = nettyChannel.getPipeline();
+ TimeoutHandler.addToPipeline(cp);
+ cp.addLast("thriftHandler", channel);
return channel;
}
@@ -80,6 +83,7 @@ public ChannelPipelineFactory newChannelPipelineFactory(final int maxFrameSize)
public ChannelPipeline getPipeline()
throws Exception {
ChannelPipeline cp = Channels.pipeline();
+ TimeoutHandler.addToPipeline(cp);
cp.addLast("frameEncoder", new LengthFieldPrepender(LENGTH_FIELD_LENGTH));
cp.addLast(
"frameDecoder",
Oops, something went wrong.

0 comments on commit 413e7c2

Please sign in to comment.