[FLINK-7705] Add JobDetailsHandler
Add JobID(De)Serializer and JobVertexID(De)Serializer for Jackson

This closes #4884.
tillrohrmann committed Nov 7, 2017
1 parent 1c78dee commit de201a6
Showing 13 changed files with 1,003 additions and 12 deletions.
DispatcherRestEndpoint.java
@@ -30,6 +30,7 @@
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -67,6 +68,7 @@
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -289,6 +291,17 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
 			TaskManagerDetailsHeaders.getInstance(),
 			resourceManagerRetriever,
 			metricFetcher);

+		final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobDetailsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();

 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -317,6 +330,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
 		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
 		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
 		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
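Note: JobDetailsHeaders, used in the registration above, is part of this commit but not shown in this excerpt. As a rough, non-authoritative sketch of the shape such a headers class takes in Flink's REST stack (the /jobs/:jobid path and the exact overrides are assumptions inferred from the surrounding MessageHeaders usage, not taken from the commit):

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

// Hypothetical sketch only; the real JobDetailsHeaders ships with this commit.
public class JobDetailsHeaders implements MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {

	private static final JobDetailsHeaders INSTANCE = new JobDetailsHeaders();

	private JobDetailsHeaders() {}

	public static JobDetailsHeaders getInstance() {
		return INSTANCE;
	}

	@Override
	public Class<EmptyRequestBody> getRequestClass() {
		return EmptyRequestBody.class;
	}

	@Override
	public Class<JobDetailsInfo> getResponseClass() {
		return JobDetailsInfo.class;
	}

	@Override
	public HttpResponseStatus getResponseStatusCode() {
		return HttpResponseStatus.OK;
	}

	@Override
	public JobMessageParameters getUnresolvedMessageParameters() {
		return new JobMessageParameters();
	}

	@Override
	public HttpMethodWrapper getHttpMethod() {
		return HttpMethodWrapper.GET;
	}

	@Override
	public String getTargetRestEndpointURL() {
		return "/jobs/:jobid"; // assumed path, matching the web UI's job details endpoint
	}
}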
JobDetailsHandler.java (new file)
@@ -0,0 +1,209 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Handler returning the details for the specified job.
 */
public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {

	private final MetricFetcher<?> metricFetcher;

	public JobDetailsHandler(
			CompletableFuture<String> localRestAddress,
			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
			Time timeout,
			Map<String, String> responseHeaders,
			MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
			ExecutionGraphCache executionGraphCache,
			Executor executor,
			MetricFetcher<?> metricFetcher) {
		super(
			localRestAddress,
			leaderRetriever,
			timeout,
			responseHeaders,
			messageHeaders,
			executionGraphCache,
			executor);

		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
	}

	@Override
	protected JobDetailsInfo handleRequest(
			HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
			AccessExecutionGraph executionGraph) throws RestHandlerException {

		final long now = System.currentTimeMillis();
		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
		final long endTime = executionGraph.getState().isGloballyTerminalState() ?
			executionGraph.getStatusTimestamp(executionGraph.getState()) : -1L;
		final long duration = (endTime > 0L ? endTime : now) - startTime;

		final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);

		for (JobStatus jobStatus : JobStatus.values()) {
			timestamps.put(jobStatus, executionGraph.getStatusTimestamp(jobStatus));
		}

		Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(executionGraph.getAllVertices().size());
		int[] jobVerticesPerState = new int[ExecutionState.values().length];

		for (AccessExecutionJobVertex accessExecutionJobVertex : executionGraph.getVerticesTopologically()) {
			final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo = createJobVertexDetailsInfo(
				accessExecutionJobVertex,
				now,
				executionGraph.getJobID(),
				metricFetcher);

			jobVertexInfos.add(vertexDetailsInfo);
			jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
		}

		Map<ExecutionState, Integer> jobVerticesPerStateMap = new HashMap<>(ExecutionState.values().length);

		for (ExecutionState executionState : ExecutionState.values()) {
			jobVerticesPerStateMap.put(executionState, jobVerticesPerState[executionState.ordinal()]);
		}

		return new JobDetailsInfo(
			executionGraph.getJobID(),
			executionGraph.getJobName(),
			executionGraph.isStoppable(),
			executionGraph.getState(),
			startTime,
			endTime,
			duration,
			now,
			timestamps,
			jobVertexInfos,
			jobVerticesPerStateMap,
			executionGraph.getJsonPlan());
	}

	public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
			AccessExecutionJobVertex ejv,
			long now,
			JobID jobId,
			MetricFetcher<?> metricFetcher) {
		int[] tasksPerState = new int[ExecutionState.values().length];
		long startTime = Long.MAX_VALUE;
		long endTime = 0;
		boolean allFinished = true;

		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
			final ExecutionState state = vertex.getExecutionState();
			tasksPerState[state.ordinal()]++;

			// take the earliest start time
			long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
			if (started > 0L) {
				startTime = Math.min(startTime, started);
			}

			allFinished &= state.isTerminal();
			endTime = Math.max(endTime, vertex.getStateTimestamp(state));
		}

		long duration;
		if (startTime < Long.MAX_VALUE) {
			if (allFinished) {
				duration = endTime - startTime;
			}
			else {
				endTime = -1L;
				duration = now - startTime;
			}
		}
		else {
			startTime = -1L;
			endTime = -1L;
			duration = -1L;
		}

		ExecutionState jobVertexState =
			ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());

		Map<ExecutionState, Integer> tasksPerStateMap = new HashMap<>(tasksPerState.length);

		for (ExecutionState executionState : ExecutionState.values()) {
			tasksPerStateMap.put(executionState, tasksPerState[executionState.ordinal()]);
		}

		MutableIOMetrics counts = new MutableIOMetrics();

		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
			counts.addIOMetrics(
				vertex.getCurrentExecutionAttempt(),
				metricFetcher,
				jobId.toString(),
				ejv.getJobVertexId().toString());
		}

		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
			counts.getNumBytesOut(),
			counts.isNumBytesOutComplete(),
			counts.getNumRecordsIn(),
			counts.isNumRecordsInComplete(),
			counts.getNumRecordsOut(),
			counts.isNumRecordsOutComplete());

		return new JobDetailsInfo.JobVertexDetailsInfo(
			ejv.getJobVertexId(),
			ejv.getName(),
			ejv.getParallelism(),
			jobVertexState,
			startTime,
			endTime,
			duration,
			tasksPerStateMap,
			jobVertexMetrics);
	}
}
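Aside: the JobID(De)Serializer pair mentioned in the commit message is what turns the JobID inside the JobDetailsInfo returned above into a JSON string; those classes are also not part of this excerpt. A minimal round-trip sketch of how such Jackson (de)serializers are typically attached (the JobRef bean is purely illustrative, and unshaded Jackson imports are used here for brevity):

import org.apache.flink.api.common.JobID;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

public class JobIdJsonDemo {

	// Illustrative stand-in for a response type that carries a JobID.
	static class JobRef {
		@JsonProperty("jid")
		@JsonSerialize(using = JobIDSerializer.class)     // added by this commit, not shown here
		@JsonDeserialize(using = JobIDDeserializer.class) // added by this commit, not shown here
		public JobID jid;
	}

	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();

		JobRef ref = new JobRef();
		ref.jid = JobID.generate();

		// JobIDs are expected to serialize to their 32-character hex form.
		String json = mapper.writeValueAsString(ref);
		JobRef back = mapper.readValue(json, JobRef.class);

		System.out.println(back.jid.equals(ref.jid)); // true
	}
}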
MutableIOMetrics.java
@@ -54,6 +54,26 @@ public MutableIOMetrics() {
 		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
 	}
+
+	public boolean isNumBytesInLocalComplete() {
+		return numBytesInLocalComplete;
+	}
+
+	public boolean isNumBytesInRemoteComplete() {
+		return numBytesInRemoteComplete;
+	}
+
+	public boolean isNumBytesOutComplete() {
+		return numBytesOutComplete;
+	}
+
+	public boolean isNumRecordsInComplete() {
+		return numRecordsInComplete;
+	}
+
+	public boolean isNumRecordsOutComplete() {
+		return numRecordsOutComplete;
+	}

 	/**
 	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
 	 * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
CheckpointStatistics.java
@@ -25,8 +25,8 @@
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
 import org.apache.flink.util.Preconditions;

 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -106,7 +106,7 @@ public class CheckpointStatistics implements ResponseBody {
 	private final int numAckSubtasks;

 	@JsonProperty(FIELD_NAME_TASKS)
-	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+	@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
 	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;

 	@JsonCreator
@@ -121,7 +121,7 @@ private CheckpointStatistics(
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
 		this.id = id;
 		this.status = Preconditions.checkNotNull(status);
 		this.savepoint = savepoint;
@@ -309,7 +309,7 @@ public CompletedCheckpointStatistics(
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
 			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
 			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
 		super(
@@ -388,7 +388,7 @@ public FailedCheckpointStatistics(
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
 			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
 			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
 		super(
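The import rename above points at this commit's new map-key (de)serializers: Jackson writes map keys as JSON field names, so a Map<JobVertexID, ...> needs a key serializer and a KeyDeserializer rather than ordinary value (de)serializers. The implementations are outside this excerpt; a minimal sketch of the usual pattern (JobVertexID.fromHexString exists on Flink's AbstractID-based IDs, the rest is assumed):

import java.io.IOException;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.SerializerProvider;

// Hypothetical sketches; the real JobVertexIDKeySerializer / JobVertexIDKeyDeserializer
// are added by this commit but not shown in this excerpt.
class JobVertexIDKeySerializer extends JsonSerializer<JobVertexID> {
	@Override
	public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
		// Map keys must be written as field names, not as ordinary string values.
		gen.writeFieldName(value.toString());
	}
}

class JobVertexIDKeyDeserializer extends KeyDeserializer {
	@Override
	public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
		// A JobVertexID prints as a hex string, so parse it back from that form.
		return JobVertexID.fromHexString(key);
	}
}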
