Skip to content

Commit

Permalink
[FLINK-7528] Create DispatcherRestEndpoint and integrate with Dispatcher
Browse files Browse the repository at this point in the history
This commit creates the DispatcherRestEndpoint and integrates it with the
Dispatcher. The DispatcherRestEndpoint is created in the SessionClusterEntrypoint
and its address is passed to the Dispatcher such that it can answer the
requestRestAddress RPC.

This closes #4598.
  • Loading branch information
tillrohrmann committed Sep 19, 2017
1 parent 75e84e0 commit 6a62f14
Show file tree
Hide file tree
Showing 47 changed files with 586 additions and 434 deletions.
Expand Up @@ -282,22 +282,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
ioExecutor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

// 1: the JobManager
LOG.debug("Starting JobManager actor");

// we start the JobManager with its standard name
ActorRef jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
futureExecutor,
ioExecutor,
highAvailabilityServices,
Option.apply(JobMaster.JOB_MANAGER_NAME),
Option.apply(JobMaster.ARCHIVE_NAME),
getJobManagerClass(),
getArchivistClass())._1();

// 2: the web monitor
// 1: the web monitor
LOG.debug("Starting Web Frontend");

Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
Expand All @@ -309,13 +294,28 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
webMonitorTimeout,
futureExecutor,
AkkaUtils.getAkkaURL(actorSystem, jobManager),
LOG);
if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
final URL webMonitorURL = new URL(webMonitor.getRestAddress());
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
}

// 2: the JobManager
LOG.debug("Starting JobManager actor");

// we start the JobManager with its standard name
ActorRef jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
futureExecutor,
ioExecutor,
highAvailabilityServices,
webMonitor != null ? Option.apply(webMonitor.getRestAddress()) : Option.empty(),
Option.apply(JobMaster.JOB_MANAGER_NAME),
Option.apply(JobMaster.ARCHIVE_NAME),
getJobManagerClass(),
getArchivistClass())._1();

// 3: Flink's Mesos ResourceManager
LOG.debug("Starting Mesos Flink Resource Manager");

Expand Down
Expand Up @@ -66,7 +66,8 @@ class MesosJobManager(
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory,
jobRecoveryTimeout: FiniteDuration,
metricsRegistry: Option[FlinkMetricRegistry])
metricsRegistry: Option[FlinkMetricRegistry],
optRestAddress: Option[String])
extends ContaineredJobManager(
flinkConfiguration,
futureExecutor,
Expand All @@ -82,7 +83,8 @@ class MesosJobManager(
submittedJobGraphs,
checkpointRecoveryFactory,
jobRecoveryTimeout,
metricsRegistry) {
metricsRegistry,
optRestAddress) {

val jobPollingInterval: FiniteDuration = 5 seconds

Expand Down
Expand Up @@ -123,7 +123,7 @@ public class WebRuntimeMonitor implements WebMonitor {

private final SSLContext serverSSLContext;

private final CompletableFuture<String> jobManagerAddressFuture = new CompletableFuture<>();
private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();

private final Time timeout;

Expand Down Expand Up @@ -273,7 +273,7 @@ public WebRuntimeMonitor(
new TaskManagerLogHandler(
retriever,
executor,
jobManagerAddressFuture,
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
Expand All @@ -283,7 +283,7 @@ public WebRuntimeMonitor(
new TaskManagerLogHandler(
retriever,
executor,
jobManagerAddressFuture,
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
Expand All @@ -296,13 +296,13 @@ public WebRuntimeMonitor(
.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
new StaticFileServerHandler<>(
retriever,
jobManagerAddressFuture,
localRestAddress,
timeout,
logFiles.logFile,
enableSSL))

.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler<>(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile,
enableSSL));

get(router, new JobManagerMetricsHandler(executor, metricFetcher));
Expand Down Expand Up @@ -355,7 +355,7 @@ public WebRuntimeMonitor(
// this handler serves all the static contents
router.GET("/:*", new StaticFileServerHandler<>(
retriever,
jobManagerAddressFuture,
localRestAddress,
timeout,
webRootDir,
enableSSL));
Expand All @@ -377,6 +377,8 @@ public void run() {
}

this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config);

localRestAddress.complete(netty.getRestAddress());
}

/**
Expand Down Expand Up @@ -420,11 +422,8 @@ public static JsonArchivist[] getJsonArchivists() {
}

@Override
public void start(String jobManagerAkkaUrl) throws Exception {
LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());

public void start() throws Exception {
synchronized (startupShutdownLock) {
jobManagerAddressFuture.complete(jobManagerAkkaUrl);
leaderRetrievalService.start(retriever);

long delay = backPressureStatsTracker.getCleanUpInterval();
Expand Down Expand Up @@ -466,6 +465,11 @@ public int getServerPort() {
return netty.getServerPort();
}

@Override
public String getRestAddress() {
return netty.getRestAddress();
}

private void cleanup() {
if (!cleanedUp.compareAndSet(false, true)) {
return;
Expand Down Expand Up @@ -526,7 +530,7 @@ private static <T extends ChannelInboundHandler & WebHandler> void post(Router r
// ------------------------------------------------------------------------

private RuntimeMonitorHandler handler(RequestHandler handler) {
return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressFuture, timeout,
return new RuntimeMonitorHandler(cfg, handler, retriever, localRestAddress, timeout,
serverSSLContext != null);
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
Expand All @@ -43,7 +44,9 @@
import javax.net.ssl.SSLEngine;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/**
* This classes encapsulates the boot-strapping of netty for the web-frontend.
Expand All @@ -55,6 +58,7 @@ public class WebFrontendBootstrap {
private final SSLContext serverSSLContext;
private final ServerBootstrap bootstrap;
private final Channel serverChannel;
private final String restAddress;

public WebFrontendBootstrap(
Router router,
Expand All @@ -63,7 +67,7 @@ public WebFrontendBootstrap(
SSLContext sslContext,
String configuredAddress,
int configuredPort,
final Configuration config) throws InterruptedException {
final Configuration config) throws InterruptedException, UnknownHostException {
this.router = Preconditions.checkNotNull(router);
this.log = Preconditions.checkNotNull(log);
this.uploadDir = directory;
Expand Down Expand Up @@ -110,10 +114,21 @@ protected void initChannel(SocketChannel ch) {
this.serverChannel = ch.sync().channel();

InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
String address = bindAddress.getAddress().getHostAddress();

InetAddress inetAddress = bindAddress.getAddress();
final String address;

if (inetAddress.isAnyLocalAddress()) {
address = config.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
} else {
address = inetAddress.getHostAddress();
}

int port = bindAddress.getPort();

this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);

this.restAddress = address + ':' + port;
}

public ServerBootstrap getBootstrap() {
Expand All @@ -134,6 +149,10 @@ public int getServerPort() {
return -1;
}

public String getRestAddress() {
return restAddress;
}

public void shutdown() {
if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
Expand Down
Expand Up @@ -66,8 +66,7 @@ public class RedirectHandlerTest extends TestLogger {
@Test
public void testRedirectHandler() throws Exception {
final String restPath = "/testing";
final String correctAddress = "foobar";
final String incorrectAddres = "barfoo";
final String correctAddress = "foobar:21345";
final String redirectionAddress = "foobar:12345";
final String expectedRedirection = "http://" + redirectionAddress + restPath;

Expand All @@ -78,11 +77,10 @@ public void testRedirectHandler() throws Exception {
final GatewayRetriever<RestfulGateway> gatewayRetriever = mock(GatewayRetriever.class);

final RestfulGateway redirectionGateway = mock(RestfulGateway.class);
when(redirectionGateway.getAddress()).thenReturn(incorrectAddres);
when(redirectionGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(redirectionAddress));

final RestfulGateway localGateway = mock(RestfulGateway.class);
when(localGateway.getAddress()).thenReturn(correctAddress);
when(localGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(correctAddress));

when(gatewayRetriever.getNow()).thenReturn(Optional.empty(), Optional.of(redirectionGateway), Optional.of(localGateway));

Expand All @@ -103,12 +101,12 @@ public void testRedirectHandler() throws Exception {
configuration);

try (HttpTestClient httpClient = new HttpTestClient("localhost", bootstrap.getServerPort())) {
// 1. without completed local address future --> Service unavailable
// 1. without completed local address future --> Internal server error
httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout));

HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));

Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());

// 2. with completed local address future but no leader gateway available --> Service unavailable
localAddressFuture.complete(correctAddress);
Expand Down

0 comments on commit 6a62f14

Please sign in to comment.