Respect promises on pipelined responses
When pipelined responses are sent to the pipeline handler for writing,
they are not necessarily written immediately. They must be held in a
priority queue until all responses preceding a given response have been
written. This means that when write is invoked on the handler, the
promise attached to that write invocation will not necessarily be the
promise associated with the responses that are written while the queue
is drained. To address this, the promise associated with a pipelined
response must be held with the response and then used when the channel
context is actually written to. This issue was introduced when ensuring
that the releasing promise is always chained through on write calls,
lest the releasing promise never be invoked. It causes many existing
test cases to fail, so no new test cases are needed here.

Relates #23317
jasontedor committed Feb 23, 2017
1 parent 1b11772 commit 2a745d2
Showing 6 changed files with 41 additions and 14 deletions.
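
Below is a minimal, self-contained sketch of the mechanism the commit message describes, with hypothetical names (PipelinedDrainSketch, Held) and java.util.concurrent.CompletableFuture standing in for Netty's ChannelPromise; it illustrates the idea only and is not the Elasticsearch code, which follows in the diffs below.

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;

final class PipelinedDrainSketch {

    // hypothetical holder: pairs a response with the promise that must be completed only
    // when this particular response is actually written
    record Held(int sequence, String response, CompletableFuture<Void> promise) { }

    private final PriorityQueue<Held> holdingQueue =
            new PriorityQueue<>(Comparator.comparingInt(Held::sequence));
    private int writeSequence = 0;

    // accept a response that may arrive out of order, then drain everything now writable
    void write(final Held held) {
        holdingQueue.add(held);
        while (!holdingQueue.isEmpty() && holdingQueue.peek().sequence() == writeSequence) {
            final Held top = holdingQueue.remove();
            // complete the promise attached to this response, not the promise that happened
            // to accompany the write call that triggered the drain
            System.out.println("wrote response " + top.sequence() + ": " + top.response());
            top.promise().complete(null);
            writeSequence++;
        }
    }

    public static void main(String[] args) {
        final PipelinedDrainSketch handler = new PipelinedDrainSketch();
        final CompletableFuture<Void> p1 = new CompletableFuture<>();
        final CompletableFuture<Void> p0 = new CompletableFuture<>();
        handler.write(new Held(1, "second", p1)); // held; p1 stays pending
        handler.write(new Held(0, "first", p0));  // drains 0 and then 1; p0 and p1 complete
        System.out.println("p0 done: " + p0.isDone() + ", p1 done: " + p1.isDone());
    }

}

The design point is that the promise travels with its response through the holding queue, so no matter which write call triggers the drain, each response is completed with its own promise.
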
@@ -135,7 +135,7 @@ public void sendResponse(RestResponse response) {

final Object msg;
if (pipelinedRequest != null) {
msg = pipelinedRequest.createHttpResponse(resp);
msg = pipelinedRequest.createHttpResponse(resp, promise);
} else {
msg = resp;
}
@@ -19,6 +19,7 @@

package org.elasticsearch.http.netty4.pipelining;

import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
@@ -32,7 +33,6 @@ public class HttpPipelinedRequest implements ReferenceCounted {
private final LastHttpContent last;
private final int sequence;


public HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
this.last = last;
this.sequence = sequence;
@@ -42,8 +42,8 @@ public LastHttpContent last() {
return last;
}

public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response) {
return new HttpPipelinedResponse(response, sequence);
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response, final ChannelPromise promise) {
return new HttpPipelinedResponse(response, promise, sequence);
}

@Override
@@ -19,23 +19,30 @@
* under the License.
*/

import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCounted;

class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>, ReferenceCounted {

private final FullHttpResponse response;
private final ChannelPromise promise;
private final int sequence;

HttpPipelinedResponse(FullHttpResponse response, int sequence) {
HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequence) {
this.response = response;
this.promise = promise;
this.sequence = sequence;
}

public FullHttpResponse response() {
return response;
}

public ChannelPromise promise() {
return promise;
}

public int sequence() {
return sequence;
}
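
The compareTo implementation is elided from this hunk. Since the pipelining handler holds these responses in a priority queue keyed on the write sequence, it presumably orders them by sequence number; the following is a hedged sketch of that ordering, an assumption rather than code from this commit.

@Override
public int compareTo(final HttpPipelinedResponse other) {
    // assumed ordering: lowest sequence number first, so the queue yields the next response to write
    return Integer.compare(sequence, other.sequence);
}
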
@@ -71,23 +71,37 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof HttpPipelinedResponse) {
final HttpPipelinedResponse current = (HttpPipelinedResponse) msg;
/*
* We attach the promise to the response. When we invoke a write on the channel with the response, we must ensure that we invoke
* the write methods that accept the same promise that we have attached to the response; otherwise, as the response proceeds
* through the handler pipeline, a different promise will be used until reaching this handler. Therefore, we assert here that the
* attached promise is identical to the provided promise as a safety mechanism to ensure that we are respecting this.
*/
assert current.promise() == promise;

boolean channelShouldClose = false;

synchronized (holdingQueue) {
if (holdingQueue.size() < maxEventsHeld) {
holdingQueue.add((HttpPipelinedResponse) msg);
holdingQueue.add(current);

while (!holdingQueue.isEmpty()) {
/*
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
* number does not match the current write sequence then we have not processed all preceding responses yet.
* number does not match the current write sequence number then we have not processed all preceding responses yet.
*/
final HttpPipelinedResponse response = holdingQueue.peek();
if (response.sequence() != writeSequence) {
final HttpPipelinedResponse top = holdingQueue.peek();
if (top.sequence() != writeSequence) {
break;
}
holdingQueue.remove();
ctx.write(response.response(), promise);
/*
* We must use the promise attached to the response; this is necessary since we are going to hold a response until all
* responses that precede it in the pipeline are written first. Note that the promise from the method invocation is
* not ignored; it will already be attached to an existing response and consumed when that response is drained.
*/
ctx.write(top.response(), top.promise());
writeSequence++;
}
} else {
@@ -99,7 +113,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
try {
Netty4Utils.closeChannels(Collections.singletonList(ctx.channel()));
} finally {
((HttpPipelinedResponse) msg).release();
current.release();
promise.setSuccess();
}
}
@@ -24,6 +24,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
@@ -257,11 +258,14 @@ public void run() {
assert uri.matches("/\\d+");
}

final ChannelPromise promise = ctx.newPromise();
final Object msg;
if (pipelinedRequest != null) {
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse));
msg = pipelinedRequest.createHttpResponse(httpResponse, promise);
} else {
ctx.writeAndFlush(httpResponse);
msg = httpResponse;
}
ctx.writeAndFlush(msg, promise);
}

}
@@ -23,6 +23,7 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
@@ -246,7 +247,8 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpPipelinedRequest pipelinedRequest) throws Exception {
executorService.submit(() -> {
try {
waitingLatch.await(1000, TimeUnit.SECONDS);
ctx.write(pipelinedRequest.createHttpResponse(httpResponse), ctx.newPromise());
final ChannelPromise promise = ctx.newPromise();
ctx.write(pipelinedRequest.createHttpResponse(httpResponse, promise), promise);
finishingLatch.countDown();
} catch (InterruptedException e) {
fail(e.toString());
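
The production and test changes above all follow the same caller-side pattern: create one promise, attach it to the pipelined response when the response is created, and pass that same promise to the channel write, which is exactly what the assert at the top of HttpPipeliningHandler#write enforces. A hedged sketch of that pattern, using a hypothetical helper (PipelinedWrites#writeResponse is not part of this commit):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;

final class PipelinedWrites {

    // hypothetical helper: write a response for a possibly pipelined request
    static void writeResponse(final ChannelHandlerContext ctx,
                              final HttpPipelinedRequest pipelinedRequest,
                              final FullHttpResponse response) {
        final ChannelPromise promise = ctx.newPromise();
        final Object msg = pipelinedRequest != null
                ? pipelinedRequest.createHttpResponse(response, promise)
                : response;
        // the same promise instance travels with the wrapper and with the write call
        ctx.writeAndFlush(msg, promise);
    }

}
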
