Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to new a REST handler #5270

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Base class for request handlers whose response depends on a specific job vertex (defined
* via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
* defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
*
* @param <R> the response type
* @param <M> the message parameters type
*/
public abstract class AbstractJobVertexHandler<R extends ResponseBody, M extends JobVertexMessageParameters> extends AbstractExecutionGraphHandler<R, M> {

/**
* Instantiates a new Abstract job vertex handler.
*
* @param localRestAddress the local rest address
* @param leaderRetriever the leader retriever
* @param timeout the timeout
* @param responseHeaders the response headers
* @param messageHeaders the message headers
* @param executionGraphCache the execution graph cache
* @param executor the executor
*/
protected AbstractJobVertexHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {

super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
}

@Override
protected R handleRequest(
HandlerRequest<EmptyRequestBody, M> request,
AccessExecutionGraph executionGraph) throws RestHandlerException {

final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
final AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);

if (jobVertex == null) {
throw new RestHandlerException("No vertex with ID '" + jobVertexID + "' exists.", HttpResponseStatus.NOT_FOUND);
}

return handleRequest(request, jobVertex);
}

/**
* Called for each request after the corresponding {@link AccessExecutionJobVertex} has been retrieved from the
* {@link AccessExecutionGraph}.
*
* @param request the request
* @param jobVertex the execution job vertex
* @return the response
* @throws RestHandlerException if the handler could not process the request
*/
protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Base class for request handlers whose response depends on a specific attempt (defined
* via the "{@link SubtaskAttemptPathParameter#KEY}" of a specific subtask (defined
* via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined
* via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
* defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
*
* @param <R> the response type
* @param <M> the message parameters type
*/
public abstract class AbstractSubtaskAttemptHandler<R extends ResponseBody, M extends SubtaskAttemptMessageParameters> extends AbstractSubtaskHandler<R, M>{
/**
* Instantiates a new Abstract job vertex handler.
*
* @param localRestAddress the local rest address
* @param leaderRetriever the leader retriever
* @param timeout the timeout
* @param responseHeaders the response headers
* @param messageHeaders the message headers
* @param executionGraphCache the execution graph cache
* @param executor the executor
*/
protected AbstractSubtaskAttemptHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {

super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
}

@Override
protected R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionVertex executionVertex) throws RestHandlerException {
final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class);

final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt();
if (attemptNumber == currentAttempt.getAttemptNumber()) {
return handleRequest(request, currentAttempt);
} else if (attemptNumber >= 0 && attemptNumber < currentAttempt.getAttemptNumber()) {
final AccessExecution execution = executionVertex.getPriorExecutionAttempt(attemptNumber);

if (execution != null) {
return handleRequest(request, execution);
} else {
throw new RestHandlerException("Attempt " + attemptNumber + " not found in subtask " +
executionVertex.getTaskNameWithSubtaskIndex(), HttpResponseStatus.NOT_FOUND);
}
} else {
throw new RestHandlerException("Invalid attempt num " + attemptNumber, HttpResponseStatus.NOT_FOUND);
}
}

/**
* Called for each request after the corresponding {@link AccessExecution} has been retrieved from the
* {@link AccessExecutionVertex}.
*
* @param request the request
* @param execution the execution
* @return the response
* @throws RestHandlerException the rest handler exception
*/
protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecution execution) throws RestHandlerException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;


/**
* Base class for request handlers whose response depends on a specific subtask (defined
* via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined
* via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
* defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
*
*
* @param <R> the response type
* @param <M> the message parameters type
*/
public abstract class AbstractSubtaskHandler<R extends ResponseBody, M extends SubtaskMessageParameters> extends AbstractJobVertexHandler<R, M> {

/**
* Instantiates a new Abstract job vertex handler.
*
* @param localRestAddress the local rest address
* @param leaderRetriever the leader retriever
* @param timeout the timeout
* @param responseHeaders the response headers
* @param messageHeaders the message headers
* @param executionGraphCache the execution graph cache
* @param executor the executor
*/
protected AbstractSubtaskHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {

super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
}

@Override
protected R handleRequest(
HandlerRequest<EmptyRequestBody, M> request,
AccessExecutionJobVertex jobVertex) throws RestHandlerException {

final Integer subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
final AccessExecutionVertex[] executionVertices = jobVertex.getTaskVertices();

if (subtaskIndex >= executionVertices.length || subtaskIndex < 0) {
throw new RestHandlerException("Invalid subtask index for vertex " + jobVertex.getJobVertexId(), HttpResponseStatus.NOT_FOUND);
}

return handleRequest(request, executionVertices[subtaskIndex]);
}

/**
* Called for each request after the corresponding {@link AccessExecutionVertex} has been retrieved from the
* {@link AccessExecutionJobVertex}.
*
* @param request the request
* @param executionVertex the execution vertex
* @return the response
* @throws RestHandlerException the rest handler exception
*/
protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionVertex executionVertex) throws RestHandlerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -185,7 +186,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
ejv.getJobVertexId().toString());
}

final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
counts.getNumBytesOut(),
Expand Down