From 056c72af994bc0b7bd838faff6b2991763fc2ac1 Mon Sep 17 00:00:00 2001 From: zjureel Date: Tue, 19 Dec 2017 16:56:50 +0800 Subject: [PATCH] [FLINK-7858][flip6] Port JobVertexTaskManagersHandler to REST endpoint This closes #5149. --- .../job/JobVertexTaskManagersHandler.java | 165 +++++++++++++++++ .../JobVertexTaskManagersHeaders.java | 72 ++++++++ .../messages/JobVertexTaskManagersInfo.java | 171 ++++++++++++++++++ .../webmonitor/WebMonitorEndpoint.java | 13 ++ .../JobVertexTaskManagersInfoTest.java | 65 +++++++ 5 files changed, 486 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java new file mode 100644 index 0000000000000..9b59e8d5b3e61 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex task managers. + */ +public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler { + private MetricFetcher metricFetcher; + + public JobVertexTaskManagersHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + MetricFetcher metricFetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + this.metricFetcher = metricFetcher; + } + + @Override + protected JobVertexTaskManagersInfo handleRequest( + HandlerRequest request, + AccessExecutionGraph executionGraph) throws RestHandlerException { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); + + // Build a map that groups tasks by TaskManager + Map> taskManagerVertices = new HashMap<>(); + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); + List vertices = taskManagerVertices.get(taskManager); + if (vertices == null) { + vertices = new ArrayList<>(); + taskManagerVertices.put(taskManager, vertices); + } + + vertices.add(vertex); + } + + final long now = System.currentTimeMillis(); + + List taskManagersInfoList = new ArrayList<>(); + for (Map.Entry> entry : taskManagerVertices.entrySet()) { + String host = entry.getKey(); + List taskVertices = entry.getValue(); + + int[] tasksPerState = new int[ExecutionState.values().length]; + + long startTime = Long.MAX_VALUE; + long endTime = 0; + boolean allFinished = true; + + MutableIOMetrics counts = new MutableIOMetrics(); + + for (AccessExecutionVertex vertex : taskVertices) { + final ExecutionState state = vertex.getExecutionState(); + tasksPerState[state.ordinal()]++; + + // take the earliest start time + long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING); + if (started > 0) { + startTime = Math.min(startTime, started); + } + + allFinished &= state.isTerminal(); + endTime = Math.max(endTime, vertex.getStateTimestamp(state)); + + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + metricFetcher, + jobID.toString(), + jobVertex.getJobVertexId().toString()); + } + + long duration; + if (startTime < Long.MAX_VALUE) { + if (allFinished) { + duration = endTime - startTime; + } + else { + endTime = -1L; + duration = now - startTime; + } + } + else { + startTime = -1L; + endTime = -1L; + duration = -1L; + } + + ExecutionState jobVertexState = + ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size()); + final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( + counts.getNumBytesInLocal() + counts.getNumBytesInRemote(), + counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(), + counts.getNumBytesOut(), + counts.isNumBytesOutComplete(), + counts.getNumRecordsIn(), + counts.isNumRecordsInComplete(), + counts.getNumRecordsOut(), + counts.isNumRecordsOutComplete()); + + Map statusCounts = new HashMap<>(); + for (ExecutionState state : ExecutionState.values()) { + statusCounts.put(state, tasksPerState[state.ordinal()]); + } + taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts)); + } + + return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java new file mode 100644 index 0000000000000..311d0470e1a40 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobVertexTaskManagersHandler}. + */ +public class JobVertexTaskManagersHeaders implements MessageHeaders { + + private static final JobVertexTaskManagersHeaders INSTANCE = new JobVertexTaskManagersHeaders(); + + public static final String URL = "/jobs" + + "/:" + JobIDPathParameter.KEY + + "/vertices" + + "/:" + JobVertexIdPathParameter.KEY + + "/taskmanagers"; + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return JobVertexTaskManagersInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobVertexMessageParameters getUnresolvedMessageParameters() { + return new JobVertexMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobVertexTaskManagersHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java new file mode 100644 index 0000000000000..fc30155ddeebb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Response type of the {@link JobVertexTaskManagersHandler}. + */ +public class JobVertexTaskManagersInfo implements ResponseBody { + public static final String VERTEX_TASK_FIELD_ID = "id"; + public static final String VERTEX_TASK_FIELD_NAME = "name"; + public static final String VERTEX_TASK_FIELD_NOW = "now"; + public static final String VERTEX_TASK_FIELD_TASK_MANAGERS = "taskmanagers"; + + @JsonProperty(VERTEX_TASK_FIELD_ID) + @JsonSerialize(using = JobVertexIDSerializer.class) + private final JobVertexID jobVertexID; + + @JsonProperty(VERTEX_TASK_FIELD_NAME) + private final String name; + + @JsonProperty(VERTEX_TASK_FIELD_NOW) + private final long now; + + @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) + private List taskManagers; + + @JsonCreator + public JobVertexTaskManagersInfo( + @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID, + @JsonProperty(VERTEX_TASK_FIELD_NAME) String name, + @JsonProperty(VERTEX_TASK_FIELD_NOW) long now, + @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List taskManagers) { + this.jobVertexID = checkNotNull(jobVertexID); + this.name = checkNotNull(name); + this.now = now; + this.taskManagers = checkNotNull(taskManagers); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexTaskManagersInfo that = (JobVertexTaskManagersInfo) o; + return Objects.equals(jobVertexID, that.jobVertexID) && + Objects.equals(name, that.name) && + now == that.now && + Objects.equals(taskManagers, that.taskManagers); + } + + @Override + public int hashCode() { + return Objects.hash(jobVertexID, name, now, taskManagers); + } + + // --------------------------------------------------- + // Static inner classes + // --------------------------------------------------- + + /** + * Detailed information about task managers. + */ + public static class TaskManagersInfo { + public static final String TASK_MANAGERS_FIELD_HOST = "host"; + public static final String TASK_MANAGERS_FIELD_STATUS = "status"; + public static final String TASK_MANAGERS_FIELD_START_TIME = "start-time"; + public static final String TASK_MANAGERS_FIELD_END_TIME = "end-time"; + public static final String TASK_MANAGERS_FIELD_DURATION = "duration"; + public static final String TASK_MANAGERS_FIELD_METRICS = "metrics"; + public static final String TASK_MANAGERS_FIELD_STATUS_COUNTS = "status-counts"; + + @JsonProperty(TASK_MANAGERS_FIELD_HOST) + private final String host; + + @JsonProperty(TASK_MANAGERS_FIELD_STATUS) + private final ExecutionState status; + + @JsonProperty(TASK_MANAGERS_FIELD_START_TIME) + private final long startTime; + + @JsonProperty(TASK_MANAGERS_FIELD_END_TIME) + private final long endTime; + + @JsonProperty(TASK_MANAGERS_FIELD_DURATION) + private final long duration; + + @JsonProperty(TASK_MANAGERS_FIELD_METRICS) + private final IOMetricsInfo metrics; + + @JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS) + private final Map statusCounts; + + @JsonCreator + public TaskManagersInfo( + @JsonProperty(TASK_MANAGERS_FIELD_HOST) String host, + @JsonProperty(TASK_MANAGERS_FIELD_STATUS) ExecutionState status, + @JsonProperty(TASK_MANAGERS_FIELD_START_TIME) long startTime, + @JsonProperty(TASK_MANAGERS_FIELD_END_TIME) long endTime, + @JsonProperty(TASK_MANAGERS_FIELD_DURATION) long duration, + @JsonProperty(TASK_MANAGERS_FIELD_METRICS) IOMetricsInfo metrics, + @JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS) Map statusCounts) { + this.host = checkNotNull(host); + this.status = checkNotNull(status); + this.startTime = startTime; + this.endTime = endTime; + this.duration = duration; + this.metrics = checkNotNull(metrics); + this.statusCounts = checkNotNull(statusCounts); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskManagersInfo that = (TaskManagersInfo) o; + return Objects.equals(host, that.host) && + Objects.equals(status, that.status) && + startTime == that.startTime && + endTime == that.endTime && + duration == that.duration && + Objects.equals(metrics, that.metrics) && + Objects.equals(statusCounts, that.statusCounts); + } + + @Override + public int hashCode() { + return Objects.hash(host, status, startTime, endTime, duration, metrics, statusCounts); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index e4327521f5e6b..30a68d1ada0d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler; import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler; import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler; import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler; @@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; @@ -350,6 +352,16 @@ protected List> initiali responseHeaders, metricFetcher); + final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + JobVertexTaskManagersHeaders.getInstance(), + executionGraphCache, + executor, + metricFetcher); + final JobExecutionResultHandler jobExecutionResultHandler = new JobExecutionResultHandler( restAddressFuture, leaderRetriever, @@ -446,6 +458,7 @@ protected List> initiali handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler)); handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler)); handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler)); + handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), jobVertexTaskManagersHandler)); // This handler MUST be added last, as it otherwise masks all subsequent GET handlers optWebContent.ifPresent( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java new file mode 100644 index 0000000000000..1a7b521f8c0a3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo.TaskManagersInfo; + +/** + * Tests that the {@link JobVertexTaskManagersInfo} can be marshalled and unmarshalled. + */ +public class JobVertexTaskManagersInfoTest extends RestResponseMarshallingTestBase { + @Override + protected Class getTestResponseClass() { + return JobVertexTaskManagersInfo.class; + } + + @Override + protected JobVertexTaskManagersInfo getTestResponseInstance() throws Exception { + final Random random = new Random(); + List taskManagersInfoList = new ArrayList<>(); + + final Map statusCounts = new HashMap<>(ExecutionState.values().length); + final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( + random.nextLong(), + random.nextBoolean(), + random.nextLong(), + random.nextBoolean(), + random.nextLong(), + random.nextBoolean(), + random.nextLong(), + random.nextBoolean()); + int count = 100; + for (ExecutionState executionState : ExecutionState.values()) { + statusCounts.put(executionState, count++); + } + taskManagersInfoList.add(new TaskManagersInfo("host1", ExecutionState.CANCELING, 1L, 2L, 3L, jobVertexMetrics, statusCounts)); + + return new JobVertexTaskManagersInfo(new JobVertexID(), "test", System.currentTimeMillis(), taskManagersInfoList); + } +}