diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 153ee53ad4800..8af343490a438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -49,6 +51,7 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -235,7 +238,6 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) @Override public CompletableFuture> listJobs(Time timeout) { - // TODO: return proper list of running jobs return CompletableFuture.completedFuture(jobManagerRunners.keySet()); } @@ -258,6 +260,23 @@ public CompletableFuture requestStatusOverview(Time timeout) { 8)); } + @Override + public CompletableFuture requestJobDetails(Time timeout) { + final int numberJobsRunning = jobManagerRunners.size(); + + ArrayList> individualJobDetails = new ArrayList<>(numberJobsRunning); + + for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { + individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout)); + } + + CompletableFuture> combinedJobDetails = FutureUtils.combineAll(individualJobDetails); + + return combinedJobDetails.thenApply( + (Collection jobDetails) -> + new MultipleJobsDetails(jobDetails, null)); + } + /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then * the data will also be removed from HA. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index ee5484e13dc49..6aaf0b6d2580a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; @@ -56,4 +57,6 @@ CompletableFuture> listJobs( @RpcTimeout Time timeout); CompletableFuture requestStatusOverview(@RpcTimeout Time timeout); + + CompletableFuture requestJobDetails(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 6054a7d5f07bb..dff5df836d29c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -20,18 +20,21 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; @@ -93,6 +96,17 @@ protected Collection> in executor, restConfiguration.getRefreshInterval())); + LegacyRestHandlerAdapter currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>( + restAddressFuture, + leaderRetriever, + timeout, + CurrentJobsOverviewHandlerHeaders.getInstance(), + new CurrentJobsOverviewHandler( + executor, + timeout, + true, + true)); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -110,6 +124,7 @@ protected Collection> in handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler)); handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler)); + handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler)); optWebContent.ifPresent( webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index e394854cd5954..c4da6dbc10702 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -75,7 +75,7 @@ protected void startClusterComponents( LeaderGatewayRetriever dispatcherGatewayRetriever = new RpcGatewayRetriever<>( rpcService, DispatcherGateway.class, - uuid -> new DispatcherId(uuid), + DispatcherId::new, 10, Time.milliseconds(50L)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 6f5a082bf40cd..0bf0cc2ebd315 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -177,6 +177,10 @@ public JobManagerRunner( } } + public JobMasterGateway getJobManagerGateway() { + return jobManager.getSelfGateway(JobMasterGateway.class); + } + //---------------------------------------------------------------------------------------------- // Lifecycle management //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 343fbf6fd5a08..19fe4a645e85f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -70,6 +70,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; @@ -90,6 +91,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -718,6 +720,11 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } + @Override + public CompletableFuture requestJobDetails(Time timeout) { + return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 965d88d272e3c..c2fba470d9a80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; @@ -209,4 +210,6 @@ CompletableFuture registerTaskManager( * @param resourceID unique id of the resource manager */ void heartbeatFromResourceManager(final ResourceID resourceID); + + CompletableFuture requestJobDetails(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java index 31ea516041bdd..2aca75bbe11ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java @@ -19,8 +19,21 @@ package org.apache.flink.runtime.messages.webmonitor; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobStatus; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -28,40 +41,58 @@ /** * An actor message with a detailed overview of the current status of a job. */ -public class JobDetails implements java.io.Serializable { +@JsonSerialize(using = JobDetails.JobDetailsSerializer.class) +@JsonDeserialize(using = JobDetails.JobDetailsDeserializer.class) +public class JobDetails implements Serializable { private static final long serialVersionUID = -3391462110304948766L; - + + private static final String FIELD_NAME_JOB_ID = "jid"; + private static final String FIELD_NAME_JOB_NAME = "name"; + private static final String FIELD_NAME_START_TIME = "start-time"; + private static final String FIELD_NAME_END_TIME = "end-time"; + private static final String FIELD_NAME_DURATION = "duration"; + private static final String FIELD_NAME_STATUS = "state"; + private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification"; + private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total"; + private final JobID jobId; - + private final String jobName; - + private final long startTime; - + private final long endTime; - + + private final long duration; + private final JobStatus status; - + private final long lastUpdateTime; - private final int[] numVerticesPerExecutionState; + private final int[] tasksPerState; private final int numTasks; - - public JobDetails(JobID jobId, String jobName, - long startTime, long endTime, - JobStatus status, - long lastUpdateTime, - int[] numVerticesPerExecutionState, int numTasks) { - + public JobDetails( + JobID jobId, + String jobName, + long startTime, + long endTime, + long duration, + JobStatus status, + long lastUpdateTime, + int[] tasksPerState, + int numTasks) { + this.jobId = checkNotNull(jobId); this.jobName = checkNotNull(jobName); this.startTime = startTime; this.endTime = endTime; + this.duration = duration; this.status = checkNotNull(status); this.lastUpdateTime = lastUpdateTime; - this.numVerticesPerExecutionState = checkNotNull(numVerticesPerExecutionState); + this.tasksPerState = checkNotNull(tasksPerState); this.numTasks = numTasks; } @@ -83,6 +114,10 @@ public long getEndTime() { return endTime; } + public long getDuration() { + return duration; + } + public JobStatus getStatus() { return status; } @@ -95,8 +130,8 @@ public int getNumTasks() { return numTasks; } - public int[] getNumVerticesPerExecutionState() { - return numVerticesPerExecutionState; + public int[] getTasksPerState() { + return tasksPerState; } // ------------------------------------------------------------------------ @@ -116,7 +151,7 @@ else if (o != null && o.getClass() == JobDetails.class) { this.status == that.status && this.jobId.equals(that.jobId) && this.jobName.equals(that.jobName) && - Arrays.equals(this.numVerticesPerExecutionState, that.numVerticesPerExecutionState); + Arrays.equals(this.tasksPerState, that.tasksPerState); } else { return false; @@ -131,7 +166,7 @@ public int hashCode() { result = 31 * result + (int) (endTime ^ (endTime >>> 32)); result = 31 * result + status.hashCode(); result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32)); - result = 31 * result + Arrays.hashCode(numVerticesPerExecutionState); + result = 31 * result + Arrays.hashCode(tasksPerState); result = 31 * result + numTasks; return result; } @@ -145,8 +180,89 @@ public String toString() { ", endTime=" + endTime + ", status=" + status + ", lastUpdateTime=" + lastUpdateTime + - ", numVerticesPerExecutionState=" + Arrays.toString(numVerticesPerExecutionState) + + ", numVerticesPerExecutionState=" + Arrays.toString(tasksPerState) + ", numTasks=" + numTasks + '}'; } + + public static final class JobDetailsSerializer extends StdSerializer { + private static final long serialVersionUID = 7915913423515194428L; + + public JobDetailsSerializer() { + super(JobDetails.class); + } + + @Override + public void serialize( + JobDetails jobDetails, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + + jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobDetails.getJobId().toString()); + jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobDetails.getJobName()); + jsonGenerator.writeStringField(FIELD_NAME_STATUS, jobDetails.getStatus().name()); + + jsonGenerator.writeNumberField(FIELD_NAME_START_TIME, jobDetails.getStartTime()); + jsonGenerator.writeNumberField(FIELD_NAME_END_TIME, jobDetails.getEndTime()); + jsonGenerator.writeNumberField(FIELD_NAME_DURATION, jobDetails.getDuration()); + jsonGenerator.writeNumberField(FIELD_NAME_LAST_MODIFICATION, jobDetails.getLastUpdateTime()); + + jsonGenerator.writeObjectFieldStart("tasks"); + jsonGenerator.writeNumberField(FIELD_NAME_TOTAL_NUMBER_TASKS, jobDetails.getNumTasks()); + + final int[] perState = jobDetails.getTasksPerState(); + + for (ExecutionState executionState : ExecutionState.values()) { + jsonGenerator.writeNumberField(executionState.name().toLowerCase(), perState[executionState.ordinal()]); + } + + jsonGenerator.writeEndObject(); + + jsonGenerator.writeEndObject(); + } + } + + public static final class JobDetailsDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 6089784742093294800L; + + public JobDetailsDeserializer() { + super(JobDetails.class); + } + + @Override + public JobDetails deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + + JsonNode rootNode = jsonParser.readValueAsTree(); + + JobID jobId = JobID.fromHexString(rootNode.get(FIELD_NAME_JOB_ID).textValue()); + String jobName = rootNode.get(FIELD_NAME_JOB_NAME).textValue(); + long startTime = rootNode.get(FIELD_NAME_START_TIME).longValue(); + long endTime = rootNode.get(FIELD_NAME_END_TIME).longValue(); + long duration = rootNode.get(FIELD_NAME_DURATION).longValue(); + JobStatus jobStatus = JobStatus.valueOf(rootNode.get(FIELD_NAME_STATUS).textValue()); + long lastUpdateTime = rootNode.get(FIELD_NAME_LAST_MODIFICATION).longValue(); + + JsonNode tasksNode = rootNode.get("tasks"); + int numTasks = tasksNode.get(FIELD_NAME_TOTAL_NUMBER_TASKS).intValue(); + + int[] numVerticesPerExecutionState = new int[ExecutionState.values().length]; + + for (ExecutionState executionState : ExecutionState.values()) { + numVerticesPerExecutionState[executionState.ordinal()] = tasksNode.get(executionState.name().toLowerCase()).intValue(); + } + + return new JobDetails( + jobId, + jobName, + startTime, + endTime, + duration, + jobStatus, + lastUpdateTime, + numVerticesPerExecutionState, + numTasks); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java index 47de58abff242..31eb1cf747a42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java @@ -18,64 +18,95 @@ package org.apache.flink.runtime.messages.webmonitor; -import java.util.Arrays; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.collections.CollectionUtils; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; /** * An actor messages describing details of various jobs. This message is sent for example - * in response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestJobDetails} - * message. + * in response to the {@link RequestJobDetails} message. */ -public class MultipleJobsDetails implements java.io.Serializable { +public class MultipleJobsDetails implements ResponseBody, Serializable { private static final long serialVersionUID = -1526236139616019127L; - private static final JobDetails[] EMPTY = new JobDetails[0]; - - private final JobDetails[] runningJobs; - private final JobDetails[] finishedJobs; + public static final String FIELD_NAME_JOBS_RUNNING = "running"; + public static final String FIELD_NAME_JOBS_FINISHED = "finished"; - public MultipleJobsDetails(JobDetails[] running, JobDetails[] finished) { - this.runningJobs = running == null ? EMPTY : running; - this.finishedJobs = finished == null ? EMPTY : finished; + @JsonProperty(FIELD_NAME_JOBS_RUNNING) + private final Collection running; + + @JsonProperty(FIELD_NAME_JOBS_FINISHED) + private final Collection finished; + + @JsonCreator + public MultipleJobsDetails( + @JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection running, + @JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection finished) { + this.running = running == null ? Collections.emptyList() : running; + this.finished = finished == null ? Collections.emptyList() : finished; } // ------------------------------------------------------------------------ - public JobDetails[] getRunningJobs() { - return runningJobs; + public Collection getRunning() { + return running; } - public JobDetails[] getFinishedJobs() { - return finishedJobs; + public Collection getFinished() { + return finished; } - // ------------------------------------------------------------------------ - @Override - public int hashCode() { - return Arrays.deepHashCode(runningJobs) + Arrays.deepHashCode(finishedJobs); + public String toString() { + return "MultipleJobsDetails{" + + "running=" + running + + ", finished=" + finished + + '}'; } @Override - public boolean equals(Object obj) { - if (obj == this) { + public boolean equals(Object o) { + if (this == o) { return true; } - else if (obj instanceof MultipleJobsDetails) { - MultipleJobsDetails that = (MultipleJobsDetails) obj; - return Arrays.deepEquals(this.runningJobs, that.runningJobs) && - Arrays.deepEquals(this.finishedJobs, that.finishedJobs); - } - else { + if (o == null || getClass() != o.getClass()) { return false; } + MultipleJobsDetails that = (MultipleJobsDetails) o; + + return CollectionUtils.isEqualCollection(running, that.running) && + CollectionUtils.isEqualCollection(finished, that.finished); } @Override - public String toString() { - return "MultipleJobsDetails {" + - "running=" + Arrays.toString(runningJobs) + - ", finished=" + Arrays.toString(finishedJobs) + - '}'; + public int hashCode() { + // the hash code only depends on the collection elements, not the collection itself! + int result = 1; + + Iterator iterator = running.iterator(); + + while (iterator.hasNext()) { + JobDetails jobDetails = iterator.next(); + result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode()); + } + + iterator = finished.iterator(); + + while (iterator.hasNext()) { + JobDetails jobDetails = iterator.next(); + result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode()); + } + + return result; } + + // ------------------------------------------------------------------------ } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java index 480c9e8b6b2eb..794ff20a1bbef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java @@ -50,8 +50,6 @@ */ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler { - - private static final String version = EnvironmentInformation.getVersion(); private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java index 6f8532085228a..b1939e5ff9c47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java @@ -21,11 +21,15 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.LegacyRestHandler; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -45,7 +49,7 @@ /** * Request handler that returns a summary of the job status. */ -public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { +public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler { private static final String ALL_JOBS_REST_PATH = "/joboverview"; private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running"; @@ -68,6 +72,11 @@ public CurrentJobsOverviewHandler( this.includeFinishedJobs = includeFinishedJobs; } + @Override + public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { + return gateway.requestJobDetails(timeout); + } + @Override public String[] getPaths() { if (includeRunningJobs && includeFinishedJobs) { @@ -92,24 +101,26 @@ public CompletableFuture handleJsonRequest(Map pathParam StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { - gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); - gen.writeArrayFieldStart("finished"); - for (JobDetails detail : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED); + for (JobDetails detail : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } else { gen.writeArrayFieldStart("jobs"); - for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : includeRunningJobs ? result.getRunning() : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } @@ -138,10 +149,13 @@ public Collection archiveJsonWithPath(AccessExecutionGraph graph) StringWriter writer = new StringWriter(); try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) { gen.writeStartObject(); - gen.writeArrayFieldStart("running"); + gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING); gen.writeEndArray(); - gen.writeArrayFieldStart("finished"); - writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis()); + gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED); + + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null); + gen.writeEndArray(); gen.writeEndObject(); } @@ -150,33 +164,4 @@ public Collection archiveJsonWithPath(AccessExecutionGraph graph) return Collections.singleton(new ArchivedJson(path, json)); } } - - public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException { - gen.writeStartObject(); - - gen.writeStringField("jid", details.getJobId().toString()); - gen.writeStringField("name", details.getJobName()); - gen.writeStringField("state", details.getStatus().name()); - - gen.writeNumberField("start-time", details.getStartTime()); - gen.writeNumberField("end-time", details.getEndTime()); - gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime()); - gen.writeNumberField("last-modification", details.getLastUpdateTime()); - - gen.writeObjectFieldStart("tasks"); - gen.writeNumberField("total", details.getNumTasks()); - - final int[] perState = details.getNumVerticesPerExecutionState(); - gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] + - perState[ExecutionState.SCHEDULED.ordinal()] + - perState[ExecutionState.DEPLOYING.ordinal()]); - gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]); - gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]); - gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]); - gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]); - gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]); - gen.writeEndObject(); - - gen.writeEndObject(); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index 9f53808c76a61..c114ee6fb8645 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -113,10 +113,10 @@ private void fetchMetrics() { LOG.debug("Fetching of JobDetails failed.", throwable); } else { ArrayList toRetain = new ArrayList<>(); - for (JobDetails job : jobDetails.getRunningJobs()) { + for (JobDetails job : jobDetails.getRunning()) { toRetain.add(job.getJobId().toString()); } - for (JobDetails job : jobDetails.getFinishedJobs()) { + for (JobDetails job : jobDetails.getFinished()) { toRetain.add(job.getJobId().toString()); } synchronized (metrics) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java new file mode 100644 index 0000000000000..f97c601257201 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for {@link CurrentJobsOverviewHandler}. + */ +public final class CurrentJobsOverviewHandlerHeaders implements MessageHeaders { + + private static final CurrentJobsOverviewHandlerHeaders INSTANCE = new CurrentJobsOverviewHandlerHeaders(); + + // make this class a singleton + private CurrentJobsOverviewHandlerHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/joboverview"; + } + + @Override + public Class getResponseClass() { + return MultipleJobsDetails.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static CurrentJobsOverviewHandlerHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index e0f18230bcec0..0accab755763a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -253,6 +253,7 @@ public static JobDetails createDetailsForJob(AccessExecutionGraph job) { long started = job.getStatusTimestamp(JobStatus.CREATED); long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L; + long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started; int[] countsPerStatus = new int[ExecutionState.values().length]; long lastChanged = 0; @@ -271,9 +272,16 @@ public static JobDetails createDetailsForJob(AccessExecutionGraph job) { lastChanged = Math.max(lastChanged, finished); - return new JobDetails(job.getJobID(), job.getJobName(), - started, finished, status, lastChanged, - countsPerStatus, numTotalTasks); + return new JobDetails( + job.getJobID(), + job.getJobName(), + started, + finished, + duration, + status, + lastChanged, + countsPerStatus, + numTotalTasks); } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 67ffb32a201c6..76707b5dc8696 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager import java.io.IOException import java.net._ +import java.util import java.util.UUID import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} import java.util.function.{BiFunction, Consumer} @@ -1694,10 +1695,12 @@ class JobManager( val future = (archive ? msg)(timeout) future.onSuccess { case archiveDetails: MultipleJobsDetails => - theSender ! new MultipleJobsDetails(ourDetails, archiveDetails.getFinishedJobs()) + theSender ! new MultipleJobsDetails( + util.Arrays.asList(ourDetails: _*), + archiveDetails.getFinished()) }(context.dispatcher) } else { - theSender ! new MultipleJobsDetails(ourDetails, null) + theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null) } case _ => log.error("Unrecognized info message " + actorMessage) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 327e2a37f0b56..c963238cfee59 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -189,7 +189,7 @@ class MemoryArchivist( v => WebMonitorUtils.createDetailsForJob(v) }.toArray[JobDetails] - theSender ! decorateMessage(new MultipleJobsDetails(null, details)) + theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*))) } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java index 8d281f71329b4..f5f497662dc86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Random; @@ -85,8 +87,8 @@ public void testJobDetailsMessage() { JobID jid = GenericMessageTester.randomJobId(rnd); JobStatus status = GenericMessageTester.randomJobStatus(rnd); - JobDetails msg1 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal); - JobDetails msg2 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal); + JobDetails msg1 = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal); + JobDetails msg2 = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal); GenericMessageTester.testMessageInstances(msg1, msg2); } @@ -120,7 +122,7 @@ private static List randomIds(Random rnd) { return ids; } - private JobDetails[] randomJobDetails(Random rnd) { + private Collection randomJobDetails(Random rnd) { final JobDetails[] details = new JobDetails[rnd.nextInt(10)]; for (int k = 0; k < details.length; k++) { int[] numVerticesPerState = new int[ExecutionState.values().length]; @@ -140,8 +142,8 @@ private JobDetails[] randomJobDetails(Random rnd) { JobID jid = new JobID(); JobStatus status = JobStatus.values()[rnd.nextInt(JobStatus.values().length)]; - details[k] = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal); + details[k] = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal); } - return details; + return Arrays.asList(details); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java new file mode 100644 index 0000000000000..96a97a71d1490 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link JobDetails}. + */ +public class JobDetailsTest extends TestLogger { + + /** + * Tests that we can marshal and unmarshal JobDetails instances. + */ + @Test + public void testJobDetailsMarshalling() throws JsonProcessingException { + final JobDetails expected = new JobDetails( + new JobID(), + "foobar", + 1L, + 10L, + 9L, + JobStatus.RUNNING, + 8L, + new int[]{1, 3, 3, 7, 4, 2, 7, 3, 3}, + 42); + + final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + final JsonNode marshalled = objectMapper.valueToTree(expected); + + final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class); + + assertEquals(expected, unmarshalled); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java new file mode 100644 index 0000000000000..23f012a85b448 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.TestLogger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link MultipleJobsDetails} class. + */ +public class MultipleJobsDetailsTest extends TestLogger { + + /** + * Tests that we can un/marshal {@link MultipleJobsDetails} objects. + */ + @Test + public void testMultipleJobsDetailsMarshalling() throws JsonProcessingException { + int[] verticesPerState = new int[ExecutionState.values().length]; + + for (int i = 0; i < verticesPerState.length; i++) { + verticesPerState[i] = i; + } + + final JobDetails running = new JobDetails( + new JobID(), + "running", + 1L, + -1L, + 9L, + JobStatus.RUNNING, + 9L, + verticesPerState, + 9); + + final JobDetails finished = new JobDetails( + new JobID(), + "finished", + 1L, + 5L, + 4L, + JobStatus.FINISHED, + 8L, + verticesPerState, + 4); + + final MultipleJobsDetails expected = new MultipleJobsDetails( + Collections.singleton(running), + Collections.singleton(finished)); + + final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + final JsonNode marshalled = objectMapper.valueToTree(expected); + + final MultipleJobsDetails unmarshalled = objectMapper.treeToValue(marshalled, MultipleJobsDetails.class); + + assertEquals(expected, unmarshalled); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java index 83bb157009bf0..c9326c1736053 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java @@ -89,7 +89,8 @@ public void testJsonGeneration() throws Exception { JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob); StringWriter writer = new StringWriter(); try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) { - CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0); + JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer(); + serializer.serialize(expectedDetails, gen, null); } compareJobOverview(expectedDetails, writer.toString()); } @@ -108,14 +109,10 @@ private static void compareJobOverview(JobDetails expectedDetails, String answer JsonNode tasks = result.get("tasks"); Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt()); - int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState(); - Assert.assertEquals( - tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()], - tasks.get("pending").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt()); - Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt()); + int[] tasksPerState = expectedDetails.getTasksPerState(); + + for (ExecutionState executionState : ExecutionState.values()) { + Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt()); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index b278979ae2596..e513dd9527655 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -91,7 +91,7 @@ public void testUpdate() throws Exception { JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0]))); + .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList()))); when(jobManagerGateway.requestTaskManagerInstances(any(Time.class))) .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager))); when(jobManagerGateway.getAddress()).thenReturn("/jm/address");