Skip to content

Commit

Permalink
Issue #3758 - Avoid sending empty trailer frames for http/2 requests.
Browse files Browse the repository at this point in the history
Modified the sender logic to allow specific subclasses to decide
when to send the trailers, if any.
This allows HTTP/2 to correctly compute the end_stream flag and avoid
sending empty trailers frames with end_stream=true.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jun 12, 2019
1 parent da4f116 commit 8f53d14
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 215 deletions.
Expand Up @@ -22,12 +22,10 @@
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.BufferUtil;
Expand Down Expand Up @@ -67,7 +65,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
private final Callback commitCallback = new CommitCallback();
private final IteratingCallback contentCallback = new ContentCallback();
private final Callback trailersCallback = new TrailersCallback();
private final Callback lastCallback = new LastCallback();
private final HttpChannel channel;
private HttpContent content;
Expand Down Expand Up @@ -444,15 +441,6 @@ private void terminateRequest(HttpExchange exchange, Throwable failure, Result r
*/
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);

/**
* Implementations should send the HTTP trailers and notify the given {@code callback} of the
* result of this operation.
*
* @param exchange the exchange to send
* @param callback the callback to notify
*/
protected abstract void sendTrailers(HttpExchange exchange, Callback callback);

protected void reset()
{
HttpContent content = this.content;
Expand Down Expand Up @@ -745,20 +733,10 @@ private void process() throws Exception
if (content == null)
return;

HttpRequest request = exchange.getRequest();
Supplier<HttpFields> trailers = request.getTrailers();
boolean hasContent = content.hasContent();
if (!hasContent)
if (!content.hasContent())
{
if (trailers == null)
{
// No trailers or content to send, we are done.
someToSuccess(exchange);
}
else
{
sendTrailers(exchange, lastCallback);
}
// No content to send, we are done.
someToSuccess(exchange);
}
else
{
Expand Down Expand Up @@ -859,9 +837,7 @@ protected Action process() throws Exception

if (lastContent)
{
HttpRequest request = exchange.getRequest();
Supplier<HttpFields> trailers = request.getTrailers();
sendContent(exchange, content, trailers == null ? lastCallback : trailersCallback);
sendContent(exchange, content, lastCallback);
return Action.IDLE;
}

Expand Down Expand Up @@ -925,28 +901,6 @@ protected void onCompleteSuccess()
}
}

private class TrailersCallback implements Callback
{
@Override
public void succeeded()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
sendTrailers(exchange, lastCallback);
}

@Override
public void failed(Throwable x)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(x);
anyToFailure(x);
}
}

private class LastCallback implements Callback
{
@Override
Expand Down
Expand Up @@ -59,7 +59,7 @@ protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback
{
try
{
new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
new HeadersCallback(exchange, content, callback).iterate();
}
catch (Throwable x)
{
Expand All @@ -83,8 +83,8 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated content ({} bytes) - {}/{}",
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_CHUNK:
Expand Down Expand Up @@ -138,21 +138,6 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback
}
}

@Override
protected void sendTrailers(HttpExchange exchange, Callback callback)
{
try
{
new TrailersCallback(callback).iterate();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
callback.failed(x);
}
}

@Override
protected void reset()
{
Expand Down Expand Up @@ -191,19 +176,17 @@ private class HeadersCallback extends IteratingCallback
private final HttpExchange exchange;
private final Callback callback;
private final MetaData.Request metaData;
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
private ByteBuffer headerBuffer;
private ByteBuffer chunkBuffer;
private ByteBuffer contentBuffer;
private boolean lastContent;
private boolean generated;

public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
{
super(false);
this.exchange = exchange;
this.callback = callback;
this.httpConnectionOverHTTP = httpConnectionOverHTTP;

HttpRequest request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
Expand Down Expand Up @@ -231,10 +214,10 @@ protected Action process() throws Exception
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
headerBuffer == null ? -1 : headerBuffer.remaining(),
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
headerBuffer == null ? -1 : headerBuffer.remaining(),
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_HEADER:
Expand All @@ -249,7 +232,8 @@ protected Action process() throws Exception
}
case NEED_CHUNK_TRAILER:
{
return Action.SUCCEEDED;
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
break;
}
case FLUSH:
{
Expand All @@ -260,11 +244,8 @@ protected Action process() throws Exception
chunkBuffer = BufferUtil.EMPTY_BUFFER;
if (contentBuffer == null)
contentBuffer = BufferUtil.EMPTY_BUFFER;

httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer)
+ BufferUtil.length(chunkBuffer)
+ BufferUtil.length(contentBuffer));

long bytes = headerBuffer.remaining() + chunkBuffer.remaining() + contentBuffer.remaining();
getHttpChannel().getHttpConnection().addBytesOut(bytes);
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
generated = true;
return Action.SCHEDULED;
Expand Down Expand Up @@ -331,83 +312,6 @@ private void release()
}
}

private class TrailersCallback extends IteratingCallback
{
private final Callback callback;
private ByteBuffer chunkBuffer;

public TrailersCallback(Callback callback)
{
this.callback = callback;
}

@Override
protected Action process() throws Throwable
{
while (true)
{
HttpGenerator.Result result = generator.generateRequest(null, null, chunkBuffer, null, true);
if (LOG.isDebugEnabled())
LOG.debug("Generated trailers {}/{}", result, generator);
switch (result)
{
case NEED_CHUNK_TRAILER:
{
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
break;
}
case FLUSH:
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
endPoint.write(this, chunkBuffer);
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
shutdownOutput();
return Action.SUCCEEDED;
}
case DONE:
{
return Action.SUCCEEDED;
}
default:
{
throw new IllegalStateException(result.toString());
}
}
}
}

@Override
public void succeeded()
{
release();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
release();
callback.failed(x);
super.failed(x);
}

@Override
protected void onCompleteSuccess()
{
super.onCompleteSuccess();
callback.succeeded();
}

private void release()
{
httpClient.getByteBufferPool().release(chunkBuffer);
chunkBuffer = null;
}
}

private class ByteBufferRecyclerCallback extends Callback.Nested
{
private final ByteBufferPool pool;
Expand Down Expand Up @@ -435,7 +339,9 @@ public void succeeded()
public void failed(Throwable x)
{
for (ByteBuffer buffer : buffers)
{
pool.release(buffer);
}
super.failed(x);
}
}
Expand Down
Expand Up @@ -125,10 +125,4 @@ protected void sendContent(HttpExchange exchange, HttpContent content, Callback
getHttpChannel().flush(result);
}
}

@Override
protected void sendTrailers(HttpExchange exchange, Callback callback)
{
callback.succeeded();
}
}
Expand Up @@ -18,24 +18,13 @@

package org.eclipse.jetty.http2.client;

import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -58,10 +47,14 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;

import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TrailersTest extends AbstractTest
{
Expand Down Expand Up @@ -289,7 +282,7 @@ public void onData(Stream stream, DataFrame frame, Callback callback)

assertTrue(latch.await(5, TimeUnit.SECONDS));

assertTrue( frames.size()==3, frames.toString());
assertEquals(3, frames.size(), frames.toString());

HeadersFrame headers = (HeadersFrame)frames.get(0);
DataFrame data = (DataFrame)frames.get(1);
Expand All @@ -298,7 +291,7 @@ public void onData(Stream stream, DataFrame frame, Callback callback)
assertFalse(headers.isEndStream());
assertFalse(data.isEndStream());
assertTrue(trailers.isEndStream());
assertTrue(trailers.getMetaData().getFields().get(trailerName).equals(trailerValue));
assertEquals(trailers.getMetaData().getFields().get(trailerName), trailerValue);
}

@Test
Expand Down Expand Up @@ -358,6 +351,5 @@ public void onReset(Stream stream, ResetFrame frame)

assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));

}
}

0 comments on commit 8f53d14

Please sign in to comment.