Skip to content

Commit

Permalink
Start implementation of NetSocket tunneling in HTTP2
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 14, 2016
1 parent b58cbf2 commit 71f09e9
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 326 deletions.
29 changes: 27 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -86,6 +86,9 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private HttpClientRequestImpl requestForResponse;
private WebSocketImpl ws;

private boolean paused;
private Buffer pausedChunk;

ClientConnection(HttpVersion version, ConnectionManager manager, VertxInternal vertx, HttpClientImpl client, Handler<Throwable> exceptionHandler, Channel channel, boolean ssl, String host,
int port, ContextImpl context, ConnectionManager.Http1xPool listener, HttpClientMetrics metrics) {
super(vertx, channel, context, metrics);
Expand Down Expand Up @@ -297,12 +300,34 @@ void handleResponse(HttpResponse resp) {
requestForResponse.handleResponse(nResp);
}

public void doPause() {
super.doPause();
paused = true;
}

public void doResume() {
super.doResume();
paused = false;
}

void handleResponseChunk(Buffer buff) {
currentResponse.handleChunk(buff);
if (paused) {
if (pausedChunk == null) {
pausedChunk = buff.copy();
} else {
pausedChunk.appendBuffer(buff);
}
} else {
if (pausedChunk != null) {
buff = pausedChunk.appendBuffer(buff);
pausedChunk = null;
}
currentResponse.handleChunk(buff);
}
}

void handleResponseEnd(LastHttpContent trailer) {
currentResponse.handleEnd(new HeadersAdaptor(trailer.trailingHeaders()));
currentResponse.handleEnd(pausedChunk, new HeadersAdaptor(trailer.trailingHeaders()));

// We don't signal response end for a 100-continue response as a real response will follow
// Also we keep the connection open for an HTTP CONNECT
Expand Down
26 changes: 17 additions & 9 deletions src/main/java/io/vertx/core/http/impl/FileStreamChannel.java
Expand Up @@ -58,12 +58,14 @@ class FileStreamChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
private boolean active;
private boolean closed;
private final Http2ServerResponseImpl response;
private final VertxHttp2Stream stream;
private final Handler<Void> endHandler;

FileStreamChannel(
Context resultCtx,
Handler<AsyncResult<Void>> resultHandler,
Http2ServerResponseImpl response,
VertxHttp2Stream stream,
Handler<Void> endHandler,
long length) {
super(null, Id.INSTANCE);

Expand Down Expand Up @@ -95,14 +97,15 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
});

response.drainHandler(v -> {
flush();
});

this.length = length;
this.response = response;
this.stream = stream;
this.endHandler = endHandler;
}

final Handler<Void> drainHandler = v -> {
flush();
};

@Override
protected void doRegister() throws Exception {
active = true;
Expand Down Expand Up @@ -151,10 +154,15 @@ protected void doBeginRead() throws Exception {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// To do : try to gather the chunks in a single write operation ?
ByteBuf chunk;
while (!response.writeQueueFull() && (chunk = (ByteBuf) in.current()) != null && length > 0) {
while (!stream.isNotWritable() && (chunk = (ByteBuf) in.current()) != null && length > 0) {
length -= chunk.readableBytes();
response.write(chunk.retain(), length == 0);
boolean end = length == 0;
stream.writeData(chunk.retain(), end);
stream.handlerContext.flush();
in.remove();
if (end) {
endHandler.handle(null);
}
}
}

Expand Down
100 changes: 25 additions & 75 deletions src/main/java/io/vertx/core/http/impl/Http2ServerRequestImpl.java
Expand Up @@ -27,8 +27,8 @@
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable;
Expand All @@ -44,6 +44,7 @@
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
Expand All @@ -54,22 +55,17 @@
import javax.security.cert.X509Certificate;
import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class Http2ServerRequestImpl extends VertxHttp2Stream implements HttpServerRequest {

private static final Logger log = LoggerFactory.getLogger(HttpServerRequestImpl.class);
private static final Object END = new Object(); // Marker

private final Vertx vertx;
private final VertxHttp2ServerHandler connection;
private final String serverOrigin;
private final ChannelHandlerContext ctx;
private final Http2Connection conn;
private final Http2Stream stream;
private final Http2ServerResponseImpl response;
private final Http2Headers headers;
private MultiMap headersMap;
Expand All @@ -83,9 +79,7 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream implements HttpServ

private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private boolean paused;
private boolean ended;
private ArrayDeque<Object> pending = new ArrayDeque<>(8);

private Handler<HttpServerFileUpload> uploadHandler;
private HttpPostRequestDecoder decoder;
Expand All @@ -94,58 +88,28 @@ public class Http2ServerRequestImpl extends VertxHttp2Stream implements HttpServ

public Http2ServerRequestImpl(
Vertx vertx,
ContextImpl context,
VertxHttp2ServerHandler connection,
String serverOrigin,
Http2Connection conn,
Http2Stream stream,
ChannelHandlerContext ctx,
ChannelHandlerContext handlerContext,
Http2ConnectionEncoder encoder,
Http2ConnectionDecoder decoder,
Http2Headers headers,
String contentEncoding) {
super(vertx, context, handlerContext, encoder, decoder, stream);

this.vertx = vertx;
this.connection = connection;
this.serverOrigin = serverOrigin;
this.conn = conn;
this.stream = stream;
this.headers = headers;
this.ctx = ctx;
this.response = new Http2ServerResponseImpl((VertxInternal) vertx, ctx, connection, encoder, stream, false, contentEncoding);
this.response = new Http2ServerResponseImpl(this, (VertxInternal) vertx, handlerContext, connection, encoder, stream, false, contentEncoding);
}

void end() {
if (paused || pending.size() > 0) {
pending.add(END);
} else {
callEnd();
}
}

void handleData(Buffer data) {
if (!paused) {
if (pending.isEmpty()) {
callHandler(data);
consume(data.length());
} else {
pending.add(data);
checkNextTick(null);
}
} else {
pending.add(data);
}
}

void handleReset(long code) {
ended = true;
paused = false;
pending.clear();
if (exceptionHandler != null) {
exceptionHandler.handle(new StreamResetException(code));
}
response.handleReset(code);
if (endHandler != null) {
endHandler.handle(null);
}
@Override
void handleInterestedOpsChanged() {
response.writabilityChanged();
}

@Override
Expand All @@ -167,7 +131,7 @@ void handleClose() {
response.handleClose();
}

private void callHandler(Buffer data) {
void callHandler(Buffer data) {
if (decoder != null) {
try {
decoder.offer(new DefaultHttpContent(data.getByteBuf()));
Expand All @@ -180,7 +144,7 @@ private void callHandler(Buffer data) {
}
}

private void callEnd() {
void callEnd() {
ended = true;
if (decoder != null) {
try {
Expand Down Expand Up @@ -210,30 +174,15 @@ private void callEnd() {
}
}

private void consume(int numBytes) {
try {
boolean windowUpdateSent = conn.local().flowController().consumeBytes(stream, numBytes);
if (windowUpdateSent) {
ctx.flush();
}
} catch (Http2Exception e) {
e.printStackTrace();
@Override
void callReset(long errorCode) {
ended = true;
if (exceptionHandler != null) {
exceptionHandler.handle(new StreamResetException(errorCode));
}
}

private void checkNextTick(Void v) {
if (!paused) {
Object msg = pending.poll();
if (msg instanceof Buffer) {
Buffer buf = (Buffer) msg;
consume(buf.length());
callHandler(buf);
if (pending.size() > 0) {
vertx.runOnContext(this::checkNextTick);
}
} if (msg == END) {
callEnd();
}
response.callReset(errorCode);
if (endHandler != null) {
endHandler.handle(null);
}
}

Expand All @@ -258,14 +207,13 @@ public HttpServerRequest handler(Handler<Buffer> handler) {

@Override
public HttpServerRequest pause() {
paused = true;
doPause();
return this;
}

@Override
public HttpServerRequest resume() {
paused = false;
checkNextTick(null);
doResume();
return this;
}

Expand Down Expand Up @@ -409,7 +357,9 @@ public String absoluteURI() {

@Override
public NetSocket netSocket() {
throw new UnsupportedOperationException();
checkEnded();
response.toNetSocket();
return connection.toNetSocket(this);
}

@Override
Expand Down

0 comments on commit 71f09e9

Please sign in to comment.