Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11134][rest] Do not log stacktrace handled exceptions #7346

Merged
merged 3 commits into from Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
Expand All @@ -50,6 +51,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -117,15 +119,13 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
try {
request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Request did not conform to expected format.", je);
throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
throw new RestHandlerException("Bad request received. Request did not conform to expected format.", HttpResponseStatus.BAD_REQUEST, je);
}
} else {
try {
ByteBufInputStream in = new ByteBufInputStream(msgContent);
request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Failed to read request.", je);
throw new RestHandlerException(
String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
HttpResponseStatus.BAD_REQUEST,
Expand Down Expand Up @@ -160,28 +160,49 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
.whenComplete((Void ignored, Throwable throwable) -> {
inFlightRequestTracker.deregisterRequest();
cleanupFileUploads(finalUploadedFiles);
if (throwable != null) {
handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
} else {
finalizeRequestProcessing(finalUploadedFiles);
}
});
} catch (RestHandlerException rhe) {
inFlightRequestTracker.deregisterRequest();
HandlerUtils.sendErrorResponse(
} catch (Throwable e) {
final FileUploads finalUploadedFiles = uploadedFiles;
handleException(e, ctx, httpRequest)
.whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
}
}

private void finalizeRequestProcessing(FileUploads uploadedFiles) {
inFlightRequestTracker.deregisterRequest();
cleanupFileUploads(uploadedFiles);
}

private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
if (throwable instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) throwable;
if (log.isDebugEnabled()) {
log.error("Exception occurred in REST handler.", rhe);
} else {
log.error("Exception occurred in REST handler: {}", rhe.getMessage());
}
return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(rhe.getMessage()),
rhe.getHttpResponseStatus(),
responseHeaders);
cleanupFileUploads(uploadedFiles);
} catch (Throwable e) {
inFlightRequestTracker.deregisterRequest();
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
} else {
log.error("Implementation error: Unhandled exception.", throwable);
String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
ExceptionUtils.stringifyException(throwable));
return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("Internal server error."),
new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
cleanupFileUploads(uploadedFiles);
}
}

Expand Down
Expand Up @@ -19,17 +19,14 @@
package org.apache.flink.runtime.rest.handler;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
Expand All @@ -39,7 +36,6 @@

import javax.annotation.Nonnull;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -80,25 +76,7 @@ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, Ht
response = FutureUtils.completedExceptionally(e);
}

return response.handle((resp, throwable) -> throwable != null ?
errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
}

private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
Throwable error = ExceptionUtils.stripCompletionException(throwable);
if (error instanceof RestHandlerException) {
final RestHandlerException rhe = (RestHandlerException) error;
log.error("Exception occurred in REST handler.", rhe);
return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
} else {
log.error("Implementation error: Unhandled exception.", error);
String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
ExceptionUtils.stringifyException(throwable));
return Tuple2.of(
new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}

/**
Expand Down
Expand Up @@ -24,12 +24,11 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
Expand Down Expand Up @@ -75,7 +74,6 @@

import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

Expand Down Expand Up @@ -126,11 +124,7 @@ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, Ht
blobKeyFuture = fileBlobKeys.get(taskManagerId);
} catch (ExecutionException e) {
final Throwable cause = ExceptionUtils.stripExecutionException(e);
if (cause instanceof RestHandlerException) {
throw (RestHandlerException) cause;
} else {
throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
}
throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause);
}

final CompletableFuture<Void> resultFuture = blobKeyFuture.thenAcceptAsync(
Expand Down Expand Up @@ -160,23 +154,18 @@ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, Ht
fileBlobKeys.invalidate(taskManagerId);

final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final ErrorResponseBody errorResponseBody;
final HttpResponseStatus httpResponseStatus;

if (strippedThrowable instanceof UnknownTaskExecutorException) {
errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
httpResponseStatus = HttpResponseStatus.NOT_FOUND;
throw new CompletionException(
new NotFoundException(
String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
strippedThrowable));
} else {
errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
httpResponseStatus = INTERNAL_SERVER_ERROR;
throw new CompletionException(
new FlinkException(
String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
strippedThrowable));
}

HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
errorResponseBody,
httpResponseStatus,
responseHeaders);
}
});
}
Expand Down