Skip to content

Commit

Permalink
Http2 server response write work
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 2dd4294 commit d902db9
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 48 deletions.
Expand Up @@ -94,15 +94,14 @@ public Http2ServerRequestImpl(
Http2Stream stream, Http2Stream stream,
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
Http2ConnectionEncoder encoder, Http2ConnectionEncoder encoder,
int streamId,
Http2Headers headers) { Http2Headers headers) {
this.vertx = vertx; this.vertx = vertx;
this.serverOrigin = serverOrigin; this.serverOrigin = serverOrigin;
this.conn = conn; this.conn = conn;
this.stream = stream; this.stream = stream;
this.headers = headers; this.headers = headers;
this.ctx = ctx; this.ctx = ctx;
this.response = new Http2ServerResponseImpl(ctx, encoder, streamId); this.response = new Http2ServerResponseImpl(ctx, encoder, stream);
} }


void end() { void end() {
Expand Down Expand Up @@ -275,7 +274,7 @@ public String path() {
} }


@Override @Override
public HttpServerResponse response() { public Http2ServerResponseImpl response() {
return response; return response;
} }


Expand Down
146 changes: 105 additions & 41 deletions src/main/java/io/vertx/core/http/impl/Http2ServerResponseImpl.java
Expand Up @@ -17,11 +17,14 @@
package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Handler; import io.vertx.core.Handler;
Expand All @@ -38,64 +41,77 @@ public class Http2ServerResponseImpl implements HttpServerResponse {


private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
private final Http2ConnectionEncoder encoder; private final Http2ConnectionEncoder encoder;
private final int streamId; private final Http2Stream stream;
private Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText()); private Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
private Http2HeadersAdaptor headersMap; private Http2HeadersAdaptor headersMap;
private boolean chunked;
private boolean headWritten;
private int statusCode = 200;
private String statusMessage; // Not really used but we keep the message for the getStatusMessage()
private Handler<Void> drainHandler;


public Http2ServerResponseImpl(ChannelHandlerContext ctx, Http2ConnectionEncoder encoder, int streamId) { public Http2ServerResponseImpl(ChannelHandlerContext ctx, Http2ConnectionEncoder encoder, Http2Stream stream) {
this.ctx = ctx; this.ctx = ctx;
this.encoder = encoder; this.encoder = encoder;
this.streamId = streamId; this.stream = stream;
} }


@Override @Override
public HttpServerResponse exceptionHandler(Handler<Throwable> handler) { public HttpServerResponse exceptionHandler(Handler<Throwable> handler) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


@Override
public HttpServerResponse write(Buffer data) {
throw new UnsupportedOperationException();
}

@Override
public HttpServerResponse setWriteQueueMaxSize(int maxSize) {
throw new UnsupportedOperationException();
}

@Override
public HttpServerResponse drainHandler(Handler<Void> handler) {
throw new UnsupportedOperationException();
}

@Override @Override
public int getStatusCode() { public int getStatusCode() {
throw new UnsupportedOperationException(); return statusCode;
} }


@Override @Override
public HttpServerResponse setStatusCode(int statusCode) { public HttpServerResponse setStatusCode(int statusCode) {
throw new UnsupportedOperationException(); if (statusCode < 0) {
throw new IllegalArgumentException("code: " + statusCode + " (expected: 0+)");
}
this.statusCode = statusCode;
headers.status("" + statusCode);
return this;
} }


@Override @Override
public String getStatusMessage() { public String getStatusMessage() {
throw new UnsupportedOperationException(); if (statusMessage == null) {
switch (statusCode / 100) {
case 1:
return "Informational";
case 2:
return "Success";
case 3:
return "Redirection";
case 4:
return "Client Error";
case 5:
return "Server Error";
default:
return "Unknown Status";
}
}
return statusMessage;
} }


@Override @Override
public HttpServerResponse setStatusMessage(String statusMessage) { public HttpServerResponse setStatusMessage(String statusMessage) {
throw new UnsupportedOperationException(); this.statusMessage = statusMessage;
return this;
} }


@Override @Override
public HttpServerResponse setChunked(boolean chunked) { public HttpServerResponse setChunked(boolean chunked) {
throw new UnsupportedOperationException(); this.chunked = true;
return this;
} }


@Override @Override
public boolean isChunked() { public boolean isChunked() {
throw new UnsupportedOperationException(); return chunked;
} }


@Override @Override
Expand Down Expand Up @@ -161,18 +177,28 @@ public HttpServerResponse closeHandler(@Nullable Handler<Void> handler) {
} }


@Override @Override
public HttpServerResponse write(String chunk, String enc) { public HttpServerResponse writeContinue() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


@Override @Override
public HttpServerResponse write(String chunk) { public HttpServerResponse write(Buffer chunk) {
throw new UnsupportedOperationException(); ByteBuf buf = chunk.getByteBuf();
return write(buf);
} }


@Override @Override
public HttpServerResponse writeContinue() { public HttpServerResponse write(String chunk, String enc) {
throw new UnsupportedOperationException(); return write(Buffer.buffer(chunk, enc).getByteBuf());
}

@Override
public HttpServerResponse write(String chunk) {
return write(Buffer.buffer(chunk).getByteBuf());
}

private Http2ServerResponseImpl write(ByteBuf chunk) {
return write(chunk, false);
} }


@Override @Override
Expand All @@ -182,29 +208,71 @@ public void end(String chunk) {


@Override @Override
public void end(String chunk, String enc) { public void end(String chunk, String enc) {
throw new UnsupportedOperationException(); end(Buffer.buffer(chunk, enc));
} }


@Override @Override
public void end(Buffer chunk) { public void end(Buffer chunk) {
ByteBuf buf = chunk.getByteBuf(); end(chunk.getByteBuf());
end0(buf);
} }


@Override @Override
public void end() { public void end() {
throw new UnsupportedOperationException(); end(Unpooled.EMPTY_BUFFER);
} }


private void end0(ByteBuf data) { private void end(ByteBuf chunk) {
encoder.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); if (!chunked && !headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
encoder.writeData(ctx, streamId, data, 0, true, ctx.newPromise()); headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(chunk.readableBytes()));
}
write(chunk, true);
}

private void checkSendHeaders() {
if (!headWritten) {
headWritten = true;
if (!headers.contains(HttpHeaderNames.CONTENT_LENGTH) && !chunked) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not sending an HTTP chunked response.");
}
encoder.writeHeaders(ctx, stream.id(), headers, 0, false, ctx.newPromise());
headWritten = true;
}
}

private Http2ServerResponseImpl write(ByteBuf chunk, boolean last) {
checkSendHeaders();
encoder.writeData(ctx, stream.id(), chunk, 0, last, ctx.newPromise());
try { try {
encoder.flowController().writePendingBytes(); encoder.flowController().writePendingBytes();
} catch (Http2Exception e) { } catch (Http2Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
ctx.flush(); ctx.flush();
return this;
}

@Override
public boolean writeQueueFull() {
return encoder.flowController().isWritable(stream);
}

@Override
public HttpServerResponse setWriteQueueMaxSize(int maxSize) {
// It does not seem to be possible to configure this at the moment
return this;
}

@Override
public HttpServerResponse drainHandler(Handler<Void> handler) {
drainHandler = handler;
return this;
}

void writabilityChanged() {
if (!writeQueueFull() && drainHandler != null) {
drainHandler.handle(null);
}
} }


@Override @Override
Expand Down Expand Up @@ -234,7 +302,7 @@ public boolean closed() {


@Override @Override
public boolean headWritten() { public boolean headWritten() {
throw new UnsupportedOperationException(); return headWritten;
} }


@Override @Override
Expand All @@ -252,8 +320,4 @@ public long bytesWritten() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


@Override
public boolean writeQueueFull() {
throw new UnsupportedOperationException();
}
} }
9 changes: 7 additions & 2 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Handler.java
Expand Up @@ -30,7 +30,6 @@
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.vertx.core.Handler; import io.vertx.core.Handler;
Expand All @@ -56,6 +55,12 @@ public class VertxHttp2Handler extends Http2ConnectionHandler implements Http2Fr
Http2Settings initialSettings, Handler<HttpServerRequest> handler) { Http2Settings initialSettings, Handler<HttpServerRequest> handler) {
super(decoder, encoder, initialSettings); super(decoder, encoder, initialSettings);


encoder.flowController().listener(stream -> {
Http2ServerRequestImpl req = requestMap.get(stream.id());
Http2ServerResponseImpl resp = req.response();
resp.writabilityChanged();
});

this.vertx = vertx; this.vertx = vertx;
this.serverOrigin = serverOrigin; this.serverOrigin = serverOrigin;
this.handler = handler; this.handler = handler;
Expand Down Expand Up @@ -89,7 +94,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) { Http2Headers headers, int padding, boolean endOfStream) {
Http2Connection conn = connection(); Http2Connection conn = connection();
Http2Stream stream = conn.stream(streamId); Http2Stream stream = conn.stream(streamId);
Http2ServerRequestImpl req = new Http2ServerRequestImpl(vertx, serverOrigin, conn, stream, ctx, encoder(), streamId, headers); Http2ServerRequestImpl req = new Http2ServerRequestImpl(vertx, serverOrigin, conn, stream, ctx, encoder(), headers);
requestMap.put(streamId, req); requestMap.put(streamId, req);
handler.handle(req); handler.handle(req);
if (endOfStream) { if (endOfStream) {
Expand Down

0 comments on commit d902db9

Please sign in to comment.