Skip to content

Commit

Permalink
Added tests to verify input data consumption.
Browse files Browse the repository at this point in the history
Verify that input data is consumed at the end of a request handling,
either when input is not read and when an exception is thrown,
to make sure that the session flow control is not stalled.
  • Loading branch information
sbordet committed May 11, 2016
1 parent 9b6d423 commit 8ac23d1
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 14 deletions.
Expand Up @@ -37,6 +37,8 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
Expand All @@ -47,6 +49,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -221,7 +224,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t

response.setStatus(200);
response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.length*10);
response.setContentLength(data.length * 10);
response.flushBuffer();

try
Expand All @@ -238,7 +241,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
{
// Write some content after the stream has
// been reset, it should throw an exception.
for (int i=0;i<10;i++)
for (int i = 0; i < 10; i++)
{
Thread.sleep(500);
response.getOutputStream().write(data);
Expand Down Expand Up @@ -350,4 +353,87 @@ public void onHeaders(Stream stream, HeadersFrame frame)

Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}

@Test
public void testClientResetConsumesQueuedData() throws Exception
{
start(new EmptyHttpServlet());

Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public void succeeded()
{
dataLatch.countDown();
}
});
// The server does not read the data, so the flow control window should be zero.
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));

// Now reset the stream.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);

// Wait for the server to receive the reset and process
// it, and for the client to process the window updates.
Thread.sleep(1000);

Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}

@Test
public void testServerExceptionConsumesQueuedData() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
// Wait to let the data sent by the client to be queued.
Thread.sleep(1000);
throw new IllegalStateException();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
});

Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public void succeeded()
{
dataLatch.countDown();
}
});
// The server does not read the data, so the flow control window should be zero.
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));

// Wait for the server process the exception, and
// for the client to process the window updates.
Thread.sleep(2000);

Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
}
@@ -1,4 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.servlets.LEVEL=DEBUG
Expand Up @@ -62,7 +62,7 @@ public boolean isOptimizedForDirectBuffers()
// copying we can defer to the endpoint
return connection.getEndPoint().isOptimizedForDirectBuffers();
}

public IStream getStream()
{
return stream;
Expand Down Expand Up @@ -145,7 +145,7 @@ public void push(final MetaData.Request request)

if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}",request);

stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
{
@Override
Expand Down Expand Up @@ -190,16 +190,20 @@ private void send(ByteBuffer content, boolean lastContent, Callback callback)
@Override
public void onCompleted()
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
if (!stream.isClosed())
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
// Consume the existing queued data frames to avoid stalling the flow control.
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.getRequest().getHttpInput().consumeAll();
}

// Consume the existing queued data frames to
// avoid stalling the session flow control.
consumeInput();
}

protected void consumeInput()
{
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.getRequest().getHttpInput().consumeAll();
}

@Override
Expand All @@ -213,7 +217,7 @@ public void abort(Throwable failure)
}

private class CommitCallback implements Callback.NonBlocking
{
{
@Override
public void succeeded()
{
Expand Down

0 comments on commit 8ac23d1

Please sign in to comment.