From 16b3f11b96b1a8b0f290945af3d255d47739992e Mon Sep 17 00:00:00 2001 From: "biao.liub" Date: Wed, 10 Jan 2018 14:25:07 +0800 Subject: [PATCH 1/2] [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint --- .../handler/job/AbstractJobVertexHandler.java | 101 ++++++++++ .../job/AbstractSubtaskAttemptHandler.java | 109 +++++++++++ .../handler/job/AbstractSubtaskHandler.java | 103 ++++++++++ .../rest/handler/job/JobDetailsHandler.java | 3 +- .../job/JobVertexAccumulatorsHandler.java | 37 ++-- ...SubtaskExecutionAttemptDetailsHandler.java | 131 +++++++++++++ .../handler/job/SubtasksTimesHandler.java | 9 +- .../AbstractJobVertexRequestHandler.java | 2 +- .../rest/messages/job/JobDetailsInfo.java | 96 +-------- .../job/SubtaskAttemptMessageParameters.java | 41 ++++ .../job/SubtaskAttemptPathParameter.java | 53 +++++ ...SubtaskExecutionAttemptDetailsHeaders.java | 77 ++++++++ .../SubtaskExecutionAttemptDetailsInfo.java | 122 ++++++++++++ .../job/SubtaskMessageParameters.java | 39 ++++ .../messages/job/metrics/IOMetricsInfo.java | 114 +++++++++++ .../SubtaskMetricsMessageParameters.java | 14 +- .../webmonitor/WebMonitorEndpoint.java | 14 ++ ...askExecutionAttemptDetailsHandlerTest.java | 182 ++++++++++++++++++ .../rest/messages/job/JobDetailsInfoTest.java | 3 +- ...ubtaskExecutionAttemptDetailsInfoTest.java | 63 ++++++ 20 files changed, 1177 insertions(+), 136 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java new file mode 100644 index 0000000000000..f0f11bd181ba7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on a specific job vertex (defined + * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job, + * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter). + * + * @param the response type + * @param the message parameters type + */ +public abstract class AbstractJobVertexHandler extends AbstractExecutionGraphHandler { + + /** + * Instantiates a new Abstract job vertex handler. + * + * @param localRestAddress the local rest address + * @param leaderRetriever the leader retriever + * @param timeout the timeout + * @param responseHeaders the response headers + * @param messageHeaders the message headers + * @param executionGraphCache the execution graph cache + * @param executor the executor + */ + protected AbstractJobVertexHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + } + + @Override + protected R handleRequest( + HandlerRequest request, + AccessExecutionGraph executionGraph) throws RestHandlerException { + + final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + final AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); + + if (jobVertex == null) { + throw new RestHandlerException("No vertex with ID '" + jobVertexID + "' exists.", HttpResponseStatus.NOT_FOUND); + } + + return handleRequest(request, jobVertex); + } + + /** + * Called for each request after the corresponding {@link AccessExecutionJobVertex} has been retrieved from the + * {@link AccessExecutionGraph}. + * + * @param request the request + * @param jobVertex the execution job vertex + * @return the response + * @throws RestHandlerException if the handler could not process the request + */ + protected abstract R handleRequest(HandlerRequest request, AccessExecutionJobVertex jobVertex) throws RestHandlerException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java new file mode 100644 index 0000000000000..388ba8bff922b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for request handlers whose response depends on a specific attempt (defined + * via the "{@link SubtaskAttemptPathParameter#KEY}" of a specific subtask (defined + * via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined + * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job, + * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter). + * + * @param the response type + * @param the message parameters type + */ +public abstract class AbstractSubtaskAttemptHandler extends AbstractSubtaskHandler{ + /** + * Instantiates a new Abstract job vertex handler. + * + * @param localRestAddress the local rest address + * @param leaderRetriever the leader retriever + * @param timeout the timeout + * @param responseHeaders the response headers + * @param messageHeaders the message headers + * @param executionGraphCache the execution graph cache + * @param executor the executor + */ + protected AbstractSubtaskAttemptHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + } + + @Override + protected R handleRequest(HandlerRequest request, AccessExecutionVertex executionVertex) throws RestHandlerException { + final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class); + + final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt(); + if (attemptNumber == currentAttempt.getAttemptNumber()) { + return handleRequest(request, currentAttempt); + } else if (attemptNumber >= 0 && attemptNumber < currentAttempt.getAttemptNumber()) { + final AccessExecution execution = executionVertex.getPriorExecutionAttempt(attemptNumber); + + if (execution != null) { + return handleRequest(request, execution); + } else { + throw new RestHandlerException("Attempt " + attemptNumber + " not found in subtask " + + executionVertex.getTaskNameWithSubtaskIndex(), HttpResponseStatus.NOT_FOUND); + } + } else { + throw new RestHandlerException("Invalid attempt num " + attemptNumber, HttpResponseStatus.NOT_FOUND); + } + } + + /** + * Called for each request after the corresponding {@link AccessExecution} has been retrieved from the + * {@link AccessExecutionVertex}. + * + * @param request the request + * @param execution the execution + * @return the response + * @throws RestHandlerException the rest handler exception + */ + protected abstract R handleRequest(HandlerRequest request, AccessExecution execution) throws RestHandlerException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java new file mode 100644 index 0000000000000..82606047e09dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; +import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + + +/** + * Base class for request handlers whose response depends on a specific subtask (defined + * via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined + * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job, + * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter). + * + * + * @param the response type + * @param the message parameters type + */ +public abstract class AbstractSubtaskHandler extends AbstractJobVertexHandler { + + /** + * Instantiates a new Abstract job vertex handler. + * + * @param localRestAddress the local rest address + * @param leaderRetriever the leader retriever + * @param timeout the timeout + * @param responseHeaders the response headers + * @param messageHeaders the message headers + * @param executionGraphCache the execution graph cache + * @param executor the executor + */ + protected AbstractSubtaskHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, Map responseHeaders, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + } + + @Override + protected R handleRequest( + HandlerRequest request, + AccessExecutionJobVertex jobVertex) throws RestHandlerException { + + final Integer subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class); + final AccessExecutionVertex[] executionVertices = jobVertex.getTaskVertices(); + + if (subtaskIndex >= executionVertices.length || subtaskIndex < 0) { + throw new RestHandlerException("Invalid subtask index for vertex " + jobVertex.getJobVertexId(), HttpResponseStatus.NOT_FOUND); + } + + return handleRequest(request, executionVertices[subtaskIndex]); + } + + /** + * Called for each request after the corresponding {@link AccessExecutionVertex} has been retrieved from the + * {@link AccessExecutionJobVertex}. + * + * @param request the request + * @param executionVertex the execution vertex + * @return the response + * @throws RestHandlerException the rest handler exception + */ + protected abstract R handleRequest(HandlerRequest request, AccessExecutionVertex executionVertex) throws RestHandlerException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 647763a2c269f..82f24d3aff2d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; @@ -185,7 +186,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( ejv.getJobVertexId().toString()); } - final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics( + final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( counts.getNumBytesInLocal() + counts.getNumBytesInRemote(), counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(), counts.getNumBytesOut(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java index 55c465c09214a..52e5632b9480a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java @@ -20,22 +20,17 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo; -import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - import java.util.ArrayList; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -44,7 +39,7 @@ /** * Request handler for the job vertex accumulators. */ -public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler { +public class JobVertexAccumulatorsHandler extends AbstractJobVertexHandler { public JobVertexAccumulatorsHandler( CompletableFuture localRestAddress, @@ -65,25 +60,21 @@ public JobVertexAccumulatorsHandler( } @Override - protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { - JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); - AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); - - if (null != jobVertex) { - StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); - ArrayList userAccumulatorList = new ArrayList<>(accs.length); + protected JobVertexAccumulatorsInfo handleRequest( + HandlerRequest request, + AccessExecutionJobVertex jobVertex) throws RestHandlerException { - for (StringifiedAccumulatorResult acc : accs) { - userAccumulatorList.add( - new JobVertexAccumulatorsInfo.UserAccumulator( - acc.getName(), - acc.getType(), - acc.getValue())); - } + StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); + ArrayList userAccumulatorList = new ArrayList<>(accs.length); - return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList); - } else { - throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND); + for (StringifiedAccumulatorResult acc : accs) { + userAccumulatorList.add( + new JobVertexAccumulatorsInfo.UserAccumulator( + acc.getName(), + acc.getType(), + acc.getValue())); } + + return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java new file mode 100644 index 0000000000000..1669914bb12e2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; +import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Handler of specific sub task execution attempt. + */ +public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler { + + private final MetricFetcher metricFetcher; + + /** + * Instantiates a new Abstract job vertex handler. + * + * @param localRestAddress the local rest address + * @param leaderRetriever the leader retriever + * @param timeout the timeout + * @param responseHeaders the response headers + * @param messageHeaders the message headers + * @param executionGraphCache the execution graph cache + * @param executor the executor + */ + public SubtaskExecutionAttemptDetailsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + MetricFetcher metricFetcher) { + + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); + + this.metricFetcher = Preconditions.checkNotNull(metricFetcher); + } + + @Override + protected SubtaskExecutionAttemptDetailsInfo handleRequest( + HandlerRequest request, + AccessExecution execution) throws RestHandlerException { + + final ExecutionState status = execution.getState(); + final long now = System.currentTimeMillis(); + + final TaskManagerLocation location = execution.getAssignedResourceLocation(); + final String locationString = location == null ? "(unassigned)" : location.getHostname(); + + long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING); + if (startTime == 0) { + startTime = -1; + } + final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1; + final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; + + final MutableIOMetrics counts = new MutableIOMetrics(); + + final JobID jobID = request.getPathParameter(JobIDPathParameter.class); + final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + + counts.addIOMetrics( + execution, + metricFetcher, + jobID.toString(), + jobVertexID.toString() + ); + + final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo( + counts.getNumBytesInLocal() + counts.getNumBytesInRemote(), + counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(), + counts.getNumBytesOut(), + counts.isNumBytesOutComplete(), + counts.getNumRecordsIn(), + counts.isNumRecordsInComplete(), + counts.getNumRecordsOut(), + counts.isNumRecordsOutComplete()); + + return new SubtaskExecutionAttemptDetailsInfo( + execution.getParallelSubtaskIndex(), + status, + execution.getAttemptNumber(), + locationString, + startTime, + endTime, + duration, + ioMetricsInfo + ); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java index bc72e51a89471..29c0f933c4de0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java @@ -20,14 +20,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo; @@ -45,7 +42,7 @@ /** * Request handler for the subtasks times info. */ -public class SubtasksTimesHandler extends AbstractExecutionGraphHandler { +public class SubtasksTimesHandler extends AbstractJobVertexHandler { public SubtasksTimesHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, @@ -65,9 +62,7 @@ public SubtasksTimesHandler( } @Override - protected SubtasksTimesInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) { - JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); - AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID); + protected SubtasksTimesInfo handleRequest(HandlerRequest request, AccessExecutionJobVertex jobVertex) { final String id = jobVertex.getJobVertexId().toString(); final String name = jobVertex.getName(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java index 70606e4229ebf..92bf51e44a76e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java @@ -28,7 +28,7 @@ /** * Base class for request handlers whose response depends on a specific job vertex (defined - * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). + * via the "vertexid" parameter) in a specific job, defined via (defined via the "jobid" parameter). */ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 551913f621dab..2c74389d96e8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; @@ -217,7 +218,7 @@ public static final class JobVertexDetailsInfo { private final Map tasksPerState; @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) - private final JobVertexMetrics jobVertexMetrics; + private final IOMetricsInfo jobVertexMetrics; @JsonCreator public JobVertexDetailsInfo( @@ -229,7 +230,7 @@ public JobVertexDetailsInfo( @JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME) long endTime, @JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION) long duration, @JsonProperty(FIELD_NAME_TASKS_PER_STATE) Map tasksPerState, - @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) JobVertexMetrics jobVertexMetrics) { + @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) { this.jobVertexID = Preconditions.checkNotNull(jobVertexID); this.name = Preconditions.checkNotNull(name); this.parallelism = parallelism; @@ -273,7 +274,7 @@ public Map getTasksPerState() { return tasksPerState; } - public JobVertexMetrics getJobVertexMetrics() { + public IOMetricsInfo getJobVertexMetrics() { return jobVertexMetrics; } @@ -303,93 +304,4 @@ public int hashCode() { } } - /** - * Metrics of a job vertex. - */ - public static final class JobVertexMetrics { - - public static final String FIELD_NAME_BYTES_READ = "read-bytes"; - - public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete"; - - public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes"; - - public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete"; - - public static final String FIELD_NAME_RECORDS_READ = "read-records"; - - public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete"; - - public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records"; - - public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete"; - - @JsonProperty(FIELD_NAME_BYTES_READ) - private final long bytesRead; - - @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) - private final boolean bytesReadComplete; - - @JsonProperty(FIELD_NAME_BYTES_WRITTEN) - private final long bytesWritten; - - @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) - private final boolean bytesWrittenComplete; - - @JsonProperty(FIELD_NAME_RECORDS_READ) - private final long recordsRead; - - @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) - private final boolean recordsReadComplete; - - @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) - private final long recordsWritten; - - @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) - private final boolean recordsWrittenComplete; - - @JsonCreator - public JobVertexMetrics( - @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead, - @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete, - @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten, - @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete, - @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead, - @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete, - @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten, - @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) { - this.bytesRead = bytesRead; - this.bytesReadComplete = bytesReadComplete; - this.bytesWritten = bytesWritten; - this.bytesWrittenComplete = bytesWrittenComplete; - this.recordsRead = recordsRead; - this.recordsReadComplete = recordsReadComplete; - this.recordsWritten = recordsWritten; - this.recordsWrittenComplete = recordsWrittenComplete; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - JobVertexMetrics that = (JobVertexMetrics) o; - return bytesRead == that.bytesRead && - bytesReadComplete == that.bytesReadComplete && - bytesWritten == that.bytesWritten && - bytesWrittenComplete == that.bytesWrittenComplete && - recordsRead == that.recordsRead && - recordsReadComplete == that.recordsReadComplete && - recordsWritten == that.recordsWritten && - recordsWrittenComplete == that.recordsWrittenComplete; - } - - @Override - public int hashCode() { - return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete); - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java new file mode 100644 index 0000000000000..a14426493eaf9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * The type Subtask attempt message parameters. + */ +public class SubtaskAttemptMessageParameters extends SubtaskMessageParameters { + + protected final SubtaskAttemptPathParameter subtaskAttemptPathParameter = new SubtaskAttemptPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList( + jobPathParameter, + jobVertexIdPathParameter, + subtaskIndexPathParameter, + subtaskAttemptPathParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java new file mode 100644 index 0000000000000..220a121eac89b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +/** + * Path parameter identifying subtask attempt. + */ +public class SubtaskAttemptPathParameter extends MessagePathParameter { + /** + * The constant subtask attempt KEY. + */ + public static final String KEY = "attempt"; + + /** + * Instantiates a new Subtask attempt path parameter. + */ + protected SubtaskAttemptPathParameter() { + super(KEY); + } + + @Override + protected Integer convertFromString(String value) throws ConversionException { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new ConversionException("Invalid attempt num " + value); + } + } + + @Override + protected String convertToString(Integer value) { + return value.toString(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java new file mode 100644 index 0000000000000..e7bfc43c059bb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link SubtaskExecutionAttemptDetailsHandler}. + */ +public class SubtaskExecutionAttemptDetailsHeaders implements MessageHeaders { + + private static final SubtaskExecutionAttemptDetailsHeaders INSTANCE = new SubtaskExecutionAttemptDetailsHeaders(); + + public static final String URL = String.format( + "/jobs/:%s/vertices/:%s/subtasks/:%s", + JobIDPathParameter.KEY, + JobVertexIdPathParameter.KEY, + SubtaskIndexPathParameter.KEY); + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return SubtaskExecutionAttemptDetailsInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public SubtaskAttemptMessageParameters getUnresolvedMessageParameters() { + return new SubtaskAttemptMessageParameters(); + } + + public static SubtaskExecutionAttemptDetailsHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java new file mode 100644 index 0000000000000..a34dc104751c0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * The sub task execution attempt response. + */ +public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { + + public static final String FIELD_NAME_SUBTASK_INDEX = "subtask"; + + public static final String FIELD_NAME_STATUS = "status"; + + public static final String FIELD_NAME_ATTEMPT = "attempt"; + + public static final String FIELD_NAME_HOST = "host"; + + public static final String FIELD_NAME_START_TIME = "start-time"; + + public static final String FIELD_NAME_END_TIME = "end-time"; + + public static final String FIELD_NAME_DURATION = "duration"; + + public static final String FIELD_NAME_METRICS = "metrics"; + + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) + private final int subtaskIndex; + + @JsonProperty(FIELD_NAME_STATUS) + private final ExecutionState status; + + @JsonProperty(FIELD_NAME_ATTEMPT) + private final int attempt; + + @JsonProperty(FIELD_NAME_HOST) + private final String host; + + @JsonProperty(FIELD_NAME_START_TIME) + private final long startTime; + + @JsonProperty(FIELD_NAME_END_TIME) + private final long endTime; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_METRICS) + private final IOMetricsInfo ioMetricsInfo; + + @JsonCreator + public SubtaskExecutionAttemptDetailsInfo( + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex, + @JsonProperty(FIELD_NAME_STATUS) ExecutionState status, + @JsonProperty(FIELD_NAME_ATTEMPT) int attempt, + @JsonProperty(FIELD_NAME_HOST) String host, + @JsonProperty(FIELD_NAME_START_TIME) long startTime, + @JsonProperty(FIELD_NAME_END_TIME) long endTime, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo) { + + this.subtaskIndex = subtaskIndex; + this.status = Preconditions.checkNotNull(status); + this.attempt = attempt; + this.host = Preconditions.checkNotNull(host); + this.startTime = startTime; + this.endTime = endTime; + this.duration = duration; + this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o; + + return subtaskIndex == that.subtaskIndex && + status == that.status && + attempt == that.attempt && + Objects.equals(host, that.host) && + startTime == that.startTime && + endTime == that.endTime && + duration == that.duration && + Objects.equals(ioMetricsInfo, that.ioMetricsInfo); + } + + @Override + public int hashCode() { + return Objects.hash(subtaskIndex, status, attempt, host, startTime, endTime, duration, ioMetricsInfo); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java new file mode 100644 index 0000000000000..872ff17061999 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message parameters for subtask REST handlers. + */ +public class SubtaskMessageParameters extends JobVertexMessageParameters { + + protected final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList(jobPathParameter, jobVertexIdPathParameter, subtaskIndexPathParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java new file mode 100644 index 0000000000000..d84265d588761 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * IO metrics information. + */ +public final class IOMetricsInfo { + + public static final String FIELD_NAME_BYTES_READ = "read-bytes"; + + public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete"; + + public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes"; + + public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete"; + + public static final String FIELD_NAME_RECORDS_READ = "read-records"; + + public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete"; + + public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records"; + + public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete"; + + @JsonProperty(FIELD_NAME_BYTES_READ) + private final long bytesRead; + + @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) + private final boolean bytesReadComplete; + + @JsonProperty(FIELD_NAME_BYTES_WRITTEN) + private final long bytesWritten; + + @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) + private final boolean bytesWrittenComplete; + + @JsonProperty(FIELD_NAME_RECORDS_READ) + private final long recordsRead; + + @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) + private final boolean recordsReadComplete; + + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) + private final long recordsWritten; + + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) + private final boolean recordsWrittenComplete; + + @JsonCreator + public IOMetricsInfo( + @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead, + @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete, + @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten, + @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete, + @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead, + @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete, + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten, + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) { + this.bytesRead = bytesRead; + this.bytesReadComplete = bytesReadComplete; + this.bytesWritten = bytesWritten; + this.bytesWrittenComplete = bytesWrittenComplete; + this.recordsRead = recordsRead; + this.recordsReadComplete = recordsReadComplete; + this.recordsWritten = recordsWritten; + this.recordsWrittenComplete = recordsWrittenComplete; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IOMetricsInfo that = (IOMetricsInfo) o; + return bytesRead == that.bytesRead && + bytesReadComplete == that.bytesReadComplete && + bytesWritten == that.bytesWritten && + bytesWrittenComplete == that.bytesWrittenComplete && + recordsRead == that.recordsRead && + recordsReadComplete == that.recordsReadComplete && + recordsWritten == that.recordsWritten && + recordsWrittenComplete == that.recordsWrittenComplete; + } + + @Override + public int hashCode() { + return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java index bdfa00304301f..166766f691f4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java @@ -19,12 +19,10 @@ package org.apache.flink.runtime.rest.messages.job.metrics; import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; -import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.MessageQueryParameter; -import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; +import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters; import java.util.Arrays; import java.util.Collection; @@ -33,20 +31,14 @@ /** * {@link MessageParameters} for {@link SubtaskMetricsHandler}. */ -public class SubtaskMetricsMessageParameters extends MessageParameters { - - private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter(); - - private final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter(); - - private final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter(); +public class SubtaskMetricsMessageParameters extends SubtaskMessageParameters { private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); @Override public Collection> getPathParameters() { return Collections.unmodifiableCollection(Arrays.asList( - jobIDPathParameter, + jobPathParameter, jobVertexIdPathParameter, subtaskIndexPathParameter )); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index a41f0fa37825a..0bc82c00a0d44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler; +import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler; import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; @@ -72,6 +73,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders; @@ -346,6 +348,17 @@ protected List> initiali timeout, responseHeaders); + final SubtaskExecutionAttemptDetailsHandler subtaskExecutionAttemptDetailsHandler = new SubtaskExecutionAttemptDetailsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + null, + executionGraphCache, + executor, + metricFetcher + ); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -385,6 +398,7 @@ protected List> initiali handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler)); handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler)); handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler)); + handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler)); // This handler MUST be added last, as it otherwise masks all subsequent GET handlers optWebContent.ifPresent( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java new file mode 100644 index 0000000000000..4f32087e2b193 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.DummyJobInformation; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter; +import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests of {@link SubtaskExecutionAttemptDetailsHandler}. + */ +public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { + + @Test + public void testHandleRequest() throws Exception { + + // Prepare the execution graph. + final JobID jobID = new JobID(); + + final ExecutionGraph executionGraph = new ExecutionGraph( + new DummyJobInformation(jobID, "job name"), + mock(ScheduledExecutorService.class), + mock(Executor.class), + Time.milliseconds(100), + new NoRestartStrategy(), + new RestartAllStrategy.Factory(), + mock(SlotProvider.class), + ExecutionGraph.class.getClassLoader(), + VoidBlobWriter.getInstance() + ); + + final JobVertex jobVertex = new JobVertex("MockVertex"); + jobVertex.setParallelism(128); + jobVertex.setInvokableClass(AbstractInvokable.class); + + executionGraph.attachJobGraph(Collections.singletonList(jobVertex)); + + // The testing subtask. + final int subtaskIndex = 1; + final ExecutionState expectedState = ExecutionState.SCHEDULED; + + // Change some fields so we can make it different from other sub tasks. + Execution execution = executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt(); + Whitebox.setInternalState(execution, "state", expectedState); + + // Mock the metric fetcher. + final MetricFetcher metricFetcher = mock(MetricFetcher.class); + final MetricStore metricStore = mock(MetricStore.class); + final MetricStore.ComponentMetricStore componentMetricStore = mock(MetricStore.ComponentMetricStore.class); + + final long bytesInLocal = 1; + final long bytesInRemote = 2; + final long bytesOut = 10; + final long recordsIn = 20; + final long recordsOut = 30; + + when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)).thenReturn(Long.toString(bytesInLocal)); + when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)).thenReturn(Long.toString(bytesInRemote)); + when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_OUT)).thenReturn(Long.toString(bytesOut)); + when(componentMetricStore.getMetric(MetricNames.IO_NUM_RECORDS_IN)).thenReturn(Long.toString(recordsIn)); + when(componentMetricStore.getMetric(MetricNames.IO_NUM_RECORDS_OUT)).thenReturn(Long.toString(recordsOut)); + + when(metricStore.getSubtaskMetricStore(jobID.toString(), jobVertex.getID().toString(), subtaskIndex)) + .thenReturn(componentMetricStore); + when(metricFetcher.getMetricStore()).thenReturn(metricStore); + + // Instance the handler. + final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration()); + + final SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler( + CompletableFuture.completedFuture("127.0.0.1:9527"), + mock(GatewayRetriever.class), + Time.milliseconds(100), + restHandlerConfiguration.getResponseHeaders(), + null, + new ExecutionGraphCache( + restHandlerConfiguration.getTimeout(), + Time.milliseconds(restHandlerConfiguration.getRefreshInterval())), + mock(Executor.class), + metricFetcher + ); + + final int attempt = 0; + + final HandlerRequest request = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + new SubtaskAttemptMessageParameters(), + new HashMap() {{ + put(JobIDPathParameter.KEY, jobID.toString()); + put(JobVertexIdPathParameter.KEY, jobVertex.getID().toString()); + put(SubtaskIndexPathParameter.KEY, Integer.toString(subtaskIndex)); + put(SubtaskAttemptPathParameter.KEY, Integer.toString(attempt)); + }}, + Collections.emptyMap() + ); + + // Handle request. + final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest( + request, + executionGraph.getJobVertex(jobVertex.getID())); + + // Verify + final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo( + bytesInLocal + bytesInRemote, + true, + bytesOut, + true, + recordsIn, + true, + recordsOut, + true + ); + + final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo( + subtaskIndex, + expectedState, + attempt, + "(unassigned)", + -1, + -1, + -1, + ioMetricsInfo + ); + + assertEquals(expectedDetailsInfo, detailsInfo); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index aec86747d2285..808361194c5ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import java.util.ArrayList; import java.util.Collection; @@ -79,7 +80,7 @@ protected JobDetailsInfo getTestResponseInstance() throws Exception { private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) { final Map tasksPerState = new HashMap<>(ExecutionState.values().length); - final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics( + final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( random.nextLong(), random.nextBoolean(), random.nextLong(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java new file mode 100644 index 0000000000000..ee1f48436b971 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; + +import java.util.Random; + +/** + * Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. + */ +public class SubtaskExecutionAttemptDetailsInfoTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return SubtaskExecutionAttemptDetailsInfo.class; + } + + @Override + protected SubtaskExecutionAttemptDetailsInfo getTestResponseInstance() throws Exception { + final Random random = new Random(); + + final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo( + Math.abs(random.nextLong()), + random.nextBoolean(), + Math.abs(random.nextLong()), + random.nextBoolean(), + Math.abs(random.nextLong()), + random.nextBoolean(), + Math.abs(random.nextLong()), + random.nextBoolean() + ); + + return new SubtaskExecutionAttemptDetailsInfo( + Math.abs(random.nextInt()), + ExecutionState.values()[random.nextInt(ExecutionState.values().length)], + Math.abs(random.nextInt()), + "localhost:" + random.nextInt(65536), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + Math.abs(random.nextLong()), + ioMetricsInfo + ); + } +} From 50eb6e72293abc02ef17707f3d70e069d2f2a463 Mon Sep 17 00:00:00 2001 From: "biao.liub" Date: Fri, 12 Jan 2018 16:38:55 +0800 Subject: [PATCH 2/2] [FLINK-8368] Add attempts path info that is missing in SubtaskExecutionAttemptDetailsHeaders --- .../messages/job/SubtaskExecutionAttemptDetailsHeaders.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java index e7bfc43c059bb..aa650073ee17b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java @@ -36,10 +36,11 @@ public class SubtaskExecutionAttemptDetailsHeaders implements MessageHeaders