Skip to content

Commit

Permalink
[FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTi…
Browse files Browse the repository at this point in the history
…mesInfo
  • Loading branch information
tillrohrmann committed Nov 7, 2017
1 parent 712d4cf commit 26e3d37
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
Expand Up @@ -89,9 +89,9 @@ protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVe
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname(); String locationString = location == null ? "(unassigned)" : location.getHostname();


Map<String, Long> timestampMap = new HashMap<>(); Map<ExecutionState, Long> timestampMap = new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) { for (ExecutionState state : ExecutionState.values()) {
timestampMap.put(state.name(), timestamps[state.ordinal()]); timestampMap.put(state, timestamps[state.ordinal()]);
} }


subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo( subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(
Expand Down
Expand Up @@ -18,6 +18,7 @@


package org.apache.flink.runtime.rest.messages; package org.apache.flink.runtime.rest.messages;


import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;


import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -109,13 +110,13 @@ public static final class SubtaskTimeInfo {
private final long duration; private final long duration;


@JsonProperty(FIELD_NAME_TIMESTAMPS) @JsonProperty(FIELD_NAME_TIMESTAMPS)
private final Map<String, Long> timestamps; private final Map<ExecutionState, Long> timestamps;


public SubtaskTimeInfo( public SubtaskTimeInfo(
@JsonProperty(FIELD_NAME_SUBTASK) int subtask, @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
@JsonProperty(FIELD_NAME_HOST) String host, @JsonProperty(FIELD_NAME_HOST) String host,
@JsonProperty(FIELD_NAME_DURATION) long duration, @JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) { @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<ExecutionState, Long> timestamps) {
this.subtask = subtask; this.subtask = subtask;
this.host = checkNotNull(host); this.host = checkNotNull(host);
this.duration = duration; this.duration = duration;
Expand Down
Expand Up @@ -18,6 +18,8 @@


package org.apache.flink.runtime.rest.messages; package org.apache.flink.runtime.rest.messages;


import org.apache.flink.runtime.execution.ExecutionState;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
Expand All @@ -37,22 +39,22 @@ protected Class<SubtasksTimesInfo> getTestResponseClass() {
protected SubtasksTimesInfo getTestResponseInstance() throws Exception { protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>(); List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();


Map<String, Long> subTimeMap1 = new HashMap<>(); Map<ExecutionState, Long> subTimeMap1 = new HashMap<>();
subTimeMap1.put("state11", System.currentTimeMillis()); subTimeMap1.put(ExecutionState.RUNNING, 1L);
subTimeMap1.put("state12", System.currentTimeMillis()); subTimeMap1.put(ExecutionState.FAILED, 2L);
subTimeMap1.put("state13", System.currentTimeMillis()); subTimeMap1.put(ExecutionState.CANCELED, 3L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1)); subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));


Map<String, Long> subTimeMap2 = new HashMap<>(); Map<ExecutionState, Long> subTimeMap2 = new HashMap<>();
subTimeMap1.put("state21", System.currentTimeMillis()); subTimeMap2.put(ExecutionState.RUNNING, 4L);
subTimeMap1.put("state22", System.currentTimeMillis()); subTimeMap2.put(ExecutionState.FAILED, 5L);
subTimeMap1.put("state23", System.currentTimeMillis()); subTimeMap2.put(ExecutionState.CANCELED, 6L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2)); subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));


Map<String, Long> subTimeMap3 = new HashMap<>(); Map<ExecutionState, Long> subTimeMap3 = new HashMap<>();
subTimeMap1.put("state31", System.currentTimeMillis()); subTimeMap3.put(ExecutionState.SCHEDULED, 1L);
subTimeMap1.put("state32", System.currentTimeMillis()); subTimeMap3.put(ExecutionState.FAILED, 2L);
subTimeMap1.put("state33", System.currentTimeMillis()); subTimeMap3.put(ExecutionState.CANCELING, 3L);
subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3)); subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));


return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks); return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);
Expand Down

0 comments on commit 26e3d37

Please sign in to comment.