Skip to content

Commit

Permalink
[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown
Browse files Browse the repository at this point in the history
Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.
  • Loading branch information
GJL authored and tillrohrmann committed Jan 25, 2018
1 parent 056c72a commit 37b4e2c
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 23 deletions.
Expand Up @@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
* Returns the job vertex for the given {@link JobVertexID}.
*
* @param id id of job vertex to be returned
* @return job vertex for the given id, or null
* @return job vertex for the given id, or {@code null}
*/
@Nullable
AccessExecutionJobVertex getJobVertex(JobVertexID id);

/**
Expand Down
Expand Up @@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
public NotFoundException(String message) {
super(message, HttpResponseStatus.NOT_FOUND);
}

public NotFoundException(String message, Throwable cause) {
super(message, HttpResponseStatus.NOT_FOUND, cause);
}
}
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
Expand All @@ -32,6 +34,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -79,8 +82,16 @@ protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyReques
} catch (RestHandlerException rhe) {
throw new CompletionException(rhe);
}
},
executor);
}, executor)
.exceptionally(throwable -> {
throwable = ExceptionUtils.stripCompletionException(throwable);
if (throwable instanceof FlinkJobNotFoundException) {
throw new CompletionException(
new NotFoundException(String.format("Job %s not found", jobId), throwable));
} else {
throw new CompletionException(throwable);
}
});
}

/**
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
Expand All @@ -41,6 +42,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -50,7 +52,8 @@
import java.util.concurrent.Executor;

/**
* Request handler for the job vertex task managers.
* A request handler that provides the details of a job vertex, including id, name, and the
* runtime and metrics of all its subtasks aggregated by TaskManager.
*/
public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
private MetricFetcher<?> metricFetcher;
Expand All @@ -65,7 +68,7 @@ public JobVertexTaskManagersHandler(
Executor executor,
MetricFetcher<?> metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
this.metricFetcher = metricFetcher;
this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
}

@Override
Expand All @@ -76,23 +79,24 @@ protected JobVertexTaskManagersInfo handleRequest(
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);

if (jobVertex == null) {
throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
}

// Build a map that groups tasks by TaskManager
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
if (vertices == null) {
vertices = new ArrayList<>();
taskManagerVertices.put(taskManager, vertices);
}

String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort();
List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent(
taskManager,
ignored -> new ArrayList<>(4));
vertices.add(vertex);
}

final long now = System.currentTimeMillis();

List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
String host = entry.getKey();
List<AccessExecutionVertex> taskVertices = entry.getValue();
Expand Down Expand Up @@ -141,8 +145,10 @@ protected JobVertexTaskManagersInfo handleRequest(
duration = -1L;
}

ExecutionState jobVertexState =
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(
tasksPerState,
taskVertices.size());

final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
Expand All @@ -153,11 +159,18 @@ protected JobVertexTaskManagersInfo handleRequest(
counts.getNumRecordsOut(),
counts.isNumRecordsOutComplete());

Map<ExecutionState, Integer> statusCounts = new HashMap<>();
Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) {
statusCounts.put(state, tasksPerState[state.ordinal()]);
}
taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(
host,
jobVertexState,
startTime,
endTime,
duration,
jobVertexMetrics,
statusCounts));
}

return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
Expand Down
Expand Up @@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest
"/:" + JobVertexIdPathParameter.KEY +
"/taskmanagers";

private JobVertexTaskManagersHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;

Expand All @@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
private final long now;

@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
private List<TaskManagersInfo> taskManagers;
private Collection<TaskManagersInfo> taskManagerInfos;

@JsonCreator
public JobVertexTaskManagersInfo(
@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) {
this.jobVertexID = checkNotNull(jobVertexID);
this.name = checkNotNull(name);
this.now = now;
this.taskManagers = checkNotNull(taskManagers);
this.taskManagerInfos = checkNotNull(taskManagerInfos);
}

@Override
Expand All @@ -82,12 +82,12 @@ public boolean equals(Object o) {
return Objects.equals(jobVertexID, that.jobVertexID) &&
Objects.equals(name, that.name) &&
now == that.now &&
Objects.equals(taskManagers, that.taskManagers);
Objects.equals(taskManagerInfos, that.taskManagerInfos);
}

@Override
public int hashCode() {
return Objects.hash(jobVertexID, name, now, taskManagers);
return Objects.hash(jobVertexID, name, now, taskManagerInfos);
}

// ---------------------------------------------------
Expand Down

0 comments on commit 37b4e2c

Please sign in to comment.