Skip to content

Commit

Permalink
[hotfix][rest] Centralize REST error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 10, 2019
1 parent c06e7a5 commit 1f47c0a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 55 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
Expand All @@ -50,6 +51,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -157,33 +159,49 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
.whenComplete((Void ignored, Throwable throwable) -> {
inFlightRequestTracker.deregisterRequest();
cleanupFileUploads(finalUploadedFiles);
if (throwable != null) {
handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
.whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
} else {
finalizeRequestProcessing(finalUploadedFiles);
}
});
} catch (RestHandlerException rhe) {
inFlightRequestTracker.deregisterRequest();
} catch (Throwable e) {
final FileUploads finalUploadedFiles = uploadedFiles;
handleException(e, ctx, httpRequest)
.whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles));
}
}

private void finalizeRequestProcessing(FileUploads uploadedFiles) {
inFlightRequestTracker.deregisterRequest();
cleanupFileUploads(uploadedFiles);
}

private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
if (throwable instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) throwable;
if (log.isDebugEnabled()) {
log.error("Exception occurred in REST handler.", rhe);
} else {
log.error("Exception occurred in REST handler: {}", rhe.getMessage());
}
HandlerUtils.sendErrorResponse(
return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(rhe.getMessage()),
rhe.getHttpResponseStatus(),
responseHeaders);
cleanupFileUploads(uploadedFiles);
} catch (Throwable e) {
inFlightRequestTracker.deregisterRequest();
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
} else {
log.error("Implementation error: Unhandled exception.", throwable);
String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
ExceptionUtils.stringifyException(throwable));
return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("Internal server error."),
new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
cleanupFileUploads(uploadedFiles);
}
}

Expand Down
Expand Up @@ -19,17 +19,14 @@
package org.apache.flink.runtime.rest.handler;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
Expand All @@ -39,7 +36,6 @@

import javax.annotation.Nonnull;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -79,29 +75,7 @@ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, Ht
response = FutureUtils.completedExceptionally(e);
}

return response.handle((resp, throwable) -> throwable != null ?
errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
}

private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
Throwable error = ExceptionUtils.stripCompletionException(throwable);
if (error instanceof RestHandlerException) {
final RestHandlerException rhe = (RestHandlerException) error;
if (log.isDebugEnabled()) {
log.error("Exception occurred in REST handler.", rhe);
} else {
log.error("Exception occurred in REST handler: {}", rhe.getMessage());
}
return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
} else {
log.error("Implementation error: Unhandled exception.", error);
String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
ExceptionUtils.stringifyException(throwable));
return Tuple2.of(
new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}

/**
Expand Down
Expand Up @@ -24,12 +24,11 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
Expand Down Expand Up @@ -75,7 +74,6 @@

import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

Expand Down Expand Up @@ -159,23 +157,18 @@ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, Ht
fileBlobKeys.invalidate(taskManagerId);

final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final ErrorResponseBody errorResponseBody;
final HttpResponseStatus httpResponseStatus;

if (strippedThrowable instanceof UnknownTaskExecutorException) {
errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
httpResponseStatus = HttpResponseStatus.NOT_FOUND;
throw new CompletionException(
new NotFoundException(
String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId),
strippedThrowable));
} else {
errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
httpResponseStatus = INTERNAL_SERVER_ERROR;
throw new CompletionException(
new FlinkException(
String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId),
strippedThrowable));
}

HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
errorResponseBody,
httpResponseStatus,
responseHeaders);
}
});
}
Expand Down

0 comments on commit 1f47c0a

Please sign in to comment.