Skip to content

Commit

Permalink
[FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to new a RES…
Browse files Browse the repository at this point in the history
…T handler

This closes #5287.
  • Loading branch information
biao.liub authored and tillrohrmann committed Jan 14, 2018
1 parent dc9a4f2 commit de30d16
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 127 deletions.
Expand Up @@ -57,11 +57,11 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by
}

public IOMetrics(
int numBytesInLocal,
int numBytesInRemote,
int numBytesOut,
int numRecordsIn,
int numRecordsOut,
long numBytesInLocal,
long numBytesInRemote,
long numBytesOut,
long numRecordsIn,
long numRecordsOut,
double numBytesInLocalPerSecond,
double numBytesInRemotePerSecond,
double numBytesOutPerSecond,
Expand Down
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Request handler providing details about a single task execution attempt.
*/
public class SubtaskCurrentAttemptDetailsHandler extends AbstractSubtaskHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {

private final MetricFetcher<?> metricFetcher;

public SubtaskCurrentAttemptDetailsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
MetricFetcher<?> metricFetcher) {

super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);

this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
}

@Override
protected SubtaskExecutionAttemptDetailsInfo handleRequest(
HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> request,
AccessExecutionVertex executionVertex) throws RestHandlerException {

final AccessExecution execution = executionVertex.getCurrentExecutionAttempt();

final MutableIOMetrics ioMetrics = new MutableIOMetrics();

final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

ioMetrics.addIOMetrics(
execution,
metricFetcher,
jobID.toString(),
jobVertexID.toString()
);

return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
}
}
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand All @@ -34,8 +33,6 @@
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -82,50 +79,18 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
AccessExecution execution) throws RestHandlerException {

final ExecutionState status = execution.getState();
final long now = System.currentTimeMillis();

final TaskManagerLocation location = execution.getAssignedResourceLocation();
final String locationString = location == null ? "(unassigned)" : location.getHostname();

long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
if (startTime == 0) {
startTime = -1;
}
final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;

final MutableIOMetrics counts = new MutableIOMetrics();
final MutableIOMetrics ioMetrics = new MutableIOMetrics();

final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

counts.addIOMetrics(
ioMetrics.addIOMetrics(
execution,
metricFetcher,
jobID.toString(),
jobVertexID.toString()
);

final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
counts.getNumBytesOut(),
counts.isNumBytesOutComplete(),
counts.getNumRecordsIn(),
counts.isNumRecordsInComplete(),
counts.getNumRecordsOut(),
counts.isNumRecordsOutComplete());

return new SubtaskExecutionAttemptDetailsInfo(
execution.getParallelSubtaskIndex(),
status,
execution.getAttemptNumber(),
locationString,
startTime,
endTime,
duration,
ioMetricsInfo
);
return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link SubtaskCurrentAttemptDetailsHandler}.
*/
public class SubtaskCurrentAttemptDetailsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {

private static final SubtaskCurrentAttemptDetailsHeaders INSTANCE = new SubtaskCurrentAttemptDetailsHeaders();

public static final String URL = String.format(
"/jobs/:%s/vertices/:%s/subtasks/:%s",
JobIDPathParameter.KEY,
JobVertexIdPathParameter.KEY,
SubtaskIndexPathParameter.KEY);

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
return SubtaskExecutionAttemptDetailsInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public SubtaskMessageParameters getUnresolvedMessageParameters() {
return new SubtaskMessageParameters();
}

public static SubtaskCurrentAttemptDetailsHeaders getInstance() {
return INSTANCE;
}
}
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -151,4 +154,40 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(subtaskIndex, status, attempt, host, startTime, endTime, duration, ioMetricsInfo);
}

public static SubtaskExecutionAttemptDetailsInfo create(AccessExecution execution, MutableIOMetrics ioMetrics) {
final ExecutionState status = execution.getState();
final long now = System.currentTimeMillis();

final TaskManagerLocation location = execution.getAssignedResourceLocation();
final String locationString = location == null ? "(unassigned)" : location.getHostname();

long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
if (startTime == 0) {
startTime = -1;
}
final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;

final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(),
ioMetrics.isNumBytesInLocalComplete() && ioMetrics.isNumBytesInRemoteComplete(),
ioMetrics.getNumBytesOut(),
ioMetrics.isNumBytesOutComplete(),
ioMetrics.getNumRecordsIn(),
ioMetrics.isNumRecordsInComplete(),
ioMetrics.getNumRecordsOut(),
ioMetrics.isNumRecordsOutComplete());

return new SubtaskExecutionAttemptDetailsInfo(
execution.getParallelSubtaskIndex(),
status,
execution.getAttemptNumber(),
locationString,
startTime,
endTime,
duration,
ioMetricsInfo
);
}
}
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
Expand Down Expand Up @@ -389,6 +391,17 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
executor
);

final SubtaskCurrentAttemptDetailsHandler subtaskCurrentAttemptDetailsHandler = new SubtaskCurrentAttemptDetailsHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
null,
executionGraphCache,
executor,
metricFetcher
);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<T>> optWebContent;
Expand Down Expand Up @@ -432,6 +445,7 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));

// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
Expand Down

0 comments on commit de30d16

Please sign in to comment.