Skip to content

Commit

Permalink
[FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
Browse files (browse the repository at this point in the history)
Introduce a generalized GatewayRetriever that replaces the JobManagerRetriever. The
GatewayRetriever serves the same purpose as the JobManagerRetriever, but it can
additionally retrieve the gateway for an arbitrary endpoint type.

This closes #4549.
  • Loading branch information
tillrohrmann committed Sep 18, 2017
1 parent 1269f75 commit 6ad0d35
Show file tree
Hide file tree
Showing 29 changed files with 734 additions and 262 deletions.
Expand Up @@ -19,22 +19,18 @@
package org.apache.flink.runtime.webmonitor; package org.apache.flink.runtime.webmonitor;


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;


import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;


Expand All @@ -43,12 +39,12 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import static org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils.ENCODING;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;


/** /**
Expand All @@ -62,8 +58,6 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {


private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class); private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);


private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;

public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address"; public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";


private final RequestHandler handler; private final RequestHandler handler;
Expand All @@ -73,7 +67,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
public RuntimeMonitorHandler( public RuntimeMonitorHandler(
WebMonitorConfig cfg, WebMonitorConfig cfg,
RequestHandler handler, RequestHandler handler,
JobManagerRetriever retriever, GatewayRetriever<JobManagerGateway> retriever,
CompletableFuture<String> localJobManagerAddressFuture, CompletableFuture<String> localJobManagerAddressFuture,
Time timeout, Time timeout,
boolean httpsEnabled) { boolean httpsEnabled) {
Expand Down Expand Up @@ -124,18 +118,9 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobMana


if (optNotFound.isPresent()) { if (optNotFound.isPresent()) {
// this should result in a 404 error code (not found) // this should result in a 404 error code (not found)
Throwable e = optNotFound.get(); finalResponse = HandlerRedirectUtils.getResponse(HttpResponseStatus.NOT_FOUND, optNotFound.get().getMessage());
ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
} else { } else {
byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING); finalResponse = HandlerRedirectUtils.getErrorResponse(throwable);
finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
} }
} else { } else {
finalResponse = httpResponse; finalResponse = httpResponse;
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;


import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
Expand All @@ -31,6 +31,9 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -46,7 +49,9 @@
@ChannelHandler.Sharable @ChannelHandler.Sharable
public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> { public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> {


private final JobManagerRetriever retriever; private final Logger logger = LoggerFactory.getLogger(getClass());

private final GatewayRetriever<JobManagerGateway> retriever;


protected final CompletableFuture<String> localJobManagerAddressFuture; protected final CompletableFuture<String> localJobManagerAddressFuture;


Expand All @@ -58,7 +63,7 @@ public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHand
protected String localJobManagerAddress; protected String localJobManagerAddress;


public RuntimeMonitorHandlerBase( public RuntimeMonitorHandlerBase(
JobManagerRetriever retriever, GatewayRetriever<JobManagerGateway> retriever,
CompletableFuture<String> localJobManagerAddressFuture, CompletableFuture<String> localJobManagerAddressFuture,
Time timeout, Time timeout,
boolean httpsEnabled) { boolean httpsEnabled) {
Expand All @@ -83,19 +88,38 @@ protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exc
localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); localJobManagerAddress = localJobManagerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
} }


Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();


if (optJobManagerGateway.isPresent()) { if (optJobManagerGateway.isPresent()) {
JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
String redirectAddress = HandlerRedirectUtils.getRedirectAddress( Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
localJobManagerAddress, jobManagerGateway, timeout); localJobManagerAddress,

jobManagerGateway,
if (redirectAddress != null) { timeout);
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
httpsEnabled); if (optRedirectAddress.isPresent()) {
KeepAliveWrite.flush(ctx, routed.request(), redirect); optRedirectAddress.get().whenComplete(
} (String redirectAddress, Throwable throwable) -> {
else { HttpResponse response;

if (throwable != null) {
logger.error("Could not retrieve the redirect address.", throwable);
response = HandlerRedirectUtils.getErrorResponse(throwable);
} else {
try {
response = HandlerRedirectUtils.getRedirectResponse(
redirectAddress,
routed.path(),
httpsEnabled);
} catch (Exception e) {
logger.error("Could not create the redirect response.", e);
response = HandlerRedirectUtils.getErrorResponse(e);
}
}

KeepAliveWrite.flush(ctx, routed.request(), response);
});
} else {
respondAsLeader(ctx, routed, jobManagerGateway); respondAsLeader(ctx, routed, jobManagerGateway);
} }
} else { } else {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.WebOptions; import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
Expand Down Expand Up @@ -70,7 +71,7 @@
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils; import org.apache.flink.util.FileUtils;
Expand Down Expand Up @@ -117,7 +118,7 @@ public class WebRuntimeMonitor implements WebMonitor {
private final LeaderRetrievalService leaderRetrievalService; private final LeaderRetrievalService leaderRetrievalService;


/** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */ /** Service which retrieves the currently leading JobManager and opens a JobManagerGateway. */
private final JobManagerRetriever retriever; private final LeaderGatewayRetriever<JobManagerGateway> retriever;


private final SSLContext serverSSLContext; private final SSLContext serverSSLContext;


Expand Down Expand Up @@ -146,7 +147,7 @@ public WebRuntimeMonitor(
Configuration config, Configuration config,
LeaderRetrievalService leaderRetrievalService, LeaderRetrievalService leaderRetrievalService,
BlobView blobView, BlobView blobView,
JobManagerRetriever jobManagerRetriever, LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever, MetricQueryServiceRetriever queryServiceRetriever,
Time timeout, Time timeout,
Executor executor) throws IOException, InterruptedException { Executor executor) throws IOException, InterruptedException {
Expand Down Expand Up @@ -292,7 +293,11 @@ public WebRuntimeMonitor(
router router
// log and stdout // log and stdout
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.logFile, new StaticFileServerHandler(
retriever,
jobManagerAddressFuture,
timeout,
logFiles.logFile,
enableSSL)) enableSSL))


.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
Expand Down
Expand Up @@ -27,9 +27,9 @@
*****************************************************************************/ *****************************************************************************/


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;


import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -97,7 +97,7 @@
* example.</p> * example.</p>
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> { public class StaticFileServerHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {


/** Default logger, if none is specified. */ /** Default logger, if none is specified. */
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class); private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
Expand All @@ -113,7 +113,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


private final JobManagerRetriever retriever; private final GatewayRetriever<T> retriever;


private final CompletableFuture<String> localJobManagerAddressFuture; private final CompletableFuture<String> localJobManagerAddressFuture;


Expand All @@ -131,7 +131,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
private String localJobManagerAddress; private String localJobManagerAddress;


public StaticFileServerHandler( public StaticFileServerHandler(
JobManagerRetriever retriever, GatewayRetriever<T> retriever,
CompletableFuture<String> localJobManagerAddressPromise, CompletableFuture<String> localJobManagerAddressPromise,
Time timeout, Time timeout,
File rootPath, File rootPath,
Expand All @@ -141,7 +141,7 @@ public StaticFileServerHandler(
} }


public StaticFileServerHandler( public StaticFileServerHandler(
JobManagerRetriever retriever, GatewayRetriever<T> retriever,
CompletableFuture<String> localJobManagerAddressFuture, CompletableFuture<String> localJobManagerAddressFuture,
Time timeout, Time timeout,
File rootPath, File rootPath,
Expand All @@ -168,31 +168,47 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except
} }


final HttpRequest request = routed.request(); final HttpRequest request = routed.request();
String requestPath = routed.path(); final String requestPath;


// make sure we request the "index.html" in case there is a directory request // make sure we request the "index.html" in case there is a directory request
if (requestPath.endsWith("/")) { if (routed.path().endsWith("/")) {
requestPath = requestPath + "index.html"; requestPath = routed.path() + "index.html";
} }

// in case the files being accessed are logs or stdout files, find appropriate paths. // in case the files being accessed are logs or stdout files, find appropriate paths.
if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) { else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
requestPath = ""; requestPath = "";
} else {
requestPath = routed.path();
} }


Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); Optional<T> optLeader = retriever.getNow();


if (optJobManagerGateway.isPresent()) { if (optLeader.isPresent()) {
// Redirect to leader if necessary // Redirect to leader if necessary
String redirectAddress = HandlerRedirectUtils.getRedirectAddress( Optional<CompletableFuture<String>> optRedirectAddress = HandlerRedirectUtils.getRedirectAddress(
localJobManagerAddress, optJobManagerGateway.get(), timeout); localJobManagerAddress,

optLeader.get(),
if (redirectAddress != null) { timeout);
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
redirectAddress, requestPath, httpsEnabled); if (optRedirectAddress.isPresent()) {
KeepAliveWrite.flush(ctx, routed.request(), redirect); optRedirectAddress.get().whenComplete(
} (String address, Throwable throwable) -> {
else { if (throwable != null) {
logger.error("Failed to obtain redirect address.", throwable);
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
} else {
try {
HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(
address, requestPath, httpsEnabled);

KeepAliveWrite.flush(ctx, routed.request(), redirect);
} catch (Exception e) {
logger.error("Failed to send redirect response.", e);
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
});
} else {
respondAsLeader(ctx, request, requestPath); respondAsLeader(ctx, request, requestPath);
} }
} }
Expand Down

0 comments on commit 6ad0d35

Please sign in to comment.