From 3ef17b389ccb0d7694a166756b003f1970c40df2 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 27 Jun 2018 09:34:34 +0200 Subject: [PATCH] [FLINK-9677][rest] Cleanup encoder after request has been processed --- .../apache/flink/runtime/rest/FileUploadHandler.java | 12 +++++++++++- .../org/apache/flink/runtime/rest/RestClient.java | 8 ++++---- .../flink/runtime/rest/MultipartUploadResource.java | 5 ++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 6fdd35045ad6c..aa87cc567a9c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -79,6 +79,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler { private HttpRequest currentHttpRequest; private byte[] currentJsonPayload; private Path currentUploadDir; + private boolean currentRequestFailed = false; public FileUploadHandler(final Path uploadDir) { super(false); @@ -90,6 +91,7 @@ public FileUploadHandler(final Path uploadDir) { protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { try { if (msg instanceof HttpRequest) { + currentRequestFailed = false; final HttpRequest httpRequest = (HttpRequest) msg; LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); if (httpRequest.getMethod().equals(HttpMethod.POST)) { @@ -145,6 +147,8 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms } reset(); } + } else if (currentRequestFailed) { + LOG.trace("Swallowing content for failed request. {}", msg); } else { ctx.fireChannelRead(msg); } @@ -154,6 +158,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms } private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) { + currentRequestFailed = true; HttpRequest tmpRequest = currentHttpRequest; deleteUploadedFiles(); reset(); @@ -180,7 +185,12 @@ private void deleteUploadedFiles() { private void reset() { // destroy() can fail because some data is stored multiple times in the decoder causing an IllegalReferenceCountException // see https://github.com/netty/netty/issues/7814 - currentHttpPostRequestDecoder.getBodyHttpDatas().clear(); + try { + currentHttpPostRequestDecoder.getBodyHttpDatas().clear(); + } catch (HttpPostRequestDecoder.NotEnoughDataDecoderException ned) { + // this method always fails if not all chunks were offered to the decoder yet + LOG.debug("Error while resetting handler.", ned); + } currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index ee0201c262012..12b2864061b29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -349,14 +349,14 @@ private static final class MultipartRequest implements Request { @Override public void writeTo(Channel channel) { - channel.writeAndFlush(httpRequest); + ChannelFuture future = channel.writeAndFlush(httpRequest); // this should never be false as we explicitly set the encoder to use multipart messages if (bodyRequestEncoder.isChunked()) { - channel.writeAndFlush(bodyRequestEncoder); + future = channel.writeAndFlush(bodyRequestEncoder); } - // release data and remove temporary files if they were created - bodyRequestEncoder.cleanFiles(); + // release data and remove temporary files if they were created, once the writing is complete + future.addListener((ignored) -> bodyRequestEncoder.cleanFiles()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java index c03b85d6101de..1311b80773782 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java @@ -52,6 +52,7 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; @@ -113,7 +114,9 @@ public void before() throws Exception { CompletableFuture.completedFuture(mockRestfulGateway); file1 = temporaryFolder.newFile(); - Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET)); + try (RandomAccessFile rw = new RandomAccessFile(file1, "rw")) { + rw.setLength(1024 * 1024 * 64); + } file2 = temporaryFolder.newFile(); Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));