From 2b392b2b01e38b1c476554ba0263f5ef565ba953 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 5 Apr 2017 23:37:06 +0200 Subject: [PATCH] [FLINK-5629] [runtime-web] Close RAF in FileServerHandlers when exception occurs --- .../files/StaticFileServerHandler.java | 61 ++++++++++-------- .../HistoryServerStaticFileServerHandler.java | 63 ++++++++++--------- 2 files changed, 69 insertions(+), 55 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java index b7874c904c5e1..406baf0a71a5b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java @@ -296,37 +296,44 @@ private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, Str sendError(ctx, NOT_FOUND); return; } - long fileLength = raf.length(); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentTypeHeader(response, file); - - // since the log and out files are rapidly changing, we don't want to browser to cache them - if (!(requestPath.contains("log") || requestPath.contains("out"))) { - setDateAndCacheHeaders(response, file); - } - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setContentLength(response, fileLength); + try { + long fileLength = raf.length(); - // write the initial line and the header. - ctx.write(response); + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentTypeHeader(response, file); - // write the content. - ChannelFuture lastContentFuture; - if (ctx.pipeline().get(SslHandler.class) == null) { - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); - lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - } else { - lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), - ctx.newProgressivePromise()); - // HttpChunkedInput will write the end marker (LastHttpContent) for us. - } + // since the log and out files are rapidly changing, we don't want to browser to cache them + if (!(requestPath.contains("log") || requestPath.contains("out"))) { + setDateAndCacheHeaders(response, file); + } + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) == null) { + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), + ctx.newProgressivePromise()); + // HttpChunkedInput will write the end marker (LastHttpContent) for us. + } - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } catch (Exception e) { + raf.close(); + logger.error("Failed to serve file.", e); + sendError(ctx, INTERNAL_SERVER_ERROR); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index 31e9bbcd355b9..ba0e2d2d154c1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -205,37 +205,44 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str StaticFileServerHandler.sendError(ctx, NOT_FOUND); return; } - long fileLength = raf.length(); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - StaticFileServerHandler.setContentTypeHeader(response, file); + try { + long fileLength = raf.length(); - // the job overview should be updated as soon as possible - if (!requestPath.equals("/joboverview.json")) { - StaticFileServerHandler.setDateAndCacheHeaders(response, file); - } - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setContentLength(response, fileLength); - - // write the initial line and the header. - ctx.write(response); - - // write the content. - ChannelFuture lastContentFuture; - if (ctx.pipeline().get(SslHandler.class) == null) { - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); - lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - } else { - lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), - ctx.newProgressivePromise()); - // HttpChunkedInput will write the end marker (LastHttpContent) for us. - } + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + StaticFileServerHandler.setContentTypeHeader(response, file); + + // the job overview should be updated as soon as possible + if (!requestPath.equals("/joboverview.json")) { + StaticFileServerHandler.setDateAndCacheHeaders(response, file); + } + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) == null) { + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), + ctx.newProgressivePromise()); + // HttpChunkedInput will write the end marker (LastHttpContent) for us. + } - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } catch (Exception e) { + raf.close(); + LOG.error("Failed to serve file.", e); + StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR); } }