From 9ca421905773f25606d32f0e3f234384e4277b02 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 24 Feb 2015 20:27:04 +0100 Subject: [PATCH] [web client] Fix webclient config forwarding --- .../org/apache/flink/client/WebFrontend.java | 37 ++-- .../client/web/JobSubmissionServlet.java | 62 ++++--- .../flink/client/web/JobsInfoServlet.java | 170 ------------------ .../flink/client/web/PlanDisplayServlet.java | 10 +- .../flink/client/web/WebInterfaceServer.java | 46 ++--- .../runtime/jobmanager/web/WebInfoServer.java | 4 +- .../runtime/util/EnvironmentInformation.java | 31 +++- .../flink/runtime/jobmanager/JobManager.scala | 18 +- .../runtime/taskmanager/TaskManager.scala | 8 +- .../util/EnvironmentInformationTest.java | 1 + 10 files changed, 128 insertions(+), 259 deletions(-) delete mode 100644 flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java index bc88c7bcb60b5..45f4391078246 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java @@ -16,9 +16,9 @@ * limitations under the License. */ - package org.apache.flink.client; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.client.web.WebInterfaceServer; @@ -29,7 +29,6 @@ /** * Main entry point for the web frontend. Creates a web server according to the configuration * in the given directory. - * */ public class WebFrontend { /** @@ -38,28 +37,24 @@ public class WebFrontend { private static final Logger LOG = LoggerFactory.getLogger(WebFrontend.class); /** - * Main method. accepts a single parameter, which is the config directory. + * Main method. Accepts a single command line parameter, which is the config directory. * - * @param args - * The parameters to the entry point. + * @param args The command line parameters. */ public static void main(String[] args) { - try { - // get the config directory first - String configDir = null; - if (args.length >= 2 && args[0].equals("--configDir")) { - configDir = args[1]; - } + EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client"); + EnvironmentInformation.checkJavaVersion(); - if (configDir == null) { - System.err - .println("Error: Configuration directory must be specified.\nWebFrontend --configDir \n"); - System.exit(1); - return; - } + // check the arguments + if (args.length < 2 || !args[0].equals("--configDir")) { + LOG.error("Wrong command line arguments. Usage: WebFrontend --configDir "); + System.exit(1); + } + try { // load the global configuration + String configDir = args[1]; GlobalConfiguration.loadConfiguration(configDir); Configuration config = GlobalConfiguration.getConfiguration(); @@ -68,15 +63,17 @@ public static void main(String[] args) { // get the listening port int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY, - ConfigConstants.DEFAULT_WEBCLIENT_PORT); + ConfigConstants.DEFAULT_WEBCLIENT_PORT); // start the server WebInterfaceServer server = new WebInterfaceServer(config, port); LOG.info("Starting web frontend server on port " + port + '.'); server.start(); server.join(); - } catch (Throwable t) { - LOG.error("Unexpected exception: " + t.getMessage(), t); + } + catch (Throwable t) { + LOG.error("Exception while starting the web server: " + t.getMessage(), t); + System.exit(2); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java index 62414bf00d499..21cd8e72d4748 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java @@ -89,11 +89,11 @@ public class JobSubmissionServlet extends HttpServlet { private final Random rand; // random number generator for UID - private final Configuration nepheleConfig; + private final Configuration config; - public JobSubmissionServlet(Configuration nepheleConfig, File jobDir, File planDir) { - this.nepheleConfig = nepheleConfig; + public JobSubmissionServlet(Configuration config, File jobDir, File planDir) { + this.config = config; this.jobStoreDirectory = jobDir; this.planDumpDirectory = planDir; @@ -139,7 +139,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se } // parse the arguments - List params = null; + List params; try { params = tokenizeArguments(args); } catch (IllegalArgumentException iaex) { @@ -166,7 +166,7 @@ else if (params.get(0).equals("-p")) { } // create the plan - String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]); + String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]); PackagedProgram program; FlinkPlan optPlan; Client client; @@ -178,7 +178,7 @@ else if (params.get(0).equals("-p")) { program = new PackagedProgram(jarFile, assemblerClass, options); } - client = new Client(nepheleConfig, program.getUserCodeClassLoader()); + client = new Client(config, program.getUserCodeClassLoader()); optPlan = client.getOptimizedPlan(program, parallelism); @@ -239,7 +239,7 @@ else if (params.get(0).equals("-p")) { // we have a request to show the plan // create a UID for the job - Long uid = null; + Long uid; do { uid = Math.abs(this.rand.nextLong()); } while (this.submittedJobs.containsKey(uid)); @@ -250,7 +250,8 @@ else if (params.get(0).equals("-p")) { if (optPlan instanceof StreamingPlan) { ((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile); - } else { + } + else { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); jsonGen.setEncodeForHTML(true); jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile); @@ -258,16 +259,24 @@ else if (params.get(0).equals("-p")) { // submit the job only, if it should not be suspended if (!suspend) { - try { - client.run(program,(OptimizedPlan) optPlan, false); - } catch (Throwable t) { - LOG.error("Error submitting job to the job-manager.", t); - showErrorPage(resp, t.getMessage()); - return; - } finally { - program.deleteExtractedLibraries(); + if (optPlan instanceof OptimizedPlan) { + try { + client.run(program, (OptimizedPlan) optPlan, false); + } + catch (Throwable t) { + LOG.error("Error submitting job to the job-manager.", t); + showErrorPage(resp, t.getMessage()); + return; + } + finally { + program.deleteExtractedLibraries(); + } } - } else { + else { + throw new RuntimeException("Not implemented for Streaming Job plans"); + } + } + else { try { this.submittedJobs.put(uid, client.getJobGraph(program, optPlan)); } @@ -285,23 +294,27 @@ else if (params.get(0).equals("-p")) { // redirect to the plan display page resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false")); - } else { + } + else { // don't show any plan. directly submit the job and redirect to the - // nephele runtime monitor + // runtime monitor try { client.run(program, parallelism, false); - } catch (Exception ex) { + } + catch (Exception ex) { LOG.error("Error submitting job to the job-manager.", ex); // HACK: Is necessary because Message contains whole stack trace String errorMessage = ex.getMessage().split("\n")[0]; showErrorPage(resp, errorMessage); return; - } finally { + } + finally { program.deleteExtractedLibraries(); } resp.sendRedirect(START_PAGE_URL); } - } else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) { + } + else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) { // --------------- run a job that has been submitted earlier, but was ------------------- // --------------- not executed because of a plan display ------------------- @@ -328,9 +341,10 @@ else if (params.get(0).equals("-p")) { // submit the job try { - Client client = new Client(nepheleConfig, getClass().getClassLoader()); + Client client = new Client(config, getClass().getClassLoader()); client.run(job, false); - } catch (Exception ex) { + } + catch (Exception ex) { LOG.error("Error submitting job to the job-manager.", ex); resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); // HACK: Is necessary because Message contains whole stack trace diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java deleted file mode 100644 index 381ee33bce7c9..0000000000000 --- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.client.web; - -import java.io.IOException; -import java.io.PrintWriter; -import java.net.InetSocketAddress; -import java.util.Iterator; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; -import scala.concurrent.Await; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.Future; - - -public class JobsInfoServlet extends HttpServlet { - /** - * Serial UID for serialization interoperability. - */ - private static final long serialVersionUID = 558077298726449201L; - - // ------------------------------------------------------------------------ - - private final Configuration config; - - private final ActorSystem system; - - private final FiniteDuration timeout; - - private final ActorRef jobmanager; - - public JobsInfoServlet(Configuration flinkConfig) { - this.config = flinkConfig; - system = ActorSystem.create("JobsInfoServletActorSystem", - AkkaUtils.getDefaultAkkaConfig()); - this.timeout = AkkaUtils.getTimeout(flinkConfig); - - String jmHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - - InetSocketAddress address = new InetSocketAddress(jmHost, jmPort); - - Future jobManagerFuture = JobManager.getJobManagerRemoteReferenceFuture(address, system, timeout); - - try { - this.jobmanager = Await.result(jobManagerFuture, timeout); - } catch (Exception ex) { - throw new RuntimeException("Could not find job manager at specified address " + - JobManager.getRemoteJobManagerAkkaURL(address) + "."); - } - } - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - //resp.setContentType("application/json"); - - try { - final Future response = Patterns.ask(jobmanager, - JobManagerMessages.getRequestRunningJobs(), - new Timeout(timeout)); - - Object result = null; - - try { - result = Await.result(response, timeout); - } catch (Exception exception) { - throw new IOException("Could not retrieve the running jobs from the job manager.", - exception); - } - - if(!(result instanceof RunningJobs)) { - throw new RuntimeException("ReqeustRunningJobs requires a response of type " + - "RunningJob. Instead the response is of type " + result.getClass() + "."); - } else { - - final Iterator graphs = ((RunningJobs) result). - asJavaIterable().iterator(); - - resp.setStatus(HttpServletResponse.SC_OK); - PrintWriter wrt = resp.getWriter(); - wrt.write("["); - while(graphs.hasNext()){ - ExecutionGraph graph = graphs.next(); - //Serialize job to json - wrt.write("{"); - wrt.write("\"jobid\": \"" + graph.getJobID() + "\","); - if(graph.getJobName() != null) { - wrt.write("\"jobname\": \"" + graph.getJobName()+"\","); - } - wrt.write("\"status\": \""+ graph.getState() + "\","); - wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState())); - wrt.write("}"); - //Write seperator between json objects - if(graphs.hasNext()) { - wrt.write(","); - } - } - wrt.write("]"); - } - } catch (Throwable t) { - resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); - resp.getWriter().print(t.getMessage()); - } - } - - protected String escapeString(String str) { - int len = str.length(); - char[] s = str.toCharArray(); - StringBuilder sb = new StringBuilder(); - - for (int i = 0; i < len; i += 1) { - char c = s[i]; - if ((c == '\\') || (c == '"') || (c == '/')) { - sb.append('\\'); - sb.append(c); - } else if (c == '\b') { - sb.append("\\b"); - } else if (c == '\t') { - sb.append("\\t"); - } else if (c == '\n') { - sb.append("
"); - } else if (c == '\f') { - sb.append("\\f"); - } else if (c == '\r') { - sb.append("\\r"); - } else { - if (c < ' ') { - // Unreadable throw away - } else { - sb.append(c); - } - } - } - - return sb.toString(); - } -} diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java index 1c2663f582e36..9043a2e8af504 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.client.web; import java.io.IOException; @@ -27,11 +26,11 @@ import javax.servlet.http.HttpServletRequest; +/** + * Simple servlet that displays the visualization of a data flow plan. + */ public class PlanDisplayServlet extends GUIServletStub { - - /** - * Serial UID for serialization interoperability. - */ + private static final long serialVersionUID = 3610115341264927614L; @@ -83,7 +82,6 @@ public void printPage(PrintWriter writer, Map parameters, Http URI request = new URI(req.getRequestURL().toString()); URI vizURI = new URI(request.getScheme(), null, request.getHost(), runtimeVisualizationPort, null, null, null); this.runtimeVisURL = vizURI.toString(); - System.out.println(this.runtimeVisURL); } catch (URISyntaxException e) { ; // ignore and simply do not forward } diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java index ad7b6d45234a0..6384c9af0c9da 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.client.web; import java.io.File; @@ -42,7 +41,7 @@ import org.eclipse.jetty.servlet.ServletHolder; /** - * This class sets up the web-server that serves the web frontend. It instantiates and + * This class sets up the web-server that serves the web client. It instantiates and * configures an embedded jetty server. */ public class WebInterfaceServer { @@ -65,25 +64,22 @@ public class WebInterfaceServer { * It serves the asynchronous requests for the plans and all other static resources, like * static web pages, stylesheets or javascript files. * - * @param nepheleConfig - * The configuration for the nephele job manager. All compiled jobs will be sent - * to the manager described by this configuration. + * @param config + * The configuration for the JobManager. All jobs will be sent + * to the JobManager described by this configuration. * @param port * The port to launch the server on. * @throws IOException * Thrown, if the server setup failed for an I/O related reason. */ - public WebInterfaceServer(Configuration nepheleConfig, int port) - throws IOException { - Configuration config = GlobalConfiguration.getConfiguration(); - + public WebInterfaceServer(Configuration config, int port) throws IOException { // if no explicit configuration is given, use the global configuration - if (nepheleConfig == null) { - nepheleConfig = config; + if (config == null) { + config = GlobalConfiguration.getConfiguration(); } // get base path of Flink installation - String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY,""); + String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY,""); File tmpDir; File uploadDir; @@ -100,9 +96,7 @@ public WebInterfaceServer(Configuration nepheleConfig, int port) ConfigConstants.DEFAULT_WEB_TMP_DIR); tmpDir = new File(tmpDirPath); - if(tmpDir.isAbsolute()) { - // absolute path, everything all right - } else { + if (!tmpDir.isAbsolute()) { // path relative to base dir tmpDir = new File(basePath+"/"+tmpDirPath); } @@ -111,9 +105,7 @@ public WebInterfaceServer(Configuration nepheleConfig, int port) ConfigConstants.DEFAULT_WEB_JOB_STORAGE_DIR); uploadDir = new File(uploadDirPath); - if(uploadDir.isAbsolute()) { - // absolute path, everything peachy - } else { + if (!uploadDir.isAbsolute()) { // path relative to base dir uploadDir = new File(basePath+"/"+uploadDirPath); } @@ -122,21 +114,19 @@ public WebInterfaceServer(Configuration nepheleConfig, int port) ConfigConstants.DEFAULT_WEB_PLAN_DUMP_DIR); planDumpDir = new File(planDumpDirPath); - if(planDumpDir.isAbsolute()) { - // absolute path, nice and dandy - } else { + if (!planDumpDir.isAbsolute()) { // path relative to base dir planDumpDir = new File(basePath+"/"+planDumpDirPath); } if (LOG.isInfoEnabled()) { - LOG.info("Setting up web frontend server, using web-root directory '" + - webRootDir.toExternalForm() + "'."); + LOG.info("Setting up web client server, using web-root directory '" + + webRootDir.toExternalForm() + "'."); LOG.info("Web frontend server will store temporary files in '" + tmpDir.getAbsolutePath() + "', uploaded jobs in '" + uploadDir.getAbsolutePath() + "', plan-json-dumps in '" + planDumpDir.getAbsolutePath() + "'."); - LOG.info("Web-frontend will submit jobs to nephele job-manager on " + LOG.info("Web client will submit jobs to JobManager at " + config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port " + config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + "."); @@ -149,18 +139,16 @@ public WebInterfaceServer(Configuration nepheleConfig, int port) checkAndCreateDirectories(uploadDir, true); checkAndCreateDirectories(planDumpDir, true); - int jobManagerWebPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + int jobManagerWebPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); // ----- the handlers for the servlets ----- ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS); servletContext.setContextPath("/"); servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan"); - servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)), - "/jobsInfo"); servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan"); servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs"); - servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(nepheleConfig, uploadDir, planDumpDir)), - "/runJob"); + servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(config, uploadDir, planDumpDir)), "/runJob"); // ----- the hander serving the written pact plans ----- ResourceHandler pactPlanHandler = new ResourceHandler(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index fcb41cb8f4d93..7f72370967082 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -99,7 +99,7 @@ public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive URL webRootDir = this.getClass().getClassLoader().getResource(WEB_ROOT_DIR); if(webRootDir == null) { - throw new FileNotFoundException("Cannot start jobmanager web info server. The " + + throw new FileNotFoundException("Cannot start JobManager web info server. The " + "resource " + WEB_ROOT_DIR + " is not included in the jar."); } @@ -110,7 +110,7 @@ public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive } if (LOG.isInfoEnabled()) { - LOG.info("Setting up web info server, using web-root directory" + + LOG.info("Setting up web info server, using web-root directory " + webRootDir.toExternalForm() + "."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java index d2147e498b27e..1fb6422b330c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; +import java.util.List; import java.util.Properties; import org.slf4j.Logger; @@ -175,6 +176,22 @@ public static String getJvmStartupOptions() { } } + /** + * Gets the system parameters and environment parameters that were passed to the JVM on startup. + * + * @return The options passed to the JVM on startup. + */ + public static String[] getJvmStartupOptionsArray() { + try { + RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + List options = bean.getInputArguments(); + return options.toArray(new String[options.size()]); + } + catch (Throwable t) { + return new String[0]; + } + } + /** * Gets the directory for temporary files, as returned by the JVM system property "java.io.tmpdir". * @@ -199,7 +216,7 @@ public static void logEnvironmentInfo(Logger log, String componentName) { String user = getUserRunning(); String jvmVersion = getJvmVersion(); - String options = getJvmStartupOptions(); + String[] options = getJvmStartupOptionsArray(); String javaHome = System.getenv("JAVA_HOME"); @@ -210,7 +227,17 @@ public static void logEnvironmentInfo(Logger log, String componentName) { + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")"); log.info(" Current user: " + user); log.info(" JVM: " + jvmVersion); - log.info(" Startup Options: " + options); + + if (options.length == 0) { + log.info(" Startup Options: (none)"); + } + else { + log.info(" Startup Options:"); + for (String s: options) { + log.info(" " + s); + } + } + log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes"); log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome)); log.info("--------------------------------------------------------------------------------"); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 06301153ab84d..415a20cea9fbc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -645,7 +645,9 @@ object JobManager { runJobManager(configuration, executionMode, listeningHost, listeningPort) } }) - } else { + } + else { + LOG.info("Security is not enabled. Starting non-authenticated JobManager.") runJobManager(configuration, executionMode, listeningHost, listeningPort) } } @@ -679,10 +681,15 @@ object JobManager { LOG.info("Starting JobManager") // Bring up the job manager actor system first, bind it to the given address. - LOG.debug("Starting JobManager actor system") + LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort) val jobManagerSystem = try { - AkkaUtils.createActorSystem(configuration, Some((listeningAddress, listeningPort))) + val akkaConfig = AkkaUtils.getAkkaConfig(configuration, + Some((listeningAddress, listeningPort))) + if (LOG.isDebugEnabled) { + LOG.debug("Using akka configuration\n " + akkaConfig) + } + AkkaUtils.createActorSystem(akkaConfig) } catch { case t: Throwable => { @@ -700,11 +707,12 @@ object JobManager { try { // bring up the job manager actor - LOG.debug("Starting JobManager actor") + LOG.info("Starting JobManager actor") val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem) // start a process reaper that watches the JobManager. If the JobManager actor dies, // the process reaper will kill the JVM process (to ensure easy failure detection) + LOG.debug("Starting JobManager process reaper") jobManagerSystem.actorOf( Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE), "JobManager_Process_Reaper") @@ -763,8 +771,8 @@ object JobManager { parser.parse(args, JobManagerCLIConfiguration()) map { config => + LOG.info("Loading configuration from " + config.configDir) GlobalConfiguration.loadConfiguration(config.configDir) - val configuration = GlobalConfiguration.getConfiguration if (config.configDir != null && new File(config.configDir).isDirectory) { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 4c85e5becb981..7bfa3708b4074 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -806,6 +806,7 @@ object TaskManager { // load the configuration try { + LOG.info("Loading configuration from " + cliConfig.configDir) GlobalConfiguration.loadConfiguration(cliConfig.configDir) GlobalConfiguration.getConfiguration() } @@ -906,7 +907,12 @@ object TaskManager { LOG.info("Starting TaskManager actor system") val taskManagerSystem = try { - AkkaUtils.createActorSystem(configuration, Some((taskManagerHostname, actorSystemPort))) + val akkaConfig = AkkaUtils.getAkkaConfig(configuration, + Some((taskManagerHostname, actorSystemPort))) + if (LOG.isDebugEnabled) { + LOG.debug("Using akka configuration\n " + akkaConfig) + } + AkkaUtils.createActorSystem(akkaConfig) } catch { case t: Throwable => { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java index d9fc6b9c6d0b9..64a676c6d81ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java @@ -47,6 +47,7 @@ public void testJavaMemory() { public void testEnvironmentMethods() { try { assertNotNull(EnvironmentInformation.getJvmStartupOptions()); + assertNotNull(EnvironmentInformation.getJvmStartupOptionsArray()); assertNotNull(EnvironmentInformation.getJvmVersion()); assertNotNull(EnvironmentInformation.getRevisionInformation()); assertNotNull(EnvironmentInformation.getVersion());