From 3e42f7b317b9e837697eb8818b5ad26b190cbf6e Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 20 Dec 2018 14:18:46 +0100 Subject: [PATCH 1/3] [FLINK-11134][rest] Do not log stacktrace for handled exceptions --- .../flink/runtime/rest/handler/AbstractHandler.java | 9 ++++++--- .../flink/runtime/rest/handler/AbstractRestHandler.java | 6 +++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java index 5a1c371d5a671..3ca0bd38f49d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java @@ -117,15 +117,13 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe try { request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je); + throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, je); } } else { try { ByteBufInputStream in = new ByteBufInputStream(msgContent); request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass()); } catch (JsonParseException | JsonMappingException je) { - log.error("Failed to read request.", je); throw new RestHandlerException( String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()), HttpResponseStatus.BAD_REQUEST, @@ -165,6 +163,11 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe }); } catch (RestHandlerException rhe) { inFlightRequestTracker.deregisterRequest(); + if (log.isDebugEnabled()) { + log.error("Exception occurred in REST handler.", rhe); + } else { + log.error("Exception occurred in REST handler: {}", rhe.getMessage()); + } HandlerUtils.sendErrorResponse( ctx, httpRequest, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 0397cb875f295..3d74a7b80b02e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -89,7 +89,11 @@ private Tuple2 errorResponse(Throwable throwab Throwable error = ExceptionUtils.stripCompletionException(throwable); if (error instanceof RestHandlerException) { final RestHandlerException rhe = (RestHandlerException) error; - log.error("Exception occurred in REST handler.", rhe); + if (log.isDebugEnabled()) { + log.error("Exception occurred in REST handler.", rhe); + } else { + log.error("Exception occurred in REST handler: {}", rhe.getMessage()); + } return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus()); } else { log.error("Implementation error: Unhandled exception.", error); From 5737bad4402cef1f5fae725f6ef972cea0fd5477 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 3 Jan 2019 17:29:56 +0100 Subject: [PATCH 2/3] [hotfix][rest] Centralize REST error logging --- .../runtime/rest/handler/AbstractHandler.java | 42 +++++++++++++------ .../rest/handler/AbstractRestHandler.java | 28 +------------ .../AbstractTaskManagerFileHandler.java | 25 ++++------- 3 files changed, 40 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java index 3ca0bd38f49d3..d46bb13d4265d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException; @@ -50,6 +51,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -158,33 +160,49 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe final FileUploads finalUploadedFiles = uploadedFiles; requestProcessingFuture .whenComplete((Void ignored, Throwable throwable) -> { - inFlightRequestTracker.deregisterRequest(); - cleanupFileUploads(finalUploadedFiles); + if (throwable != null) { + handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest) + .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles)); + } else { + finalizeRequestProcessing(finalUploadedFiles); + } }); - } catch (RestHandlerException rhe) { - inFlightRequestTracker.deregisterRequest(); + } catch (Throwable e) { + final FileUploads finalUploadedFiles = uploadedFiles; + handleException(e, ctx, httpRequest) + .whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles)); + } + } + + private void finalizeRequestProcessing(FileUploads uploadedFiles) { + inFlightRequestTracker.deregisterRequest(); + cleanupFileUploads(uploadedFiles); + } + + private CompletableFuture handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) { + if (throwable instanceof RestHandlerException) { + RestHandlerException rhe = (RestHandlerException) throwable; if (log.isDebugEnabled()) { log.error("Exception occurred in REST handler.", rhe); } else { log.error("Exception occurred in REST handler: {}", rhe.getMessage()); } - HandlerUtils.sendErrorResponse( + return HandlerUtils.sendErrorResponse( ctx, httpRequest, new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus(), responseHeaders); - cleanupFileUploads(uploadedFiles); - } catch (Throwable e) { - inFlightRequestTracker.deregisterRequest(); - log.error("Request processing failed.", e); - HandlerUtils.sendErrorResponse( + } else { + log.error("Implementation error: Unhandled exception.", throwable); + String stackTrace = String.format("", + ExceptionUtils.stringifyException(throwable)); + return HandlerUtils.sendErrorResponse( ctx, httpRequest, - new ErrorResponseBody("Internal server error."), + new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)), HttpResponseStatus.INTERNAL_SERVER_ERROR, responseHeaders); - cleanupFileUploads(uploadedFiles); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 3d74a7b80b02e..992e2c58d061c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -19,17 +19,14 @@ package org.apache.flink.runtime.rest.handler; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rest.handler.util.HandlerUtils; -import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; @@ -39,7 +36,6 @@ import javax.annotation.Nonnull; -import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -80,29 +76,7 @@ protected CompletableFuture respondToRequest(ChannelHandlerContext ctx, Ht response = FutureUtils.completedExceptionally(e); } - return response.handle((resp, throwable) -> throwable != null ? - errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode())) - .thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders)); - } - - private Tuple2 errorResponse(Throwable throwable) { - Throwable error = ExceptionUtils.stripCompletionException(throwable); - if (error instanceof RestHandlerException) { - final RestHandlerException rhe = (RestHandlerException) error; - if (log.isDebugEnabled()) { - log.error("Exception occurred in REST handler.", rhe); - } else { - log.error("Exception occurred in REST handler: {}", rhe.getMessage()); - } - return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus()); - } else { - log.error("Implementation error: Unhandled exception.", error); - String stackTrace = String.format("", - ExceptionUtils.stringifyException(throwable)); - return Tuple2.of( - new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)), - HttpResponseStatus.INTERNAL_SERVER_ERROR); - } + return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java index 8a20868ce3733..1781fe2f24efa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java @@ -24,12 +24,11 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.AbstractHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; -import org.apache.flink.runtime.rest.handler.util.HandlerUtils; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; @@ -75,7 +74,6 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -160,23 +158,18 @@ protected CompletableFuture respondToRequest(ChannelHandlerContext ctx, Ht fileBlobKeys.invalidate(taskManagerId); final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - final ErrorResponseBody errorResponseBody; - final HttpResponseStatus httpResponseStatus; if (strippedThrowable instanceof UnknownTaskExecutorException) { - errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.'); - httpResponseStatus = HttpResponseStatus.NOT_FOUND; + throw new CompletionException( + new NotFoundException( + String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId), + strippedThrowable)); } else { - errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.'); - httpResponseStatus = INTERNAL_SERVER_ERROR; + throw new CompletionException( + new FlinkException( + String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId), + strippedThrowable)); } - - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - errorResponseBody, - httpResponseStatus, - responseHeaders); } }); } From 8a4ebbbb84cc7cd33da32251a5bb416406e1f15f Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 3 Jan 2019 17:30:53 +0100 Subject: [PATCH 3/3] [hotfix][rest] Remove unnecessary instanceof check --- .../handler/taskmanager/AbstractTaskManagerFileHandler.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java index 1781fe2f24efa..01d818bfaca9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java @@ -124,11 +124,7 @@ protected CompletableFuture respondToRequest(ChannelHandlerContext ctx, Ht blobKeyFuture = fileBlobKeys.get(taskManagerId); } catch (ExecutionException e) { final Throwable cause = ExceptionUtils.stripExecutionException(e); - if (cause instanceof RestHandlerException) { - throw (RestHandlerException) cause; - } else { - throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e); - } + throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause); } final CompletableFuture resultFuture = blobKeyFuture.thenAcceptAsync(