Skip to content

Commit

Permalink
[FLINK-24020][web] Aggregate HTTP requests before custom netty hander…
Browse files Browse the repository at this point in the history
…s are getting the data
  • Loading branch information
gaborgsomogyi committed Aug 27, 2021
1 parent f2b0e43 commit f954017
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
Expand Down Expand Up @@ -58,6 +59,8 @@
import java.util.Optional;
import java.util.ServiceLoader;

import static org.apache.flink.configuration.RestOptions.SERVER_MAX_CONTENT_LENGTH;

/** This classes encapsulates the boot-strapping of netty for the web-frontend. */
public class WebFrontendBootstrap {
private final Router router;
Expand All @@ -66,6 +69,7 @@ public class WebFrontendBootstrap {
private final ServerBootstrap bootstrap;
private final Channel serverChannel;
private final String restAddress;
private final int maxContentLength;
private final Map<String, String> responseHeaders;
@VisibleForTesting List<InboundChannelHandlerFactory> inboundChannelHandlerFactories;

Expand All @@ -82,6 +86,7 @@ public WebFrontendBootstrap(
this.router = Preconditions.checkNotNull(router);
this.log = Preconditions.checkNotNull(log);
this.uploadDir = directory;
this.maxContentLength = config.get(SERVER_MAX_CONTENT_LENGTH);
this.responseHeaders = new HashMap<>();
inboundChannelHandlerFactories = new ArrayList<>();
ServiceLoader<InboundChannelHandlerFactory> loader =
Expand Down Expand Up @@ -119,7 +124,12 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException {
serverSSLFactory.createNettySSLHandler(ch.alloc()));
}

ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpRequestHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders));

for (InboundChannelHandlerFactory factory :
inboundChannelHandlerFactories) {
Expand All @@ -132,7 +142,6 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException {

ch.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(new HttpRequestHandler(uploadDir))
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
}
Expand Down
Expand Up @@ -207,7 +207,12 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException {
sslHandlerFactory));
}

ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders));

for (InboundChannelHandlerFactory factory :
inboundChannelHandlerFactories) {
Expand All @@ -219,10 +224,6 @@ protected void initChannel(SocketChannel ch) throws ConfigurationException {
}

ch.pipeline()
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders))
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
Expand Down

0 comments on commit f954017

Please sign in to comment.