Skip to content

Commit

Permalink
Add promise assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Feb 23, 2017
1 parent 30cdefc commit 072c81d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Expand Up @@ -69,19 +69,22 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof HttpPipelinedResponse) {
final HttpPipelinedResponse current = (HttpPipelinedResponse) msg;
assert current.promise() == promise;

boolean channelShouldClose = false;

synchronized (holdingQueue) {
if (holdingQueue.size() < maxEventsHeld) {
holdingQueue.add((HttpPipelinedResponse) msg);
holdingQueue.add(current);

while (!holdingQueue.isEmpty()) {
/*
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
* number does not match the current write sequence number then we have not processed all preceding responses yet.
*/
final HttpPipelinedResponse response = holdingQueue.peek();
if (response.sequence() != writeSequence) {
final HttpPipelinedResponse top = holdingQueue.peek();
if (top.sequence() != writeSequence) {
break;
}
holdingQueue.remove();
Expand All @@ -90,7 +93,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
* responses that precede it in the pipeline are written first. Note that the promise from the method invocation is
* not ignored, it will already be attached to an existing response and consumed when that response is drained.
*/
ctx.write(response.response(), response.promise());
ctx.write(top.response(), top.promise());
writeSequence++;
}
} else {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
Expand Down Expand Up @@ -255,11 +256,14 @@ public void run() {
assert uri.matches("/\\d+");
}

final ChannelPromise promise = ctx.newPromise();
final Object msg;
if (pipelinedRequest != null) {
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse, ctx.newPromise()));
msg = pipelinedRequest.createHttpResponse(httpResponse, promise);
} else {
ctx.writeAndFlush(httpResponse);
msg = httpResponse;
}
ctx.writeAndFlush(msg, promise);
}

}
Expand Down

0 comments on commit 072c81d

Please sign in to comment.