Skip to content

Commit

Permalink
[FLINK-7409] [web] Make WebRuntimeMonitor reactive
Browse files Browse the repository at this point in the history
This commit changes the behaviour of the WebRuntimeMonitor to not longer block serving
threads by waiting on the result of futures. Instead the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.

This closes #4527.
  • Loading branch information
tillrohrmann committed Sep 3, 2017
1 parent a008303 commit 44facda
Show file tree
Hide file tree
Showing 89 changed files with 1,277 additions and 900 deletions.
Expand Up @@ -36,7 +36,7 @@ public class RetryRejectedExecutionFailureHandler implements ActionRequestFailur

@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else {
// rethrow all other failures
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -276,27 +277,27 @@ else if (t instanceof Error) {
}

/**
* Checks whether a throwable chain contains a specific type of exception.
* Checks whether a throwable chain contains a specific type of exception and returns it.
*
* @param throwable the throwable chain to check.
* @param searchType the type of exception to search for in the chain.
* @return True, if the searched type is nested in the throwable, false otherwise.
* @return Optional throwable of the requested type if available, otherwise empty
*/
public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
public static Optional<Throwable> findThrowable(Throwable throwable, Class<?> searchType) {
if (throwable == null || searchType == null) {
return false;
return Optional.empty();
}

Throwable t = throwable;
while (t != null) {
if (searchType.isAssignableFrom(t.getClass())) {
return true;
return Optional.of(t);
} else {
t = t.getCause();
}
}

return false;
return Optional.empty();
}

/**
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.Optional;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -65,26 +64,23 @@ public ExecutionGraphHolder(Time timeout) {
*
* @param jid jobID of the execution graph to be retrieved
* @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
* @throws Exception if the ExecutionGraph retrieval failed.
*/
public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
if (cached.getState() == JobStatus.SUSPENDED) {
cache.remove(jid);
} else {
return Optional.of(cached);
return CompletableFuture.completedFuture(Optional.of(cached));
}
}

CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);

Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
executionGraphFuture.thenAcceptAsync(
optExecutionGraph ->
optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));

return result.map((executionGraph) -> {
cache.put(jid, executionGraph);

return executionGraph;
});
return executionGraphFuture;
}
}
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
Expand All @@ -45,6 +46,7 @@
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -88,7 +90,7 @@ public String[] getPaths() {

@Override
protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
FullHttpResponse response;
CompletableFuture<FullHttpResponse> responseFuture;

try {
// we only pass the first element in the list to the handlers.
Expand All @@ -106,29 +108,41 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobMana
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());

response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
} catch (Exception e) {
responseFuture = FutureUtils.completedExceptionally(e);
}
catch (NotFoundException e) {
// this should result in a 404 error code (not found)
ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
LOG.debug("Error while handling request", e);
}
catch (Exception e) {
byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());

LOG.debug("Error while handling request", e);
}

response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);

KeepAliveWrite.flush(ctx, routed.request(), response);
responseFuture.whenComplete(
(FullHttpResponse httpResponse, Throwable throwable) -> {
final FullHttpResponse finalResponse;

if (throwable != null) {
LOG.debug("Error while handling request.", throwable);

Optional<Throwable> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);

if (optNotFound.isPresent()) {
// this should result in a 404 error code (not found)
Throwable e = optNotFound.get();
ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
} else {
byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
}
} else {
finalResponse = httpResponse;
}

finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
});
}
}
Expand Up @@ -232,41 +232,41 @@ public WebRuntimeMonitor(

Router router = new Router();
// config how to interact with this web server
get(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));

// the overview - how many task managers, slots, free slots, ...
get(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));

// job manager configuration
get(router, new JobManagerConfigHandler(config));
get(router, new JobManagerConfigHandler(executor, config));

// overview over jobs
get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));

get(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(currentGraphs, metricFetcher));

get(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
get(router, new SubtasksTimesHandler(currentGraphs));
get(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
get(router, new JobVertexAccumulatorsHandler(currentGraphs));
get(router, new JobVertexBackPressureHandler(currentGraphs, backPressureStatsTracker, refreshInterval));
get(router, new JobVertexMetricsHandler(metricFetcher));
get(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));

get(router, new JobPlanHandler(currentGraphs));
get(router, new JobConfigHandler(currentGraphs));
get(router, new JobExceptionsHandler(currentGraphs));
get(router, new JobAccumulatorsHandler(currentGraphs));
get(router, new JobMetricsHandler(metricFetcher));

get(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));

get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));

get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtasksTimesHandler(currentGraphs, executor));
get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval));
get(router, new JobVertexMetricsHandler(executor, metricFetcher));
get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));

get(router, new JobPlanHandler(currentGraphs, executor));
get(router, new JobConfigHandler(currentGraphs, executor));
get(router, new JobExceptionsHandler(currentGraphs, executor));
get(router, new JobAccumulatorsHandler(currentGraphs, executor));
get(router, new JobMetricsHandler(executor, metricFetcher));

get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
get(router,
new TaskManagerLogHandler(
retriever,
Expand All @@ -287,7 +287,7 @@ public WebRuntimeMonitor(
config,
enableSSL,
blobView));
get(router, new TaskManagerMetricsHandler(metricFetcher));
get(router, new TaskManagerMetricsHandler(executor, metricFetcher));

router
// log and stdout
Expand All @@ -299,48 +299,48 @@ public WebRuntimeMonitor(
new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
enableSSL));

get(router, new JobManagerMetricsHandler(metricFetcher));
get(router, new JobManagerMetricsHandler(executor, metricFetcher));

// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
get(router, new JobCancellationHandler(timeout));
get(router, new JobCancellationHandler(executor, timeout));
// DELETE is the preferred way of canceling a job (Rest-conform)
delete(router, new JobCancellationHandler(timeout));
delete(router, new JobCancellationHandler(executor, timeout));

get(router, triggerHandler);
get(router, inProgressHandler);

// stop a job via GET (for proper integration with YARN this has to be performed via GET)
get(router, new JobStoppingHandler(timeout));
get(router, new JobStoppingHandler(executor, timeout));
// DELETE is the preferred way of stopping a job (Rest-conform)
delete(router, new JobStoppingHandler(timeout));
delete(router, new JobStoppingHandler(executor, timeout));

int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);

// Register the checkpoint stats handlers
get(router, new CheckpointStatsHandler(currentGraphs));
get(router, new CheckpointConfigHandler(currentGraphs));
get(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
get(router, new CheckpointStatsHandler(currentGraphs, executor));
get(router, new CheckpointConfigHandler(currentGraphs, executor));
get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));

if (webSubmitAllow) {
// fetch the list of uploaded jars.
get(router, new JarListHandler(uploadDir));
get(router, new JarListHandler(executor, uploadDir));

// get plan for an uploaded jar
get(router, new JarPlanHandler(uploadDir));
get(router, new JarPlanHandler(executor, uploadDir));

// run a jar
post(router, new JarRunHandler(uploadDir, timeout, config));
post(router, new JarRunHandler(executor, uploadDir, timeout, config));

// upload a jar
post(router, new JarUploadHandler(uploadDir));
post(router, new JarUploadHandler(executor, uploadDir));

// delete an uploaded jar from submission interface
delete(router, new JarDeleteHandler(uploadDir));
delete(router, new JarDeleteHandler(executor, uploadDir));
} else {
// send an Access Denied message
JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
get(router, jad);
post(router, jad);
delete(router, jad);
Expand Down
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
Expand All @@ -28,6 +30,8 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Base class for request handlers whose response depends on an ExecutionGraph
Expand All @@ -37,12 +41,16 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR

private final ExecutionGraphHolder executionGraphHolder;

public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
super(executor);
this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
}

@Override
public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
public CompletableFuture<String> handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
JobManagerGateway jobManagerGateway) {
String jidString = pathParams.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
Expand All @@ -53,21 +61,20 @@ public String handleJsonRequest(Map<String, String> pathParams, Map<String, Stri
jid = JobID.fromHexString(jidString);
}
catch (Exception e) {
throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
}

final Optional<AccessExecutionGraph> optGraph;
final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);

try {
optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
} catch (Exception e) {
throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
}

final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));

return handleRequest(graph, pathParams);
return graphFuture.thenComposeAsync(
(Optional<AccessExecutionGraph> optGraph) -> {
if (optGraph.isPresent()) {
return handleRequest(optGraph.get(), pathParams);
} else {
throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
}
}, executor);
}

public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
}

0 comments on commit 44facda

Please sign in to comment.