Skip to content

Commit

Permalink
100 continue support for server
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 97efe9c commit 8a9b5b2
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 13 deletions.
Expand Up @@ -257,7 +257,9 @@ public HttpServerResponse closeHandler(@Nullable Handler<Void> handler) {
@Override @Override
public HttpServerResponse writeContinue() { public HttpServerResponse writeContinue() {
checkHeadWritten(); checkHeadWritten();
throw new UnsupportedOperationException(); encoder.writeHeaders(ctx, stream.id(), new DefaultHttp2Headers().status("100"), 0, false, ctx.newPromise());
ctx.flush();
return this;
} }


@Override @Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -204,7 +204,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr
configureHttp1(pipeline); configureHttp1(pipeline);
} else { } else {
HandlerHolder<HttpServerRequest> holder = reqHandlerManager.chooseHandler(ch.eventLoop()); HandlerHolder<HttpServerRequest> holder = reqHandlerManager.chooseHandler(ch.eventLoop());
pipeline.addLast("handler", new VertxHttp2HandlerBuilder(ctx, holder.context, serverOrigin, options.getHttp2Settings(), options.isCompressionSupported(), holder.handler).build()); pipeline.addLast("handler", new VertxHttp2HandlerBuilder(ctx, holder.context, serverOrigin, options, holder.handler).build());
} }
} }
}); });
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Handler.java
Expand Up @@ -19,6 +19,8 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2CodecUtil;
Expand Down Expand Up @@ -62,9 +64,9 @@ public class VertxHttp2Handler extends Http2ConnectionHandler implements Http2Fr


static final String UPGRADE_RESPONSE_HEADER = "http-to-http2-upgrade"; static final String UPGRADE_RESPONSE_HEADER = "http-to-http2-upgrade";


private ChannelHandlerContext context; private final ChannelHandlerContext context;
private final HttpServerOptions options;
private final ContextInternal handlerContext; private final ContextInternal handlerContext;
private final boolean supportsCompression;
private final String serverOrigin; private final String serverOrigin;
private final IntObjectMap<VertxHttp2Stream> streams = new IntObjectHashMap<>(); private final IntObjectMap<VertxHttp2Stream> streams = new IntObjectHashMap<>();
private final Handler<HttpServerRequest> handler; private final Handler<HttpServerRequest> handler;
Expand All @@ -83,7 +85,7 @@ public class VertxHttp2Handler extends Http2ConnectionHandler implements Http2Fr
private Handler<Throwable> exceptionHandler; private Handler<Throwable> exceptionHandler;


VertxHttp2Handler(ChannelHandlerContext context, ContextInternal handlerContext, String serverOrigin, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, VertxHttp2Handler(ChannelHandlerContext context, ContextInternal handlerContext, String serverOrigin, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings, boolean supportsCompression, Handler<HttpServerRequest> handler) { Http2Settings initialSettings, HttpServerOptions options, Handler<HttpServerRequest> handler) {
super(decoder, encoder, initialSettings); super(decoder, encoder, initialSettings);


encoder.flowController().listener(stream -> { encoder.flowController().listener(stream -> {
Expand All @@ -101,8 +103,8 @@ public void onStreamClosed(Http2Stream stream) {
} }
}); });


this.options = options;
this.context = context; this.context = context;
this.supportsCompression = supportsCompression;
this.handlerContext = handlerContext; this.handlerContext = handlerContext;
this.serverOrigin = serverOrigin; this.serverOrigin = serverOrigin;
this.handler = handler; this.handler = handler;
Expand Down Expand Up @@ -166,12 +168,18 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Stream stream = conn.stream(streamId); Http2Stream stream = conn.stream(streamId);
Http2ServerRequestImpl req = (Http2ServerRequestImpl) streams.get(streamId); Http2ServerRequestImpl req = (Http2ServerRequestImpl) streams.get(streamId);
if (req == null) { if (req == null) {
String contentEncoding = supportsCompression ? UriUtils.determineContentEncoding(headers) : null; String contentEncoding = options.isCompressionSupported() ? UriUtils.determineContentEncoding(headers) : null;
Http2ServerRequestImpl newReq = req = new Http2ServerRequestImpl(handlerContext.owner(), this, serverOrigin, conn, stream, ctx, encoder(), headers, contentEncoding); Http2ServerRequestImpl newReq = req = new Http2ServerRequestImpl(handlerContext.owner(), this, serverOrigin, conn, stream, ctx, encoder(), headers, contentEncoding);
if (isMalformedRequest(headers)) { if (isMalformedRequest(headers)) {
encoder().writeRstStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise()); encoder().writeRstStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
return; return;
} }
CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (options.isHandle100ContinueAutomatically() &&
((value != null && HttpHeaderValues.CONTINUE.equals(value)) ||
headers.contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))) {
req.response().writeContinue();
}
streams.put(streamId, newReq); streams.put(streamId, newReq);
handlerContext.executeFromIO(() -> { handlerContext.executeFromIO(() -> {
handler.handle(newReq); handler.handle(newReq);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.ContextInternal;


Expand All @@ -36,23 +37,23 @@ public class VertxHttp2HandlerBuilder extends AbstractHttp2ConnectionHandlerBuil
private final ChannelHandlerContext context; private final ChannelHandlerContext context;
private final ContextInternal handlerContext; private final ContextInternal handlerContext;
private final String serverOrigin; private final String serverOrigin;
private final boolean supportsCompression; private final HttpServerOptions options;
private final Handler<HttpServerRequest> handler; private final Handler<HttpServerRequest> handler;


public VertxHttp2HandlerBuilder( public VertxHttp2HandlerBuilder(
ChannelHandlerContext context, ChannelHandlerContext context,
ContextInternal handlerContext, ContextInternal handlerContext,
String serverOrigin, String serverOrigin,
io.vertx.core.http.Http2Settings initialSettings, HttpServerOptions option,
boolean supportsCompression,
Handler<HttpServerRequest> handler) { Handler<HttpServerRequest> handler) {


this.handlerContext = handlerContext; this.handlerContext = handlerContext;
this.serverOrigin = serverOrigin; this.serverOrigin = serverOrigin;
this.handler = handler; this.handler = handler;
this.context = context; this.context = context;
this.supportsCompression = supportsCompression; this.options = option;


io.vertx.core.http.Http2Settings initialSettings = options.getHttp2Settings();
if (initialSettings != null) { if (initialSettings != null) {
if (initialSettings.getHeaderTableSize() != null) { if (initialSettings.getHeaderTableSize() != null) {
initialSettings().headerTableSize(initialSettings.getHeaderTableSize()); initialSettings().headerTableSize(initialSettings.getHeaderTableSize());
Expand Down Expand Up @@ -92,12 +93,12 @@ protected VertxHttp2Handler build() {


@Override @Override
protected VertxHttp2Handler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception { protected VertxHttp2Handler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception {
if (supportsCompression) { if (options.isCompressionSupported()) {
encoder = new CompressorHttp2ConnectionEncoder(encoder); encoder = new CompressorHttp2ConnectionEncoder(encoder);
} }
VertxHttp2Handler vertxHttp2Handler = new VertxHttp2Handler( VertxHttp2Handler vertxHttp2Handler = new VertxHttp2Handler(
context, context,
handlerContext, serverOrigin, decoder, encoder, initialSettings, supportsCompression, handler); handlerContext, serverOrigin, decoder, encoder, initialSettings, options, handler);
frameListener(vertxHttp2Handler); frameListener(vertxHttp2Handler);
return vertxHttp2Handler; return vertxHttp2Handler;
} }
Expand Down
105 changes: 105 additions & 0 deletions src/test/java/io/vertx/test/core/Http2Test.java
Expand Up @@ -1634,4 +1634,109 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
fut.sync(); fut.sync();
await(); await();
} }

@Test
public void test100ContinueHandledManually() throws Exception {
server.requestHandler(req -> {
assertEquals("100-continue", req.getHeader("expect"));
HttpServerResponse resp = req.response();
resp.writeContinue();
req.bodyHandler(body -> {
assertEquals("the-body", body.toString());
resp.putHeader("wibble", "wibble-value").end();
});
});
test100Continue();
}

@Test
public void test100ContinueHandledAutomatically() throws Exception {
server.close();
server = vertx.createHttpServer(serverOptions.setHandle100ContinueAutomatically(true));
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
req.bodyHandler(body -> {
assertEquals("the-body", body.toString());
resp.putHeader("wibble", "wibble-value").end();
});
});
test100Continue();
}

private void test100Continue() throws Exception {
startServer();
TestClient client = new TestClient();
ChannelFuture fut = client.connect(4043, "localhost", request -> {
int id = request.nextStreamId();
request.decoder.frameListener(new Http2EventAdapter() {
int count = 0;
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
switch (count++) {
case 0:
vertx.runOnContext(v -> {
assertEquals("100", headers.status().toString());
});
request.encoder.writeData(request.context, id, Buffer.buffer("the-body").getByteBuf(), 0, true, request.context.newPromise());
request.context.flush();
break;
case 1:
vertx.runOnContext(v -> {
assertEquals("200", headers.status().toString());
assertEquals("wibble-value", headers.get("wibble").toString());
testComplete();
});
break;
default:
vertx.runOnContext(v -> {
fail();
});
}
}
});
request.encoder.writeHeaders(request.context, id, GET("/").add("expect", "100-continue"), 0, false, request.context.newPromise());
request.context.flush();
});
fut.sync();
await();
}

@Test
public void test100ContinueRejectedManually() throws Exception {
server.requestHandler(req -> {
req.response().setStatusCode(405).end();
req.handler(buf -> {
fail();
});
});
startServer();
TestClient client = new TestClient();
ChannelFuture fut = client.connect(4043, "localhost", request -> {
int id = request.nextStreamId();
request.decoder.frameListener(new Http2EventAdapter() {
int count = 0;
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
switch (count++) {
case 0:
vertx.runOnContext(v -> {
assertEquals("405", headers.status().toString());
vertx.setTimer(100, v2 -> {
testComplete();
});
});
break;
default:
vertx.runOnContext(v -> {
fail();
});
}
}
});
request.encoder.writeHeaders(request.context, id, GET("/").add("expect", "100-continue"), 0, false, request.context.newPromise());
request.context.flush();
});
fut.sync();
await();
}
} }

0 comments on commit 8a9b5b2

Please sign in to comment.