From f954017eb4d81acc1283bae464d7247bda38d904 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 27 Aug 2021 09:57:50 +0200 Subject: [PATCH] [FLINK-24020][web] Aggregate HTTP requests before custom netty handers are getting the data --- .../webmonitor/utils/WebFrontendBootstrap.java | 13 +++++++++++-- .../flink/runtime/rest/RestServerEndpoint.java | 11 ++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java index 543a48bc992267..c701f16240e926 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory; import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory; +import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.rest.handler.router.RouterHandler; import org.apache.flink.runtime.webmonitor.HttpRequestHandler; @@ -58,6 +59,8 @@ import java.util.Optional; import java.util.ServiceLoader; +import static org.apache.flink.configuration.RestOptions.SERVER_MAX_CONTENT_LENGTH; + /** This classes encapsulates the boot-strapping of netty for the web-frontend. */ public class WebFrontendBootstrap { private final Router router; @@ -66,6 +69,7 @@ public class WebFrontendBootstrap { private final ServerBootstrap bootstrap; private final Channel serverChannel; private final String restAddress; + private final int maxContentLength; private final Map responseHeaders; @VisibleForTesting List inboundChannelHandlerFactories; @@ -82,6 +86,7 @@ public WebFrontendBootstrap( this.router = Preconditions.checkNotNull(router); this.log = Preconditions.checkNotNull(log); this.uploadDir = directory; + this.maxContentLength = config.get(SERVER_MAX_CONTENT_LENGTH); this.responseHeaders = new HashMap<>(); inboundChannelHandlerFactories = new ArrayList<>(); ServiceLoader loader = @@ -119,7 +124,12 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { serverSSLFactory.createNettySSLHandler(ch.alloc())); } - ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new HttpRequestHandler(uploadDir)) + .addLast( + new FlinkHttpObjectAggregator( + maxContentLength, responseHeaders)); for (InboundChannelHandlerFactory factory : inboundChannelHandlerFactories) { @@ -132,7 +142,6 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { ch.pipeline() .addLast(new ChunkedWriteHandler()) - .addLast(new HttpRequestHandler(uploadDir)) .addLast(handler.getName(), handler) .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index f11450eb8e0c6c..cb07a3223a089f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -207,7 +207,12 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { sslHandlerFactory)); } - ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new FileUploadHandler(uploadDir)) + .addLast( + new FlinkHttpObjectAggregator( + maxContentLength, responseHeaders)); for (InboundChannelHandlerFactory factory : inboundChannelHandlerFactories) { @@ -219,10 +224,6 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException { } ch.pipeline() - .addLast(new FileUploadHandler(uploadDir)) - .addLast( - new FlinkHttpObjectAggregator( - maxContentLength, responseHeaders)) .addLast(new ChunkedWriteHandler()) .addLast(handler.getName(), handler) .addLast(new PipelineErrorHandler(log, responseHeaders));