Skip to content

Commit

Permalink
Issue #845 - Improve blocking IO for data rate limiting.
Browse files Browse the repository at this point in the history
Moved tests to run HTTP and HTTP/2 tests, and added more test cases.
  • Loading branch information
sbordet committed Sep 5, 2016
1 parent 0322a16 commit 705a68d
Show file tree
Hide file tree
Showing 15 changed files with 1,027 additions and 268 deletions.
Expand Up @@ -50,6 +50,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory();
private long streamIdleTimeout;

public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
Expand Down Expand Up @@ -157,6 +158,17 @@ public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrat
this.executionStrategyFactory = executionStrategyFactory;
}

@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}

public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}

public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
Expand All @@ -177,8 +189,11 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
// For a single stream in a connection, there will be a race between
// the stream idle timeout and the connection idle timeout. However,
// the typical case is that the connection will be busier and the
// stream idle timeout will expire earlier that the connection's.
session.setStreamIdleTimeout(endPoint.getIdleTimeout());
// stream idle timeout will expire earlier than the connection's.
long streamIdleTimeout = getStreamIdleTimeout();
if (streamIdleTimeout <= 0)
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());

ServerParser parser = newServerParser(connector, session);
Expand Down
Expand Up @@ -133,9 +133,9 @@ public void onData(IStream stream, DataFrame frame, Callback callback)
public boolean onStreamTimeout(IStream stream, Throwable failure)
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = !channel.isRequestHandled();
boolean result = channel.onStreamTimeout(failure);
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", stream, failure);
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result;
}

Expand All @@ -157,7 +157,7 @@ public boolean onSessionTimeout(Throwable failure)
result &= !channel.isRequestHandled();
}
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", session, failure);
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
return result;
}

Expand Down
Expand Up @@ -267,18 +267,33 @@ public void failed(Throwable x)
handle);
}

boolean delayed = _delayedUntilContent;
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
if (delayed)
if (wasDelayed)
_handled = true;
return handle || delayed ? this : null;
return handle || wasDelayed ? this : null;
}

public boolean isRequestHandled()
{
return _handled;
}

public boolean onStreamTimeout(Throwable failure)
{
if (!_handled)
return true;

HttpInput input = getRequest().getHttpInput();
boolean readFailed = input.failed(failure);
if (readFailed)
handle();

boolean writeFailed = getHttpTransport().onStreamTimeout(failure);

return readFailed || writeFailed;
}

public void onFailure(Throwable failure)
{
onEarlyEOF();
Expand Down
Expand Up @@ -43,7 +43,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class);

private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback();
private final TransportCallback transportCallback = new TransportCallback();
private final Connector connector;
private final HTTP2ServerConnection connection;
private IStream stream;
Expand Down Expand Up @@ -100,12 +100,22 @@ public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer conte
{
if (hasContent)
{
commit(info, false, commitCallback);
send(content, lastContent, callback);
Callback commitCallback = new Callback.Nested(callback)
{
@Override
public void succeeded()
{
if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
}
};
if (transportCallback.start(commitCallback, true))
commit(info, false, transportCallback);
}
else
{
commit(info, lastContent, callback);
if (transportCallback.start(callback, false))
commit(info, lastContent, transportCallback);
}
}
else
Expand All @@ -117,7 +127,8 @@ public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer conte
{
if (hasContent || lastContent)
{
send(content, lastContent, callback);
if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
}
else
{
Expand Down Expand Up @@ -186,6 +197,11 @@ private void send(ByteBuffer content, boolean lastContent, Callback callback)
stream.data(frame, callback);
}

public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
}

@Override
public void onCompleted()
{
Expand Down Expand Up @@ -214,20 +230,99 @@ public void abort(Throwable failure)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
}

private class CommitCallback implements Callback.NonBlocking
private class TransportCallback implements Callback
{
private State state = State.IDLE;
private Callback callback;
private boolean commit;

public boolean start(Callback callback, boolean commit)
{
State state;
synchronized (this)
{
state = this.state;
if (state == State.IDLE)
{
this.state = State.WRITING;
this.callback = callback;
this.commit = commit;
return true;
}
}
callback.failed(new IllegalStateException("Invalid transport state: " + state));
return false;
}

@Override
public void succeeded()
{
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.IDLE;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId());
LOG.debug("HTTP2 Response #{} {}", stream.getId(), commit ? "committed" : "flushed content");
if (callback != null)
callback.succeeded();
}

@Override
public void failed(Throwable x)
{
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.FAILED;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to " + (commit ? "commit" : "flush"), x);
if (callback != null)
callback.failed(x);
}

@Override
public boolean isNonBlocking()
{
return callback.isNonBlocking();
}

private boolean onIdleTimeout(Throwable failure)
{
boolean result;
Callback callback = null;
synchronized (this)
{
result = state == State.WRITING;
if (result)
{
callback = this.callback;
this.state = State.TIMEOUT;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " idle timeout", failure);
if (result)
callback.failed(failure);
return result;
}
}

private enum State
{
IDLE, WRITING, FAILED, TIMEOUT
}
}
Expand Up @@ -419,8 +419,8 @@ else if (_asyncWrite) // TODO refactor same as read
_state=State.ASYNC_WAIT;
action=Action.WAIT;
if (_asyncReadUnready)
_channel.asyncReadFillInterested();
Scheduler scheduler = _channel.getScheduler();
read_interested=true;
Scheduler scheduler=_channel.getScheduler();
if (scheduler!=null && _timeoutMs>0)
_event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
}
Expand Down Expand Up @@ -454,6 +454,9 @@ else if (_asyncWrite) // TODO refactor same as read
}
}

if (read_interested)
_channel.asyncReadFillInterested();

return action;
}

Expand Down
Expand Up @@ -65,7 +65,7 @@ public class HttpConfiguration
private boolean _delayDispatchUntilContent = true;
private boolean _persistentConnectionsEnabled = true;
private int _maxErrorDispatches = 10;
private int _minRequestDataRate;
private long _minRequestDataRate;

/* ------------------------------------------------------------ */
/**
Expand Down Expand Up @@ -512,7 +512,7 @@ public void setMaxErrorDispatches(int max)
/**
* @return The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/
public int getMinRequestDataRate()
public long getMinRequestDataRate()
{
return _minRequestDataRate;
}
Expand All @@ -521,7 +521,7 @@ public int getMinRequestDataRate()
/**
* @param bytesPerSecond The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/
public void setMinRequestDataRate(int bytesPerSecond)
public void setMinRequestDataRate(long bytesPerSecond)
{
_minRequestDataRate=bytesPerSecond;
}
Expand Down
Expand Up @@ -124,9 +124,7 @@ protected HttpGenerator newHttpGenerator()

protected HttpChannelOverHttp newHttpChannel()
{
HttpChannelOverHttp httpChannel = new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);

return httpChannel;
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}

protected HttpParser newHttpParser(HttpCompliance compliance)
Expand Down Expand Up @@ -285,9 +283,8 @@ protected boolean fillAndParseForContent()
while (_parser.inContentState())
{
int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer();
handled|=handle;
if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
handled = parseRequestBuffer();
if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
break;
}
return handled;
Expand Down

0 comments on commit 705a68d

Please sign in to comment.