Skip to content

Commit

Permalink
Encapsulate usage of encoder/decoder in VertxHttp2Stream and connecti…
Browse files Browse the repository at this point in the history
…on classes
  • Loading branch information
vietj committed Mar 23, 2016
1 parent 0f4c550 commit b05ba86
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 94 deletions.
26 changes: 11 additions & 15 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -24,7 +24,6 @@
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.core.Context;
Expand Down Expand Up @@ -156,7 +155,7 @@ public HttpVersion version() {
}

@Override
void callEnd() {
void handleEnd() {
if (conn.metrics.isEnabled()) {
if (request.exceptionOccurred) {
conn.metrics.requestReset(request.metric());
Expand All @@ -170,12 +169,12 @@ void callEnd() {
}

@Override
void callHandler(Buffer buf) {
void handleData(Buffer buf) {
response.handleChunk(buf);
}

@Override
void callReset(long errorCode) {
void handleReset(long errorCode) {
if (!responseEnded) {
responseEnded = true;
if (conn.metrics.isEnabled()) {
Expand Down Expand Up @@ -220,8 +219,7 @@ void handleHeaders(Http2Headers headers, boolean end) {
} catch (Exception e) {
e.printStackTrace();
handleException(e);
encoder.writeRstStream(handlerContext, stream.id(), 0x01 /* PROTOCOL_ERROR */, handlerContext.newPromise());
channel.flush();
writeReset(0x01 /* PROTOCOL_ERROR */);
return;
}
response = new HttpClientResponseImpl(
Expand All @@ -234,7 +232,7 @@ void handleHeaders(Http2Headers headers, boolean end) {
);
request.handleResponse(response);
if (end) {
handleEnd();
onEnd();
}
} else if (end) {
response.handleEnd(null, new Http2HeadersAdaptor(headers));
Expand Down Expand Up @@ -293,26 +291,25 @@ public void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers
if (conn.metrics.isEnabled()) {
request.metric(conn.metrics.requestBegin(conn.metric, conn.localAddress(), conn.remoteAddress(), request));
}
encoder.writeHeaders(handlerContext, stream.id(), h, 0, end && content == null, handlerContext.newPromise());
writeHeaders(h, end && content == null);
if (content != null) {
writeBuffer(content, end);
} else {
channel.flush();
handlerContext.flush();
}
}

@Override
public void writeBuffer(ByteBuf buf, boolean end) {
writeData(buf, end);
if (end) {
channel.flush();
handlerContext.flush();
}
}

@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
encoder.writeFrame(handlerContext, (byte) type, stream.id(), new Http2Flags((short) flags), payload, handlerContext.newPromise());
handlerContext.flush();
super.writeFrame(type, flags, payload);
}

@Override
Expand All @@ -326,7 +323,7 @@ public void doSetWriteQueueMaxSize(int size) {

@Override
public boolean isNotWritable() {
return !conn.handler.connection().remote().flowController().isWritable(stream);
return super.isNotWritable();
}

@Override
Expand All @@ -341,8 +338,7 @@ public void endRequest() {

@Override
public void reset(long code) {
encoder.writeRstStream(handlerContext, stream.id(), code, handlerContext.newPromise());
channel.flush();
writeReset(code);
}

@Override
Expand Down
Expand Up @@ -74,7 +74,7 @@ public Http2ConnectionBase(Channel channel, ContextImpl context, VertxHttp2Conne
handler.encoder().flowController().listener(s -> {
VertxHttp2Stream stream = streams.get(s.id());
if (stream != null) {
context.executeFromIO(stream::handleWritabilityChanged);
context.executeFromIO(stream::onWritabilityChanged);
}
});

Expand Down Expand Up @@ -261,7 +261,7 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC
VertxHttp2Stream req = streams.get(streamId);
if (req != null) {
context.executeFromIO(() -> {
req.handleReset(errorCode);
req.onResetRead(errorCode);
});
}
}
Expand All @@ -272,10 +272,10 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
if (req != null) {
Buffer buff = Buffer.buffer(data.copy());
context.executeFromIO(() -> {
req.handleData(buff);
req.onDataRead(buff);
});
if (endOfStream) {
context.executeFromIO(req::handleEnd);
context.executeFromIO(req::onEnd);
}
}
return padding;
Expand Down
Expand Up @@ -21,7 +21,6 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
Expand Down Expand Up @@ -131,7 +130,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
// Http server request trailer - not implemented yet (in api)
}
if (endOfStream) {
context.executeFromIO(stream::handleEnd);
context.executeFromIO(stream::onEnd);
}
}

Expand All @@ -157,12 +156,12 @@ public Push(Http2Stream stream, String contentEncoding, Handler<AsyncResult<Http
}

@Override
void callEnd() {
void handleEnd() {
throw new UnsupportedOperationException();
}

@Override
void callHandler(Buffer buf) {
void handleData(Buffer buf) {
throw new UnsupportedOperationException();
}

Expand All @@ -174,7 +173,7 @@ void handleInterestedOpsChanged() {
}

@Override
void callReset(long errorCode) {
void handleReset(long errorCode) {
if (response != null) {
response.callReset(errorCode);
} else {
Expand Down
Expand Up @@ -128,7 +128,7 @@ void handleUnknownFrame(int type, int flags, Buffer buff) {
}
}

void callHandler(Buffer data) {
void handleData(Buffer data) {
bytesRead += data.length();
if (postRequestDecoder != null) {
try {
Expand All @@ -142,7 +142,7 @@ void callHandler(Buffer data) {
}
}

void callEnd() {
void handleEnd() {
ended = true;
conn.reportBytesRead(bytesRead);
if (postRequestDecoder != null) {
Expand Down Expand Up @@ -174,7 +174,7 @@ void callEnd() {
}

@Override
void callReset(long errorCode) {
void handleReset(long errorCode) {
ended = true;
if (exceptionHandler != null) {
exceptionHandler.handle(new StreamResetException(errorCode));
Expand Down
Expand Up @@ -21,7 +21,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -265,7 +264,7 @@ public HttpServerResponse closeHandler(@Nullable Handler<Void> handler) {
@Override
public HttpServerResponse writeContinue() {
checkHeadWritten();
stream.encoder.writeHeaders(ctx, stream.id(), new DefaultHttp2Headers().status("100"), 0, false, ctx.newPromise());
stream.writeHeaders(new DefaultHttp2Headers().status("100"), false);
ctx.flush();
return this;
}
Expand Down Expand Up @@ -337,7 +336,7 @@ private boolean checkSendHeaders(boolean end) {
}
headWritten = true;
headers.status(Integer.toString(statusCode));
stream.encoder.writeHeaders(ctx, stream.id(), headers, 0, end, ctx.newPromise());
stream.writeHeaders(headers, end);
if (end) {
ctx.flush();
}
Expand All @@ -360,7 +359,7 @@ void write(ByteBuf chunk, boolean end) {
bytesWritten += len;
}
if (end && trailers != null) {
stream.encoder.writeHeaders(ctx, stream.id(), trailers, 0, true, ctx.newPromise());
stream.writeHeaders(trailers, true);
}
if (end && bodyEndHandler != null) {
bodyEndHandler.handle(null);
Expand All @@ -371,7 +370,7 @@ void write(ByteBuf chunk, boolean end) {
public HttpServerResponse writeFrame(int type, int flags, Buffer payload) {
checkEnded();
checkSendHeaders(false);
stream.encoder.writeFrame(ctx, (byte) type, stream.id(), new Http2Flags((short) flags), payload.getByteBuf(), ctx.newPromise());
stream.writeFrame(type, flags, payload.getByteBuf());
ctx.flush();
return this;
}
Expand Down Expand Up @@ -535,7 +534,7 @@ public int streamId() {
public void reset(long code) {
checkEnded();
handleEnded(true);
stream.encoder.writeRstStream(ctx, stream.id(), code, ctx.newPromise());
stream.writeReset(code);
ctx.flush();
}

Expand Down
Expand Up @@ -56,7 +56,7 @@ public VertxHttp2NetSocket(C connection, Http2Stream stream) {
// Stream impl

@Override
void callEnd() {
void handleEnd() {
try {
if (endHandler != null) {
// Give opportunity to send a last chunk
Expand All @@ -68,14 +68,14 @@ void callEnd() {
}

@Override
void callHandler(Buffer buf) {
void handleData(Buffer buf) {
if (dataHandler != null) {
dataHandler.handle(buf);
}
}

@Override
void callReset(long errorCode) {
void handleReset(long errorCode) {
handleException(new StreamResetException(errorCode));
}

Expand Down Expand Up @@ -216,7 +216,7 @@ public NetSocket sendFile(String filename, long offset, long length, Handler<Asy
}
}, this, offset, contentLength);
drainHandler(fileChannel.drainHandler);
channel.eventLoop().register(fileChannel);
handlerContext.channel().eventLoop().register(fileChannel);
fileChannel.pipeline().fireUserEventTriggered(raf);

return this;
Expand Down

0 comments on commit b05ba86

Please sign in to comment.