Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@
</ul>
</td>
</tr>
<tr>
<td colspan="2">Query parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>maxExceptions</code> (optional): Comma-separated list of integer values that specifies the upper limit of exceptions to return.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<label>
Expand Down
9 changes: 9 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ paths:
required: true
schema:
$ref: "#/components/schemas/ApplicationID"
- name: maxExceptions
in: query
description: Comma-separated list of integer values that specifies the upper
limit of exceptions to return.
required: false
style: form
schema:
type: integer
format: int32
responses:
"200":
description: The request was successful.
Expand Down
5 changes: 4 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
"queryParameters" : [ {
"key" : "maxExceptions",
"mandatory" : false
} ]
},
"request" : {
"type" : "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
Expand All @@ -38,6 +39,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand All @@ -47,17 +49,19 @@ public class ApplicationExceptionsHandler
RestfulGateway,
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters>
ApplicationExceptionsMessageParameters>
implements ApplicationJsonArchivist {

static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an observation, that in https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L63 we also have same constant. maybe worth extracting if we revisit default in the future. however, it is just an observation (not a blocker).


public ApplicationExceptionsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters>
ApplicationExceptionsMessageParameters>
messageHeaders) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
}
Expand All @@ -66,13 +70,20 @@ public ApplicationExceptionsHandler(
public CompletableFuture<ApplicationExceptionsInfoWithHistory> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) {
ApplicationID applicationId = request.getPathParameter(ApplicationIDPathParameter.class);
final List<Integer> exceptionToReportMaxSizes =
request.getQueryParameter(UpperLimitExceptionParameter.class);
final int exceptionToReportMaxSize =
exceptionToReportMaxSizes.size() > 0
? exceptionToReportMaxSizes.get(0)
: MAX_NUMBER_EXCEPTION_TO_REPORT;

return gateway.requestApplication(applicationId, timeout)
.thenApply(
archivedApplication ->
ApplicationExceptionsInfoWithHistory
.fromApplicationExceptionHistory(
archivedApplication.getExceptionHistory()));
archivedApplication.getExceptionHistory(),
exceptionToReportMaxSize));
}

@Override
Expand All @@ -88,6 +99,7 @@ public Collection<ArchivedJson> archiveApplicationWithPath(
new ArchivedJson(
path,
ApplicationExceptionsInfoWithHistory.fromApplicationExceptionHistory(
archivedApplication.getExceptionHistory())));
archivedApplication.getExceptionHistory(),
MAX_NUMBER_EXCEPTION_TO_REPORT)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ public String toString() {
}

public static ApplicationExceptionsInfoWithHistory fromApplicationExceptionHistory(
Collection<ApplicationExceptionHistoryEntry> exceptions) {
Collection<ApplicationExceptionHistoryEntry> exceptions, int maxSize) {
return new ApplicationExceptionsInfoWithHistory(
new ApplicationExceptionHistory(
exceptions.stream()
.limit(maxSize)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that change the existing behavior (the return order)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Stream.limit(n) preserves encounter order and just truncates to the first n elements, same iteration order as before, only shorter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But which part/slice of history that gives us in the result? After this PR the user of this API will receive the most oldest one (it drops the most recent). And it looks inconsistent with JobExceptions history, which reverses first and as a result returns newest.
Is it intentional? Or I'm missing something?

.map(
exception ->
new ApplicationExceptionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

Expand All @@ -33,7 +32,7 @@ public class ApplicationExceptionsHeaders
implements RuntimeMessageHeaders<
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters> {
ApplicationExceptionsMessageParameters> {

private static final ApplicationExceptionsHeaders INSTANCE = new ApplicationExceptionsHeaders();

Expand All @@ -58,8 +57,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public ApplicationMessageParameters getUnresolvedMessageParameters() {
return new ApplicationMessageParameters();
public ApplicationExceptionsMessageParameters getUnresolvedMessageParameters() {
return new ApplicationExceptionsMessageParameters();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */
public class ApplicationExceptionsMessageParameters extends ApplicationMessageParameters {

private final UpperLimitExceptionParameter upperLimitExceptionParameter =
public final UpperLimitExceptionParameter upperLimitExceptionParameter =
new UpperLimitExceptionParameter();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand Down Expand Up @@ -57,13 +58,28 @@ class ApplicationExceptionsHandlerTest {

private static HandlerRequest<EmptyRequestBody> createRequest(ApplicationID applicationId)
throws HandlerRequestException {
return createRequest(applicationId, Collections.emptyMap());
}

private static HandlerRequest<EmptyRequestBody> createRequest(
ApplicationID applicationId, int maxExceptions) throws HandlerRequestException {
Map<String, List<String>> queryParameters = new HashMap<>();
queryParameters.put(
UpperLimitExceptionParameter.KEY,
Collections.singletonList(Integer.toString(maxExceptions)));
return createRequest(applicationId, queryParameters);
}

private static HandlerRequest<EmptyRequestBody> createRequest(
ApplicationID applicationId, Map<String, List<String>> queryParameters)
throws HandlerRequestException {
Map<String, String> pathParameters = new HashMap<>();
pathParameters.put(ApplicationIDPathParameter.KEY, applicationId.toString());
return HandlerRequest.resolveParametersAndCreate(
EmptyRequestBody.getInstance(),
new ApplicationMessageParameters(),
new ApplicationExceptionsMessageParameters(),
pathParameters,
Collections.emptyMap(),
queryParameters,
Collections.emptyList());
}

Expand Down Expand Up @@ -143,6 +159,116 @@ void testExceptionWithJobId() throws Exception {
assertThat(exceptionInfo.getJobId()).isEqualTo(jobId);
}

@Test
void testMaxExceptionsLimitsHistorySize() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could u please clarify, whether there are tests for:

  1. Default-cap path
  2. The case when maxException > historySize

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, they were both missing as explicit cases. Added two tests in ApplicationExceptionsHandlerTest.java:

  1. testDefaultCapAppliedWhenMaxExceptionsNotProvided, exercises the default-cap branch in handleRequest. It builds 25 history entries, omits maxExceptions from the request, asserts the response is capped at MAX_NUMBER_EXCEPTION_TO_REPORT (20).
  2. testMaxExceptionsLargerThanHistorySizeReturnsAllEntries, builds 3 history entries, requests maxExceptions=10, asserts the response contains all 3 entries (no padding, no overflow).

final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>();
for (int i = 0; i < 5; i++) {
exceptionHistory.add(
new ApplicationExceptionHistoryEntry(
new RuntimeException("exception #" + i),
System.currentTimeMillis(),
null));
}

final ArchivedApplication applicationWithExceptions =
new ArchivedApplication(
archivedApplication.getApplicationId(),
archivedApplication.getApplicationName(),
ApplicationState.FAILED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
Collections.emptyMap(),
exceptionHistory);

testingRestfulGateway =
new TestingRestfulGateway.Builder()
.setRequestApplicationFunction(
applicationId ->
CompletableFuture.completedFuture(
applicationWithExceptions))
.build();

final HandlerRequest<EmptyRequestBody> limitedRequest =
createRequest(archivedApplication.getApplicationId(), 2);

final ApplicationExceptionsInfoWithHistory response =
handler.handleRequest(limitedRequest, testingRestfulGateway).get();

assertThat(response.getExceptionHistory().getEntries()).hasSize(2);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also verify/assert the intended order?

}

@Test
void testDefaultCapAppliedWhenMaxExceptionsNotProvided() throws Exception {
final int totalExceptions = ApplicationExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT + 5;
final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>();
for (int i = 0; i < totalExceptions; i++) {
exceptionHistory.add(
new ApplicationExceptionHistoryEntry(
new RuntimeException("exception #" + i),
System.currentTimeMillis(),
null));
}

final ArchivedApplication applicationWithExceptions =
new ArchivedApplication(
archivedApplication.getApplicationId(),
archivedApplication.getApplicationName(),
ApplicationState.FAILED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
Collections.emptyMap(),
exceptionHistory);

testingRestfulGateway =
new TestingRestfulGateway.Builder()
.setRequestApplicationFunction(
applicationId ->
CompletableFuture.completedFuture(
applicationWithExceptions))
.build();

final ApplicationExceptionsInfoWithHistory response =
handler.handleRequest(handlerRequest, testingRestfulGateway).get();

assertThat(response.getExceptionHistory().getEntries())
.hasSize(ApplicationExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT);
}

@Test
void testMaxExceptionsLargerThanHistorySizeReturnsAllEntries() throws Exception {
final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the loop iterates i < 3 and the assertion is .hasSize(3) these are two independent literals that are conceptually the same value. Maybe we can extract this to a variable with a meaningful name.

exceptionHistory.add(
new ApplicationExceptionHistoryEntry(
new RuntimeException("exception #" + i),
System.currentTimeMillis(),
null));
}

final ArchivedApplication applicationWithExceptions =
new ArchivedApplication(
archivedApplication.getApplicationId(),
archivedApplication.getApplicationName(),
ApplicationState.FAILED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
Collections.emptyMap(),
exceptionHistory);

testingRestfulGateway =
new TestingRestfulGateway.Builder()
.setRequestApplicationFunction(
applicationId ->
CompletableFuture.completedFuture(
applicationWithExceptions))
.build();

final HandlerRequest<EmptyRequestBody> oversizedRequest =
createRequest(archivedApplication.getApplicationId(), 10);

final ApplicationExceptionsInfoWithHistory response =
handler.handleRequest(oversizedRequest, testingRestfulGateway).get();

assertThat(response.getExceptionHistory().getEntries()).hasSize(3);
}

@Test
void testExceptionWithoutJobId() throws Exception {
final RuntimeException rootCause = new RuntimeException("exception #0");
Expand Down