Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure response filters are not left waiting for buffered body #1353

Merged
merged 2 commits into from Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,6 +34,7 @@
import com.netflix.zuul.message.http.HttpRequestInfo;
import com.netflix.zuul.message.http.HttpRequestMessage;
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.netty.SpectatorUtils;
import com.netflix.zuul.netty.server.MethodBinding;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
Expand Down Expand Up @@ -115,8 +116,8 @@ protected final AtomicInteger getRunningFilterIndex(I zuulMesg) {
return (AtomicInteger) Preconditions.checkNotNull(ctx.get(RUNNING_FILTER_IDX_SESSION_CTX_KEY), "runningFilterIndex");
}

protected final boolean isFilterAwaitingBody(I zuulMesg) {
return zuulMesg.getContext().containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
protected final boolean isFilterAwaitingBody(SessionContext context) {
return context.containsKey(AWAITING_BODY_FLAG_SESSION_CTX_KEY);
}

protected final void setFilterAwaitingBody(I zuulMesg, boolean flag) {
Expand All @@ -140,7 +141,15 @@ protected final void invokeNextStage(final O zuulMesg, final HttpContent chunk)
try (TaskCloseable ignored =
traceTask(this, s -> s.getClass().getSimpleName() + ".fireChannelReadChunk")) {
addPerfMarkTags(zuulMesg);
getChannelHandlerContext(zuulMesg).fireChannelRead(chunk);
ChannelHandlerContext channelHandlerContext = getChannelHandlerContext(zuulMesg);
if (!channelHandlerContext.channel().isActive()) {
zuulMesg.getContext().cancel();
zuulMesg.disposeBufferedBody();
SpectatorUtils.newCounter("zuul.filterChain.chunk.hanging",
zuulMesg.getClass().getSimpleName()).increment();
} else {
channelHandlerContext.fireChannelRead(chunk);
}
}
}
}
Expand All @@ -157,7 +166,16 @@ protected final void invokeNextStage(final O zuulMesg) {
try (TaskCloseable ignored =
traceTask(this, s -> s.getClass().getSimpleName() + ".fireChannelRead")) {
addPerfMarkTags(zuulMesg);
getChannelHandlerContext(zuulMesg).fireChannelRead(zuulMesg);
ChannelHandlerContext channelHandlerContext = getChannelHandlerContext(zuulMesg);
if (!channelHandlerContext.channel().isActive()) {
zuulMesg.getContext().cancel();
zuulMesg.disposeBufferedBody();
SpectatorUtils.newCounter("zuul.filterChain.message.hanging",
zuulMesg.getClass().getSimpleName()).increment();
}
else {
channelHandlerContext.fireChannelRead(zuulMesg);
}
}
}
}
Expand Down
Expand Up @@ -146,7 +146,7 @@ public void filter(final HttpRequestMessage zuulReq, final HttpContent chunk) {
chunk.release();
}

if (isFilterAwaitingBody(zuulReq) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
if (isFilterAwaitingBody(zuulReq.getContext()) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
//whole body has arrived, resume filter chain
newChunk.touch("Endpoint body complete, resume chain, ZuulMessage: " + zuulReq);
invokeNextStage(filter(endpoint, zuulReq));
Expand Down
Expand Up @@ -18,11 +18,11 @@

import static com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteReason.SESSION_COMPLETE;
import static com.netflix.zuul.context.CommonContextKeys.NETTY_SERVER_CHANNEL_HANDLER_CONTEXT;
import static com.netflix.zuul.netty.server.ClientRequestReceiver.ATTR_ZUUL_RESP;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_CANCELLED;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_CLIENT_TIMEOUT;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL;
import static com.netflix.zuul.stats.status.ZuulStatusCategory.FAILURE_LOCAL_IDLE_TIMEOUT;

import com.google.common.base.Preconditions;
import com.netflix.netty.common.HttpLifecycleChannelHandler.CompleteEvent;
import com.netflix.netty.common.HttpRequestReadTimeoutEvent;
Expand All @@ -34,11 +34,13 @@
import com.netflix.zuul.message.http.HttpResponseMessage;
import com.netflix.zuul.message.http.HttpResponseMessageImpl;
import com.netflix.zuul.netty.RequestCancelledEvent;
import com.netflix.zuul.netty.SpectatorUtils;
import com.netflix.zuul.stats.status.StatusCategory;
import com.netflix.zuul.stats.status.StatusCategoryUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.unix.Errors;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -88,7 +90,7 @@ else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof CompleteEvent) {
final CompleteEvent completeEvent = (CompleteEvent)evt;
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE);
fireEndpointFinish(completeEvent.getReason() != SESSION_COMPLETE, ctx);
}
else if (evt instanceof HttpRequestReadTimeoutEvent) {
sendResponse(FAILURE_CLIENT_TIMEOUT, 408, ctx);
Expand All @@ -101,7 +103,7 @@ else if (evt instanceof RequestCancelledEvent) {
zuulRequest.getContext().cancel();
StatusCategoryUtils.storeStatusCategoryIfNotAlreadyFailure(zuulRequest.getContext(), FAILURE_CLIENT_CANCELLED);
}
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
super.userEventTriggered(ctx, evt);
Expand All @@ -121,15 +123,18 @@ private void sendResponse(final StatusCategory statusCategory, final int status,
headers.add("Content-Length", "0");
zuulResponse.finishBufferedBodyIfIncomplete();
responseFilterChain.filter(zuulResponse);
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One slight nuance here with the change to fireEndpointFinish, since we close out the body on L123 (zuulResponse.finishBufferedBodyIfIncomplete()) and fire into the response filter chain, I don't think the new finishResponseFilters function will have an issue as no filter should be waiting for a body - so will just be a no-op? Just double checking 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but in the case of that static response, it won't pop into the if block that checks if the filter chain is waiting for a buffered body.

}
}

protected HttpRequestMessage getZuulRequest() {
return zuulRequest;
}

protected void fireEndpointFinish(final boolean error) {
protected void fireEndpointFinish(final boolean error, final ChannelHandlerContext ctx) {
// make sure filter chain is not left hanging
finishResponseFilters(ctx);

final ZuulFilter endpoint = ZuulEndPointRunner.getEndpoint(zuulRequest);
if (endpoint instanceof ProxyEndpoint) {
final ProxyEndpoint edgeProxyEndpoint = (ProxyEndpoint) endpoint;
Expand All @@ -138,6 +143,19 @@ protected void fireEndpointFinish(final boolean error) {
zuulRequest = null;
}

private void finishResponseFilters(ChannelHandlerContext ctx) {
// check if there are any response filters awaiting a buffered body
if (zuulRequest != null && responseFilterChain.isFilterAwaitingBody(zuulRequest.getContext())) {
HttpResponseMessage zuulResponse = ctx.channel().attr(ATTR_ZUUL_RESP).get();
if (zuulResponse != null) {
// fire a last content into the filter chain to unblock any filters awaiting a buffered body
responseFilterChain.filter(zuulResponse, new DefaultLastHttpContent());
SpectatorUtils.newCounter("zuul.filterChain.bodyBuffer.hanging",
zuulRequest.getContext().getRouteVIP()).increment();
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("zuul filter chain handler caught exception. cause=" + String.valueOf(cause), cause);
Expand All @@ -147,7 +165,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
zuulCtx.setShouldSendErrorResponse(true);
sendResponse(FAILURE_LOCAL, 500, ctx);
} else {
fireEndpointFinish(true);
fireEndpointFinish(true, ctx);
ctx.close();
}
}
Expand Down
Expand Up @@ -131,7 +131,7 @@ public void filter(T inMesg, HttpContent chunk) {
chunk.touch("Filter runner buffering chunk, message: " + inMesg);
inMesg.bufferBodyContents(chunk);

boolean isAwaitingBody = isFilterAwaitingBody(inMesg);
boolean isAwaitingBody = isFilterAwaitingBody(inMesg.getContext());

// Record passport states for start and end of buffering bodies.
if (isAwaitingBody) {
Expand Down