Skip to content

Commit

Permalink
[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endp…
Browse files Browse the repository at this point in the history
…oint

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These state will now also be displayed in the
web frontend.

Change MultipleJobsDetails to store a Collection<JobDetails> instead of JobDetails[]

Use MultipleJobsDetails#FIELD_NAME_ for serialization in CurrentJobsOverviewHandler

This closes #4688.
  • Loading branch information
tillrohrmann committed Sep 25, 2017
1 parent f1b2b83 commit e585aed
Show file tree
Hide file tree
Showing 21 changed files with 532 additions and 123 deletions.
Expand Up @@ -38,6 +38,8 @@
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand All @@ -49,6 +51,7 @@
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -235,7 +238,6 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)

@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
// TODO: return proper list of running jobs
return CompletableFuture.completedFuture(jobManagerRunners.keySet());
}

Expand All @@ -258,6 +260,23 @@ public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) {
8));
}

@Override
public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
final int numberJobsRunning = jobManagerRunners.size();

ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);

for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
}

CompletableFuture<Collection<JobDetails>> combinedJobDetails = FutureUtils.combineAll(individualJobDetails);

return combinedJobDetails.thenApply(
(Collection<JobDetails> jobDetails) ->
new MultipleJobsDetails(jobDetails, null));
}

/**
* Cleans up the job related data from the dispatcher. If cleanupHA is true, then
* the data will also be removed from HA.
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
Expand Down Expand Up @@ -56,4 +57,6 @@ CompletableFuture<Collection<JobID>> listJobs(
@RpcTimeout Time timeout);

CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);

CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
}
Expand Up @@ -20,18 +20,21 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
Expand Down Expand Up @@ -93,6 +96,17 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executor,
restConfiguration.getRefreshInterval()));

LegacyRestHandlerAdapter<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
leaderRetriever,
timeout,
CurrentJobsOverviewHandlerHeaders.getInstance(),
new CurrentJobsOverviewHandler(
executor,
timeout,
true,
true));

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand All @@ -110,6 +124,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in

handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));

optWebContent.ifPresent(
webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
Expand Down
Expand Up @@ -75,7 +75,7 @@ protected void startClusterComponents(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
uuid -> new DispatcherId(uuid),
DispatcherId::new,
10,
Time.milliseconds(50L));

Expand Down
Expand Up @@ -177,6 +177,10 @@ public JobManagerRunner(
}
}

public JobMasterGateway getJobManagerGateway() {
return jobManager.getSelfGateway(JobMasterGateway.class);
}

//----------------------------------------------------------------------------------------------
// Lifecycle management
//----------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -70,6 +70,7 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
Expand All @@ -90,6 +91,7 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -718,6 +720,11 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
}

//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateServerAddress;
Expand Down Expand Up @@ -209,4 +210,6 @@ CompletableFuture<RegistrationResponse> registerTaskManager(
* @param resourceID unique id of the resource manager
*/
void heartbeatFromResourceManager(final ResourceID resourceID);

CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
}

0 comments on commit e585aed

Please sign in to comment.