Skip to content

Commit

Permalink
[FLINK-7806] [flip6] Register CurrentJobsOverviewHandler under /jobs/…
Browse files Browse the repository at this point in the history
…overview

Rename CurrentJobsOverviewHandler to JobsOverviewHandler

Change paths

Remove joboverview/running and joboverview/completed from JobsOverviewHandler

Adapt web ui files

Update rest_api to reflect new REST call /jobs

This changes #4805.
  • Loading branch information
tillrohrmann committed Nov 7, 2017
1 parent 0c62c52 commit 8086e3b
Show file tree
Hide file tree
Showing 22 changed files with 137 additions and 156 deletions.
2 changes: 1 addition & 1 deletion docs/monitoring/historyserver.md
Expand Up @@ -80,7 +80,7 @@ Below is a list of available requests, with a sample JSON response. All requests
Values in angle brackets are variables, for example `http://hostname:port/jobs/<jobid>/exceptions` will have to requested for example as `http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`.

- `/config`
- `/joboverview`
- `/jobs/overview`
- `/jobs/<jobid>`
- `/jobs/<jobid>/vertices`
- `/jobs/<jobid>/config`
Expand Down
33 changes: 3 additions & 30 deletions docs/monitoring/rest_api.md
Expand Up @@ -55,9 +55,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<

- `/config`
- `/overview`
- `/jobs`
- `/joboverview/running`
- `/joboverview/completed`
- `/jobs/overview`
- `/jobs/<jobid>`
- `/jobs/<jobid>/vertices`
- `/jobs/<jobid>/config`
Expand Down Expand Up @@ -117,31 +115,15 @@ Sample Result:

### Overview of Jobs

**`/jobs`**

IDs of the jobs, grouped by status *running*, *finished*, *failed*, *canceled*.

Sample Result:

~~~
{
"jobs-running": [],
"jobs-finished": ["7684be6004e4e955c2a558a9bc463f65","49306f94d0920216b636e8dd503a6409"],
"jobs-cancelled":[],
"jobs-failed":[]
}
~~~

**`/joboverview`**
**`/jobs/overview`**

Jobs, grouped by status, each with a small summary of its status.

Sample Result:

~~~
{
"running":[],
"finished":[
"jobs":[
{
"jid": "7684be6004e4e955c2a558a9bc463f65",
"name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
Expand All @@ -168,15 +150,6 @@ Sample Result:
}
~~~

**`/joboverview/running`**

Jobs, grouped by status, each with a small summary of its status. The same as `/joboverview`, but containing only currently running jobs.

**`/joboverview/completed`**

Jobs, grouped by status, each with a small summary of its status. The same as `/joboverview`, but containing only completed (finished, canceled, or failed) jobs.


### Details of a Running or Completed Job

**`/jobs/<jobid>`**
Expand Down
Expand Up @@ -35,10 +35,10 @@
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
Expand Down Expand Up @@ -199,7 +199,7 @@ public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String

@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance();
CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
restClusterClientConfiguration.getRestServerAddress(),
restClusterClientConfiguration.getRestServerPort(),
Expand Down
Expand Up @@ -38,12 +38,12 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
Expand Down Expand Up @@ -307,7 +307,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
private static class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {

private TestListJobsHandler() {
super(CurrentJobsOverviewHandlerHeaders.getInstance());
super(JobsOverviewHeaders.getInstance());
}

@Override
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
Expand All @@ -48,6 +47,7 @@
import org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler;
Expand Down Expand Up @@ -271,6 +271,13 @@ public WebRuntimeMonitor(
get(router, new AggregatingTaskManagersMetricsHandler(scheduledExecutor, metricFetcher));
get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher));

// overview over jobs
get(router, new JobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));

get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));

get(router, new AggregatingJobsMetricsHandler(scheduledExecutor, metricFetcher));
get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher));

Expand All @@ -279,15 +286,6 @@ public WebRuntimeMonitor(
get(router, new AggregatingSubtasksMetricsHandler(scheduledExecutor, metricFetcher));
get(router, new SubtaskMetricsHandler(scheduledExecutor, metricFetcher));

// overview over jobs
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true));
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false));
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, false, true));

get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));

get(router, new JobVertexDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
get(router, new SubtasksTimesHandler(executionGraphCache, scheduledExecutor));
get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher));
Expand Down Expand Up @@ -421,7 +419,7 @@ public void run() {
*/
public static JsonArchivist[] getJsonArchivists() {
JsonArchivist[] archivists = {
new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
new JobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),

new JobPlanHandler.JobPlanJsonArchivist(),
new JobConfigHandler.JobConfigJsonArchivist(),
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FileUtils;

Expand Down Expand Up @@ -224,7 +224,7 @@ public void run() {
}

/**
* This method replicates the JSON response that would be given by the {@link CurrentJobsOverviewHandler} when
* This method replicates the JSON response that would be given by the {@link JobsOverviewHandler} when
* listing both running and finished jobs.
*
* <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
Expand Down
Expand Up @@ -28,6 +28,8 @@
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.ArchiveMessages;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -53,7 +55,7 @@
/**
* Tests for the HistoryServer.
*/
public class HistoryServerTest {
public class HistoryServerTest extends TestLogger {

@Rule
public TemporaryFolder tmpDir = new TemporaryFolder();
Expand Down Expand Up @@ -91,7 +93,7 @@ public void testFullArchiveLifecycle() throws Exception {
numFinishedPolls.await(10L, TimeUnit.SECONDS);

ObjectMapper mapper = new ObjectMapper();
String response = getFromHTTP(baseUrl + "/joboverview");
String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
JsonNode overview = mapper.readTree(response);

String jobID = overview.get("finished").get(0).get("jid").asText();
Expand Down
Expand Up @@ -86,7 +86,7 @@ angular.module('flinkApp')
@listJobs = ->
deferred = $q.defer()

$http.get flinkConfig.jobServer + "joboverview"
$http.get flinkConfig.jobServer + "jobs/overview"
.success (data, status, headers, config) =>
angular.forEach data, (list, listKey) =>
switch listKey
Expand Down
4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/hs/index.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/index.js

Large diffs are not rendered by default.

Expand Up @@ -223,7 +223,7 @@ public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> request
}

@Override
public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
return FutureUtils.toJava(
jobManagerGateway
.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
Expand Down
Expand Up @@ -330,7 +330,7 @@ public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
}

@Override
public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
final int numberJobsRunning = jobManagerRunners.size();

ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
Expand All @@ -37,14 +36,14 @@
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
Expand All @@ -56,7 +55,6 @@
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
Expand All @@ -66,6 +64,7 @@
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
Expand Down Expand Up @@ -162,17 +161,12 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executor,
restConfiguration.getRefreshInterval()));

LegacyRestHandlerAdapter<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>(
JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
CurrentJobsOverviewHandlerHeaders.getInstance(),
new CurrentJobsOverviewHandler(
executor,
timeout,
true,
true));
JobsOverviewHeaders.getInstance());

LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
Expand Down Expand Up @@ -331,7 +325,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
Expand Down
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Overview handler for jobs.
*
* @param <T> type of the leader gateway
*/
public class JobsOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {

public JobsOverviewHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends T> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> messageHeaders) {
super(
localRestAddress,
leaderRetriever,
timeout,
responseHeaders,
messageHeaders);
}

@Override
protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
return gateway.requestJobDetails(timeout);
}
}
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -41,7 +42,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -63,7 +63,7 @@ public ClusterOverviewHandler(Executor executor, Time timeout) {

@Override
public String[] getPaths() {
return new String[]{CLUSTER_OVERVIEW_REST_PATH};
return new String[]{ClusterOverviewHeaders.URL};
}

@Override
Expand Down

2 comments on commit 8086e3b

@uce
Copy link
Contributor

@uce uce commented on 8086e3b Dec 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that the project historically did not consider the REST API as a public API, but I would vote to note down all of these breaking REST API changes for the upcoming 1.5 release notes in order to have a good migration path for users. I just ran into this when pointing a tool that was using the jobsoverview endpoint of 1.4 to the latest master and had to look into what happened when I got a 404.

It might also be worth to add redirects in 1.5. and only remove them in the release after that (cc @zentol).

@tillrohrmann
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. I'll add the release notes.

Please sign in to comment.