Skip to content

Commit

Permalink
[FLINK-2358] [dashboard] First part dashboard server backend
Browse files Browse the repository at this point in the history
 - Adds a separate Maven project for easier maintenance. Also allows users to refer to runtime without web libraries.
 - Simple HTTP server based on Netty HTTP (slim dependency, since we use netty anyways)
 - REST URL parsing via Netty Router
 - Abstract stubs for handlers that deal with errors and request/response
 - First set of URL request handlers that produce JSON responses

This closes #677
This closes #623
This closes #297
  • Loading branch information
StephanEwen committed Jul 21, 2015
1 parent 75d1639 commit 44ee1c1
Show file tree
Hide file tree
Showing 48 changed files with 3,201 additions and 136 deletions.
Expand Up @@ -28,11 +28,6 @@
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.eclipse.jetty.http.security.Constraint;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.security.authentication.BasicAuthenticator;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.server.handler.HandlerList;
Expand Down Expand Up @@ -182,32 +177,7 @@ public WebInterfaceServer(Configuration config, int port) throws IOException {
af = null; af = null;
} }
} }
if (af != null) { server.setHandler(handlers);
HashLoginService loginService = new HashLoginService("Flink Query Engine Interface", authFile);
server.addBean(loginService);

Constraint constraint = new Constraint();
constraint.setName(Constraint.__BASIC_AUTH);
constraint.setAuthenticate(true);
constraint.setRoles(new String[] { "user" });

ConstraintMapping mapping = new ConstraintMapping();
mapping.setPathSpec("/*");
mapping.setConstraint(constraint);

ConstraintSecurityHandler sh = new ConstraintSecurityHandler();
sh.addConstraintMapping(mapping);
sh.setAuthenticator(new BasicAuthenticator());
sh.setLoginService(loginService);
sh.setStrict(true);

// set the handers: the server hands the request to the security handler,
// which hands the request to the other handlers when authenticated
sh.setHandler(handlers);
server.setHandler(sh);
} else {
server.setHandler(handlers);
}
} }


/** /**
Expand Down
Expand Up @@ -236,7 +236,7 @@ public final class ConfigConstants {
public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts"; public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";


/** /**
* The heartbeat intervall between the Application Master and the YARN Resource Manager. * The heartbeat interval between the Application Master and the YARN Resource Manager.
* *
* The default value is 5 (seconds). * The default value is 5 (seconds).
*/ */
Expand Down Expand Up @@ -300,16 +300,24 @@ public final class ConfigConstants {
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port"; public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";


/** /**
* The config parameter defining the path to the htaccess file protecting the web frontend. * The option that specifies whether to use the new web frontend
*/ */
public static final String JOB_MANAGER_WEB_ACCESS_FILE_KEY = "jobmanager.web.access"; public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";

/**
* The port for the runtime monitor web-frontend server.
*/
public static final String JOB_MANAGER_NEW_WEB_PORT_KEY = "jobmanager.new-web.port";


/** /**
* The config parameter defining the number of archived jobs for the jobmanager * The config parameter defining the number of archived jobs for the jobmanager
*/ */
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history"; public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";


public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath"; public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";

/** The directory where the web server's static contents is stored */
public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = "jobmanager.web.docroot";




// ------------------------------ Web Client ------------------------------ // ------------------------------ Web Client ------------------------------
Expand Down Expand Up @@ -605,6 +613,12 @@ public final class ConfigConstants {
*/ */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081; public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;


/**
* The config key for the port of the JobManager new web frontend.
* Setting this value to {@code -1} disables the web frontend.
*/
public static final int DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT = 8082;

/** /**
* The default number of archived jobs for the jobmanager * The default number of archived jobs for the jobmanager
*/ */
Expand Down
6 changes: 6 additions & 0 deletions flink-dist/pom.xml
Expand Up @@ -59,6 +59,12 @@ under the License.
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${project.version}</version>
</dependency>

<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer</artifactId> <artifactId>flink-optimizer</artifactId>
Expand Down
3 changes: 3 additions & 0 deletions flink-dist/src/main/resources/flink-conf.yaml
Expand Up @@ -68,6 +68,9 @@ jobmanager.web.port: 8081


webclient.port: 8080 webclient.port: 8080


# Temporary: Uncomment this to be able to use the new web frontend
#jobmanager.new-web-frontend: true



#============================================================================== #==============================================================================
# Streaming state checkpointing # Streaming state checkpointing
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.io.network.DataExchangeMode; import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion; import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
Expand Down Expand Up @@ -221,6 +222,9 @@ public JobGraph compileJobGraph(OptimizedPlan program) {
throw new RuntimeException("Config object could not be written to Job Configuration: " + e); throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
} }


String jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(program);
graph.setJsonPlan(jsonPlan);

// release all references again // release all references again
this.vertices = null; this.vertices = null;
this.chainedTasks = null; this.chainedTasks = null;
Expand Down
108 changes: 108 additions & 0 deletions flink-runtime-web/pom.xml
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<version>0.10-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-runtime-web</artifactId>
<name>flink-runtime-web</name>

<packaging>jar</packaging>

<dependencies>

<!-- ===================================================
Flink Dependencies
=================================================== -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
</dependency>

<!-- ===================================================
Dependencies for the Web Server
=================================================== -->

<dependency>
<groupId>tv.cntt</groupId>
<artifactId>netty-router</artifactId>
<version>1.10</version>
</dependency>

<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.18.2-GA</version>
</dependency>

<!-- ===================================================
Dependencies for Actor Calls
=================================================== -->

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>

<!-- ===================================================
Utilities
=================================================== -->

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

</dependencies>

</project>
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.WeakHashMap;

/**
* Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
* <p>
* The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
* at some point once no one else is pointing to the ExecutionGraph.
* Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should
* stay valid.
*/
public class ExecutionGraphHolder {

private final ActorRef source;

private final FiniteDuration timeout;

private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();


public ExecutionGraphHolder(ActorRef source) {
this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}

public ExecutionGraphHolder(ActorRef source, FiniteDuration timeout) {
if (source == null || timeout == null) {
throw new NullPointerException();
}
this.source = source;
this.timeout = timeout;
}


public ExecutionGraph getExecutionGraph(JobID jid) {
ExecutionGraph cached = cache.get(jid);
if (cached != null) {
return cached;
}

try {
Timeout to = new Timeout(timeout);
Future<Object> future = Patterns.ask(source, new JobManagerMessages.RequestJob(jid), to);
Object result = Await.result(future, timeout);
if (result instanceof JobManagerMessages.JobNotFound) {
return null;
}
else if (result instanceof JobManagerMessages.JobFound) {
ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
cache.put(jid, eg);
return eg;
}
else {
throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
}
}
catch (Exception e) {
throw new RuntimeException("Error requesting execution graph", e);
}
}
}

0 comments on commit 44ee1c1

Please sign in to comment.