Skip to content

Commit

Permalink
[FLINK-7815] Remove grouping from MultipleJobsDetails
Browse files Browse the repository at this point in the history
With this commit the MultipleJobsDetails instance only contains a list of all jobs
which could be retrieved from the cluster. With this change it is the responsibility
of the web ui to group the jobs into running and finished jobs.

Adapt jobs.svc.coffee script to group list of jobs into running and finished jobs
  • Loading branch information
tillrohrmann committed Oct 12, 2017
1 parent cfd5bbc commit 8d642ee
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,23 @@ angular.module('flinkApp')

$http.get flinkConfig.jobServer + "jobs/overview"
.success (data, status, headers, config) =>
angular.forEach data, (list, listKey) =>
switch listKey
when 'running' then jobs.running = @setEndTimes(list)
when 'finished' then jobs.finished = @setEndTimes(list)
when 'cancelled' then jobs.cancelled = @setEndTimes(list)
when 'failed' then jobs.failed = @setEndTimes(list)
# reset job fields
jobs.finished = []
jobs.running = []

# group the received list of jobs into running and finished jobs
_(data.jobs).groupBy(
(x) ->
switch x.state.toLowerCase()
when 'finished' then 'finished'
when 'failed' then 'finished'
when 'canceled' then 'finished'
else 'running')
.forEach((value, key) =>
switch key
when 'finished' then jobs.finished = @setEndTimes(value)
when 'running' then jobs.running = @setEndTimes(value))
.value(); # materialize the chain

deferred.resolve(jobs)
notifyObservers()
Expand Down
4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/hs/index.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions flink-runtime-web/web-dashboard/web/js/index.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {

return combinedJobDetails.thenApply(
(Collection<JobDetails> jobDetails) ->
new MultipleJobsDetails(jobDetails, null));
new MultipleJobsDetails(jobDetails));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.apache.flink.runtime.messages.webmonitor;

import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.collections.CollectionUtils;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

/**
* An actor messages describing details of various jobs. This message is sent for example
Expand All @@ -37,38 +36,28 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {

private static final long serialVersionUID = -1526236139616019127L;

public static final String FIELD_NAME_JOBS_RUNNING = "running";
public static final String FIELD_NAME_JOBS_FINISHED = "finished";
public static final String FIELD_NAME_JOBS = "jobs";

@JsonProperty(FIELD_NAME_JOBS_RUNNING)
private final Collection<JobDetails> running;

@JsonProperty(FIELD_NAME_JOBS_FINISHED)
private final Collection<JobDetails> finished;
@JsonProperty(FIELD_NAME_JOBS)
private final Collection<JobDetails> jobs;

@JsonCreator
public MultipleJobsDetails(
@JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection<JobDetails> running,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection<JobDetails> finished) {
this.running = running == null ? Collections.emptyList() : running;
this.finished = finished == null ? Collections.emptyList() : finished;
@JsonProperty(FIELD_NAME_JOBS) Collection<JobDetails> jobs) {
this.jobs = Preconditions.checkNotNull(jobs);
}

// ------------------------------------------------------------------------

public Collection<JobDetails> getRunning() {
return running;
}

public Collection<JobDetails> getFinished() {
return finished;
public Collection<JobDetails> getJobs() {
return jobs;
}

@Override
public String toString() {
return "MultipleJobsDetails{" +
"running=" + running +
", finished=" + finished +
"jobs=" + jobs +
'}';
}

Expand All @@ -81,31 +70,12 @@ public boolean equals(Object o) {
return false;
}
MultipleJobsDetails that = (MultipleJobsDetails) o;

return CollectionUtils.isEqualCollection(running, that.running) &&
CollectionUtils.isEqualCollection(finished, that.finished);
return Objects.equals(jobs, that.jobs);
}

@Override
public int hashCode() {
// the hash code only depends on the collection elements, not the collection itself!
int result = 1;

Iterator<JobDetails> iterator = running.iterator();

while (iterator.hasNext()) {
JobDetails jobDetails = iterator.next();
result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
}

iterator = finished.iterator();

while (iterator.hasNext()) {
JobDetails jobDetails = iterator.next();
result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
}

return result;
return Objects.hash(jobs);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,8 @@ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParam

gen.writeStartObject();

gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
for (JobDetails detail : result.getRunning()) {
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();

gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
for (JobDetails detail : result.getFinished()) {
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
for (JobDetails detail : result.getJobs()) {
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
Expand Down Expand Up @@ -126,9 +120,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
StringWriter writer = new StringWriter();
try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
gen.writeStartObject();
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
gen.writeEndArray();
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);

final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ private void fetchMetrics() {
LOG.debug("Fetching of JobDetails failed.", throwable);
} else {
ArrayList<String> toRetain = new ArrayList<>();
for (JobDetails job : jobDetails.getRunning()) {
toRetain.add(job.getJobId().toString());
}
for (JobDetails job : jobDetails.getFinished()) {
for (JobDetails job : jobDetails.getJobs()) {
toRetain.add(job.getJobId().toString());
}
synchronized (metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1682,10 +1682,10 @@ class JobManager(

case msg : RequestJobDetails =>

val ourDetails: Array[JobDetails] = if (msg.shouldIncludeRunning()) {
val ourDetails: List[JobDetails] = if (msg.shouldIncludeRunning()) {
currentJobs.values.map {
v => WebMonitorUtils.createDetailsForJob(v._1)
}.toArray[JobDetails]
}.toList
} else {
null
}
Expand All @@ -1695,11 +1695,10 @@ class JobManager(
future.onSuccess {
case archiveDetails: MultipleJobsDetails =>
theSender ! new MultipleJobsDetails(
util.Arrays.asList(ourDetails: _*),
archiveDetails.getFinished())
(ourDetails ++ archiveDetails.getJobs.asScala).asJavaCollection)
}(context.dispatcher)
} else {
theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null)
theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*))
}

case _ => log.error("Unrecognized info message " + actorMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class MemoryArchivist(
v => WebMonitorUtils.createDetailsForJob(v)
}.toArray[JobDetails]

theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*)))
theSender ! decorateMessage(new MultipleJobsDetails(util.Arrays.asList(details: _*)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;

import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -38,7 +39,7 @@
import java.util.List;
import java.util.Random;

import static org.junit.Assert.*;
import static org.junit.Assert.fail;

public class WebMonitorMessagesTest {

Expand Down Expand Up @@ -103,7 +104,7 @@ public void testMultipleJobDetails() {
try {
final Random rnd = new Random();
GenericMessageTester.testMessageInstance(
new MultipleJobsDetails(randomJobDetails(rnd), randomJobDetails(rnd)));
new MultipleJobsDetails(randomJobDetails(rnd)));
}
catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import java.util.Collections;
import java.util.Arrays;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -72,8 +72,7 @@ public void testMultipleJobsDetailsMarshalling() throws JsonProcessingException
4);

final MultipleJobsDetails expected = new MultipleJobsDetails(
Collections.singleton(running),
Collections.singleton(finished));
Arrays.asList(running, finished));

final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testUpdate() throws Exception {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);

when(jobManagerGateway.requestJobDetails(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
Expand Down

0 comments on commit 8d642ee

Please sign in to comment.