Skip to content

Commit

Permalink
[FLINK-2358] [dashboard] Add server for static dashboard files (HTML,…
Browse files Browse the repository at this point in the history
… JS, CSS, ...)
  • Loading branch information
StephanEwen committed Jul 21, 2015
1 parent 44ee1c1 commit e86f451
Show file tree
Hide file tree
Showing 5 changed files with 589 additions and 60 deletions.
30 changes: 20 additions & 10 deletions flink-dist/src/main/assemblies/bin.xml
Expand Up @@ -60,33 +60,36 @@ under the License.
</files>

<fileSets>
<!-- copy start scripts -->
<fileSet>
<!-- copy start scripts -->
<directory>src/main/flink-bin/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>

<!-- flink scala shell-->
<fileSet>
<directory>../flink-staging/flink-scala-shell/start-script/</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>755</fileMode>
</fileSet>

<!-- copy yarn start scripts -->
<fileSet>
<!-- copy yarn start scripts -->
<directory>src/main/flink-bin/yarn-bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>

<!-- copy default configuration -->
<fileSet>
<!-- copy default configuration -->
<directory>src/main/flink-bin/conf</directory>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>

<!-- create an empty log directory -->
<fileSet>
<!-- create an empty log directory -->
<directory>src/main/flink-bin/</directory>
<outputDirectory>log</outputDirectory>
<fileMode>0644</fileMode>
Expand All @@ -95,8 +98,8 @@ under the License.
</excludes>
</fileSet>

<!-- copy *.txt files -->
<fileSet>
<!-- copy *.txt files -->
<directory>src/main/flink-bin/</directory>
<outputDirectory></outputDirectory>
<fileMode>0644</fileMode>
Expand All @@ -107,8 +110,8 @@ under the License.
</includes>
</fileSet>

<!-- copy JavaDocs -->
<fileSet>
<!-- copy JavaDocs -->
<!-- <directory>../target/apidocs</directory -->
<directory>../target</directory>
<includes>
Expand All @@ -118,6 +121,13 @@ under the License.
<fileMode>0644</fileMode>
</fileSet>

<!-- copy the web documents -->
<fileSet>
<directory>../flink-runtime-web/web-dashboard/web</directory>
<outputDirectory>resources/web-runtime-monitor</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>

<!-- copy the tools -->
<fileSet>
<directory>src/main/flink-bin/tools</directory>
Expand All @@ -136,9 +146,8 @@ under the License.
</excludes>
</fileSet>


<!-- copy jar files of java examples -->
<fileSet>
<!-- copy jar files of java examples -->
<directory>../flink-examples/flink-java-examples/target</directory>
<outputDirectory>examples</outputDirectory>
<fileMode>0644</fileMode>
Expand All @@ -153,10 +162,11 @@ under the License.
<exclude>flink-java-examples-${project.version}-javadoc.jar</exclude>
</excludes>
</fileSet>

<!-- copy python package -->
<fileSet>
<!-- copy python package -->
<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
<outputDirectory>resources/python/</outputDirectory>
<outputDirectory>resources/python</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
</fileSets>
Expand Down
Expand Up @@ -23,17 +23,19 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.BadClientSilencer;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;

import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobSummaryHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVerticesOverviewHandler;
Expand All @@ -47,21 +49,33 @@

import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
* The root component of the web runtime monitor.
*
* <p>The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
* HTTP requests of different paths to different response handlers. In addition, it serves the static
* files of the web frontend, such as HTML, CSS, or JS files.</p>
*/
public class WebRuntimeMonitor implements WebMonitor {

public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);

public static final long DEFAULT_REFRESH_INTERVAL = 2000;

public static final long DEFAULT_REFRESH_INTERVAL = 5000;

/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);

/** Teh default path under which the static contents is stored */
private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";

// ------------------------------------------------------------------------
// ------------------------------------------------------------------------

private final Object startupShutdownLock = new Object();

private final Router router;

Expand All @@ -73,11 +87,34 @@ public class WebRuntimeMonitor implements WebMonitor {


public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef archive) throws IOException {
// figure out where our static contents is
final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);

final File webRootDir;
if (configuredWebRoot != null) {
webRootDir = new File(configuredWebRoot);
}
else if (flinkRoot != null) {
webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
}
else {
throw new IllegalConfigurationException("The given configuration provides neither the web-document root ("
+ ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
+ ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
}

// validate that the doc root is a valid directory
if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
throw new IllegalConfigurationException("The path to the static contents (" +
webRootDir.getAbsolutePath() + ") is not a readable directory.");
}

// port configuration
this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_NEW_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT);
if (this.configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is " + this.configuredPort);
throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
}

ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
Expand All @@ -93,61 +130,70 @@ public WebRuntimeMonitor(Configuration config, ActorRef jobManager, ActorRef arc
.GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)));
.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))

// .GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))

// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));


}

@Override
public void start() throws Exception {

ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(handler.name(), handler)
.addLast(new BadClientSilencer());
synchronized (startupShutdownLock) {
if (this.bootstrap != null) {
throw new IllegalStateException("The server has already been started");
}
};

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

this.bootstrap = new ServerBootstrap();
this.bootstrap.group(bossGroup, workerGroup)
.childOption(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);

Channel ch = this.bootstrap.bind(configuredPort).sync().channel();

InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();

LOG.info("Web frontend listening at " + address + ':' + port);

ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new ChunkedWriteHandler())
.addLast(handler.name(), handler);
}
};

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);

Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
this.serverChannel = ch;

InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();

LOG.info("Web frontend listening at " + address + ':' + port);
}
}

@Override
public void stop() throws Exception {
Channel server = this.serverChannel;
ServerBootstrap bootstrap = this.bootstrap;

if (server != null) {
server.close().awaitUninterruptibly();
this.serverChannel = null;
}

if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
synchronized (startupShutdownLock) {
if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
this.serverChannel = null;
}
if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
}
this.bootstrap = null;
}
this.bootstrap = null;
}
}

Expand All @@ -166,7 +212,6 @@ public int getServerPort() {
return -1;
}


// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down

0 comments on commit e86f451

Please sign in to comment.