Skip to content

Commit

Permalink
[FLINK-7815] Remove grouping from MultipleJobsDetails
Browse files Browse the repository at this point in the history
With this commit the MultipleJobsDetails instance only contains a list of all jobs
which could be retrieved from the cluster. With this change it is the responsibility
of the web ui to group the jobs into running and finished jobs.

Adapt jobs.svc.coffee script to group list of jobs into running and finished jobs
  • Loading branch information
tillrohrmann committed Oct 21, 2017
1 parent 16e66a2 commit de55dff
Show file tree
Hide file tree
Showing 18 changed files with 70 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
Expand Down Expand Up @@ -703,9 +704,10 @@ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Excepti
return responseFuture.thenApply((responseMessage) -> {
if (responseMessage instanceof MultipleJobsDetails) {
MultipleJobsDetails details = (MultipleJobsDetails) responseMessage;
Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(details.getRunning().size() + details.getFinished().size());
details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));

final Collection<JobDetails> jobDetails = details.getJobs();
Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
return flattenedDetails;
} else {
throw new CompletionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
Expand Down Expand Up @@ -206,11 +207,13 @@ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Excepti
headers
);
return jobDetailsFuture
.thenApply(details -> {
Collection<JobStatusMessage> flattenedDetails = new ArrayList<>();
details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
return flattenedDetails;
.thenApply(
(MultipleJobsDetails multipleJobsDetails) -> {
final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs();
Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));

return flattenedDetails;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -257,7 +257,7 @@ private static class TestListActorGateway extends TestActorGateway<RequestJobDet
public MultipleJobsDetails process(RequestJobDetails message) {
JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
return new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished));
return new MultipleJobsDetails(Arrays.asList(running, finished));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -314,7 +314,7 @@ private TestListJobsHandler() {
protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
return CompletableFuture.completedFuture(new MultipleJobsDetails(Arrays.asList(running, finished)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testFullArchiveLifecycle() throws Exception {
String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
JsonNode overview = mapper.readTree(response);

String jobID = overview.get("finished").get(0).get("jid").asText();
String jobID = overview.get("jobs").get(0).get("jid").asText();
JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
Assert.assertNotNull(jobDetails.get("jid"));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,23 @@ angular.module('flinkApp')

$http.get flinkConfig.jobServer + "jobs/overview"
.success (data, status, headers, config) =>
angular.forEach data, (list, listKey) =>
switch listKey
when 'running' then jobs.running = @setEndTimes(list)
when 'finished' then jobs.finished = @setEndTimes(list)
when 'cancelled' then jobs.cancelled = @setEndTimes(list)
when 'failed' then jobs.failed = @setEndTimes(list)
# reset job fields
jobs.finished = []
jobs.running = []

# group the received list of jobs into running and finished jobs
_(data.jobs).groupBy(
(x) ->
switch x.state.toLowerCase()
when 'finished' then 'finished'
when 'failed' then 'finished'
when 'canceled' then 'finished'
else 'running')
.forEach((value, key) =>
switch key
when 'finished' then jobs.finished = @setEndTimes(value)
when 'running' then jobs.running = @setEndTimes(value))
.value(); # materialize the chain

deferred.resolve(jobs)
notifyObservers()
Expand Down
4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/hs/index.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/index.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {

return combinedJobDetails.thenApply(
(Collection<JobDetails> jobDetails) ->
new MultipleJobsDetails(jobDetails, null));
new MultipleJobsDetails(jobDetails));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.apache.flink.runtime.messages.webmonitor;

import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.collections.CollectionUtils;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

/**
* An actor messages describing details of various jobs. This message is sent for example
Expand All @@ -37,38 +36,28 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {

private static final long serialVersionUID = -1526236139616019127L;

public static final String FIELD_NAME_JOBS_RUNNING = "running";
public static final String FIELD_NAME_JOBS_FINISHED = "finished";
public static final String FIELD_NAME_JOBS = "jobs";

@JsonProperty(FIELD_NAME_JOBS_RUNNING)
private final Collection<JobDetails> running;

@JsonProperty(FIELD_NAME_JOBS_FINISHED)
private final Collection<JobDetails> finished;
@JsonProperty(FIELD_NAME_JOBS)
private final Collection<JobDetails> jobs;

@JsonCreator
public MultipleJobsDetails(
@JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection<JobDetails> running,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection<JobDetails> finished) {
this.running = running == null ? Collections.emptyList() : running;
this.finished = finished == null ? Collections.emptyList() : finished;
@JsonProperty(FIELD_NAME_JOBS) Collection<JobDetails> jobs) {
this.jobs = Preconditions.checkNotNull(jobs);
}

// ------------------------------------------------------------------------

public Collection<JobDetails> getRunning() {
return running;
}

public Collection<JobDetails> getFinished() {
return finished;
public Collection<JobDetails> getJobs() {
return jobs;
}

@Override
public String toString() {
return "MultipleJobsDetails{" +
"running=" + running +
", finished=" + finished +
"jobs=" + jobs +
'}';
}

Expand All @@ -81,31 +70,12 @@ public boolean equals(Object o) {
return false;
}
MultipleJobsDetails that = (MultipleJobsDetails) o;

return CollectionUtils.isEqualCollection(running, that.running) &&
CollectionUtils.isEqualCollection(finished, that.finished);
return Objects.equals(jobs, that.jobs);
}

@Override
public int hashCode() {
// the hash code only depends on the collection elements, not the collection itself!
int result = 1;

Iterator<JobDetails> iterator = running.iterator();

while (iterator.hasNext()) {
JobDetails jobDetails = iterator.next();
result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
}

iterator = finished.iterator();

while (iterator.hasNext()) {
JobDetails jobDetails = iterator.next();
result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
}

return result;
return Objects.hash(jobs);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,8 @@ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParam

gen.writeStartObject();

gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
for (JobDetails detail : result.getRunning()) {
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();

gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
for (JobDetails detail : result.getFinished()) {
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
for (JobDetails detail : result.getJobs()) {
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
Expand Down Expand Up @@ -126,9 +120,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
StringWriter writer = new StringWriter();
try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
gen.writeStartObject();
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
gen.writeEndArray();
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);

final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ private void fetchMetrics() {
LOG.debug("Fetching of JobDetails failed.", throwable);
} else {
ArrayList<String> toRetain = new ArrayList<>();
for (JobDetails job : jobDetails.getRunning()) {
toRetain.add(job.getJobId().toString());
}
for (JobDetails job : jobDetails.getFinished()) {
for (JobDetails job : jobDetails.getJobs()) {
toRetain.add(job.getJobId().toString());
}
synchronized (metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1682,10 +1682,10 @@ class JobManager(

case msg : RequestJobDetails =>

val ourDetails: Array[JobDetails] = if (msg.shouldIncludeRunning()) {
val ourDetails: List[JobDetails] = if (msg.shouldIncludeRunning()) {
currentJobs.values.map {
v => WebMonitorUtils.createDetailsForJob(v._1)
}.toArray[JobDetails]
}.toList
} else {
null
}
Expand All @@ -1695,11 +1695,10 @@ class JobManager(
future.onSuccess {
case archiveDetails: MultipleJobsDetails =>
theSender ! new MultipleJobsDetails(
util.Arrays.asList(ourDetails: _*),
archiveDetails.getFinished())
(ourDetails ++ archiveDetails.getJobs.asScala).asJavaCollection)
}(context.dispatcher)
} else {
theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null)
theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*))
}

case _ => log.error("Unrecognized info message " + actorMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class MemoryArchivist(
v => WebMonitorUtils.createDetailsForJob(v)
}.toArray[JobDetails]

theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*)))
theSender ! decorateMessage(new MultipleJobsDetails(util.Arrays.asList(details: _*)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;

import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -38,7 +39,7 @@
import java.util.List;
import java.util.Random;

import static org.junit.Assert.*;
import static org.junit.Assert.fail;

public class WebMonitorMessagesTest {

Expand Down Expand Up @@ -103,7 +104,7 @@ public void testMultipleJobDetails() {
try {
final Random rnd = new Random();
GenericMessageTester.testMessageInstance(
new MultipleJobsDetails(randomJobDetails(rnd), randomJobDetails(rnd)));
new MultipleJobsDetails(randomJobDetails(rnd)));
}
catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import java.util.Collections;
import java.util.Arrays;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -72,8 +72,7 @@ public void testMultipleJobsDetailsMarshalling() throws JsonProcessingException
4);

final MultipleJobsDetails expected = new MultipleJobsDetails(
Collections.singleton(running),
Collections.singleton(finished));
Arrays.asList(running, finished));

final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,10 @@ public void testArchiver() throws Exception {
Assert.assertEquals(JobsOverviewHeaders.URL, archive.getPath());

JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
ArrayNode running = (ArrayNode) result.get("running");
Assert.assertEquals(0, running.size());
ArrayNode jobs = (ArrayNode) result.get("jobs");
Assert.assertEquals(1, jobs.size());

ArrayNode finished = (ArrayNode) result.get("finished");
Assert.assertEquals(1, finished.size());

compareJobOverview(expectedDetails, finished.get(0).toString());
compareJobOverview(expectedDetails, jobs.get(0).toString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testUpdate() throws Exception {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);

when(jobManagerGateway.requestJobDetails(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
Expand Down

0 comments on commit de55dff

Please sign in to comment.