Skip to content

Commit

Permalink
[web client] Fix webclient config forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 25, 2015
1 parent c0c4c9f commit 9ca4219
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 259 deletions.
Expand Up @@ -16,9 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */



package org.apache.flink.client; package org.apache.flink.client;


import org.apache.flink.runtime.util.EnvironmentInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.flink.client.web.WebInterfaceServer; import org.apache.flink.client.web.WebInterfaceServer;
Expand All @@ -29,7 +29,6 @@
/** /**
* Main entry point for the web frontend. Creates a web server according to the configuration * Main entry point for the web frontend. Creates a web server according to the configuration
* in the given directory. * in the given directory.
*
*/ */
public class WebFrontend { public class WebFrontend {
/** /**
Expand All @@ -38,28 +37,24 @@ public class WebFrontend {
private static final Logger LOG = LoggerFactory.getLogger(WebFrontend.class); private static final Logger LOG = LoggerFactory.getLogger(WebFrontend.class);


/** /**
* Main method. accepts a single parameter, which is the config directory. * Main method. Accepts a single command line parameter, which is the config directory.
* *
* @param args * @param args The command line parameters.
* The parameters to the entry point.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try {
// get the config directory first
String configDir = null;


if (args.length >= 2 && args[0].equals("--configDir")) { EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client");
configDir = args[1]; EnvironmentInformation.checkJavaVersion();
}


if (configDir == null) { // check the arguments
System.err if (args.length < 2 || !args[0].equals("--configDir")) {
.println("Error: Configuration directory must be specified.\nWebFrontend --configDir <directory>\n"); LOG.error("Wrong command line arguments. Usage: WebFrontend --configDir <directory>");
System.exit(1); System.exit(1);
return; }
}


try {
// load the global configuration // load the global configuration
String configDir = args[1];
GlobalConfiguration.loadConfiguration(configDir); GlobalConfiguration.loadConfiguration(configDir);
Configuration config = GlobalConfiguration.getConfiguration(); Configuration config = GlobalConfiguration.getConfiguration();


Expand All @@ -68,15 +63,17 @@ public static void main(String[] args) {


// get the listening port // get the listening port
int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY, int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY,
ConfigConstants.DEFAULT_WEBCLIENT_PORT); ConfigConstants.DEFAULT_WEBCLIENT_PORT);


// start the server // start the server
WebInterfaceServer server = new WebInterfaceServer(config, port); WebInterfaceServer server = new WebInterfaceServer(config, port);
LOG.info("Starting web frontend server on port " + port + '.'); LOG.info("Starting web frontend server on port " + port + '.');
server.start(); server.start();
server.join(); server.join();
} catch (Throwable t) { }
LOG.error("Unexpected exception: " + t.getMessage(), t); catch (Throwable t) {
LOG.error("Exception while starting the web server: " + t.getMessage(), t);
System.exit(2);
} }
} }
} }
Expand Up @@ -89,11 +89,11 @@ public class JobSubmissionServlet extends HttpServlet {


private final Random rand; // random number generator for UID private final Random rand; // random number generator for UID


private final Configuration nepheleConfig; private final Configuration config;




public JobSubmissionServlet(Configuration nepheleConfig, File jobDir, File planDir) { public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
this.nepheleConfig = nepheleConfig; this.config = config;
this.jobStoreDirectory = jobDir; this.jobStoreDirectory = jobDir;
this.planDumpDirectory = planDir; this.planDumpDirectory = planDir;


Expand Down Expand Up @@ -139,7 +139,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
} }


// parse the arguments // parse the arguments
List<String> params = null; List<String> params;
try { try {
params = tokenizeArguments(args); params = tokenizeArguments(args);
} catch (IllegalArgumentException iaex) { } catch (IllegalArgumentException iaex) {
Expand All @@ -166,7 +166,7 @@ else if (params.get(0).equals("-p")) {
} }


// create the plan // create the plan
String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]); String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
PackagedProgram program; PackagedProgram program;
FlinkPlan optPlan; FlinkPlan optPlan;
Client client; Client client;
Expand All @@ -178,7 +178,7 @@ else if (params.get(0).equals("-p")) {
program = new PackagedProgram(jarFile, assemblerClass, options); program = new PackagedProgram(jarFile, assemblerClass, options);
} }


client = new Client(nepheleConfig, program.getUserCodeClassLoader()); client = new Client(config, program.getUserCodeClassLoader());


optPlan = client.getOptimizedPlan(program, parallelism); optPlan = client.getOptimizedPlan(program, parallelism);


Expand Down Expand Up @@ -239,7 +239,7 @@ else if (params.get(0).equals("-p")) {
// we have a request to show the plan // we have a request to show the plan


// create a UID for the job // create a UID for the job
Long uid = null; Long uid;
do { do {
uid = Math.abs(this.rand.nextLong()); uid = Math.abs(this.rand.nextLong());
} while (this.submittedJobs.containsKey(uid)); } while (this.submittedJobs.containsKey(uid));
Expand All @@ -250,24 +250,33 @@ else if (params.get(0).equals("-p")) {


if (optPlan instanceof StreamingPlan) { if (optPlan instanceof StreamingPlan) {
((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile); ((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
} else { }
else {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
jsonGen.setEncodeForHTML(true); jsonGen.setEncodeForHTML(true);
jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile); jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
} }


// submit the job only, if it should not be suspended // submit the job only, if it should not be suspended
if (!suspend) { if (!suspend) {
try { if (optPlan instanceof OptimizedPlan) {
client.run(program,(OptimizedPlan) optPlan, false); try {
} catch (Throwable t) { client.run(program, (OptimizedPlan) optPlan, false);
LOG.error("Error submitting job to the job-manager.", t); }
showErrorPage(resp, t.getMessage()); catch (Throwable t) {
return; LOG.error("Error submitting job to the job-manager.", t);
} finally { showErrorPage(resp, t.getMessage());
program.deleteExtractedLibraries(); return;
}
finally {
program.deleteExtractedLibraries();
}
} }
} else { else {
throw new RuntimeException("Not implemented for Streaming Job plans");
}
}
else {
try { try {
this.submittedJobs.put(uid, client.getJobGraph(program, optPlan)); this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
} }
Expand All @@ -285,23 +294,27 @@ else if (params.get(0).equals("-p")) {


// redirect to the plan display page // redirect to the plan display page
resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false")); resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
} else { }
else {
// don't show any plan. directly submit the job and redirect to the // don't show any plan. directly submit the job and redirect to the
// nephele runtime monitor // runtime monitor
try { try {
client.run(program, parallelism, false); client.run(program, parallelism, false);
} catch (Exception ex) { }
catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex); LOG.error("Error submitting job to the job-manager.", ex);
// HACK: Is necessary because Message contains whole stack trace // HACK: Is necessary because Message contains whole stack trace
String errorMessage = ex.getMessage().split("\n")[0]; String errorMessage = ex.getMessage().split("\n")[0];
showErrorPage(resp, errorMessage); showErrorPage(resp, errorMessage);
return; return;
} finally { }
finally {
program.deleteExtractedLibraries(); program.deleteExtractedLibraries();
} }
resp.sendRedirect(START_PAGE_URL); resp.sendRedirect(START_PAGE_URL);
} }
} else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) { }
else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
// --------------- run a job that has been submitted earlier, but was ------------------- // --------------- run a job that has been submitted earlier, but was -------------------
// --------------- not executed because of a plan display ------------------- // --------------- not executed because of a plan display -------------------


Expand All @@ -328,9 +341,10 @@ else if (params.get(0).equals("-p")) {


// submit the job // submit the job
try { try {
Client client = new Client(nepheleConfig, getClass().getClassLoader()); Client client = new Client(config, getClass().getClassLoader());
client.run(job, false); client.run(job, false);
} catch (Exception ex) { }
catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex); LOG.error("Error submitting job to the job-manager.", ex);
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
// HACK: Is necessary because Message contains whole stack trace // HACK: Is necessary because Message contains whole stack trace
Expand Down

This file was deleted.

0 comments on commit 9ca4219

Please sign in to comment.