Skip to content

Commit

Permalink
[hotfix] Clean up CliFrontend after removing web client
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 16, 2016
1 parent c0fd36b commit 17fa6a9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 90 deletions.
125 changes: 49 additions & 76 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
Expand Up @@ -19,13 +19,14 @@
package org.apache.flink.client; package org.apache.flink.client;


import akka.actor.ActorSystem; import akka.actor.ActorSystem;

import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;

import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CancelOptions; import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CliFrontendParser;
Expand Down Expand Up @@ -65,8 +66,10 @@
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

import scala.Some; import scala.Some;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
Expand All @@ -87,6 +90,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
Expand All @@ -95,7 +99,6 @@
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure; import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestRunningJobsStatus;


/** /**
* Implementation of a simple command line frontend for executing programs. * Implementation of a simple command line frontend for executing programs.
Expand Down Expand Up @@ -133,6 +136,7 @@ public class CliFrontend {


private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);



private final Configuration config; private final Configuration config;


private final FiniteDuration askTimeout; private final FiniteDuration askTimeout;
Expand All @@ -143,12 +147,6 @@ public class CliFrontend {


private AbstractFlinkYarnCluster yarnCluster; private AbstractFlinkYarnCluster yarnCluster;


static boolean webFrontend = false;

private FlinkPlan optimizedPlan;

private PackagedProgram packagedProgram;

/** /**
* *
* @throws Exception Thrown if the configuration directory was not found, the configuration could not * @throws Exception Thrown if the configuration directory was not found, the configuration could not
Expand Down Expand Up @@ -222,9 +220,9 @@ public CliFrontend(String configDir) throws Exception {


// handle the YARN client's dynamic properties // handle the YARN client's dynamic properties
String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Tuple2<String, String> dynamicProperty : dynamicProperties) { for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
this.config.setString(dynamicProperty.f0, dynamicProperty.f1); this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
} }
} }


Expand Down Expand Up @@ -408,42 +406,34 @@ protected int info(String[] args) {
LOG.info("Creating program plan dump"); LOG.info("Creating program plan dump");


Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);

FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism); FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);

String jsonPlan = null;
if (flinkPlan instanceof OptimizedPlan) {
jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
} else if (flinkPlan instanceof StreamingPlan) {
jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON();
}


if (webFrontend) { if (jsonPlan != null) {
this.optimizedPlan = flinkPlan; System.out.println("----------------------- Execution Plan -----------------------");
this.packagedProgram = program; System.out.println(jsonPlan);
} else { System.out.println("--------------------------------------------------------------");
String jsonPlan = null; }
if (flinkPlan instanceof OptimizedPlan) { else {
jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan); System.out.println("JSON plan could not be generated.");
} else if (flinkPlan instanceof StreamingPlan) { }
jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON();
}

if (jsonPlan != null) {
System.out.println("----------------------- Execution Plan -----------------------");
System.out.println(jsonPlan);
System.out.println("--------------------------------------------------------------");
}
else {
System.out.println("JSON plan could not be generated.");
}


String description = program.getDescription(); String description = program.getDescription();
if (description != null) { if (description != null) {
System.out.println(); System.out.println();
System.out.println(description); System.out.println(description);
} }
else { else {
System.out.println(); System.out.println();
System.out.println("No description provided."); System.out.println("No description provided.");
}
} }
return 0; return 0;


} }
catch (Throwable t) { catch (Throwable t) {
return handleError(t); return handleError(t);
Expand Down Expand Up @@ -492,7 +482,7 @@ protected int list(String[] args) {


LOG.info("Connecting to JobManager to retrieve list of jobs"); LOG.info("Connecting to JobManager to retrieve list of jobs");
Future<Object> response = jobManagerGateway.ask( Future<Object> response = jobManagerGateway.ask(
getRequestRunningJobsStatus(), JobManagerMessages.getRequestRunningJobsStatus(),
askTimeout); askTimeout);


Object result; Object result;
Expand Down Expand Up @@ -792,10 +782,8 @@ protected int executeProgramDetached(PackagedProgram program, Client client, int
yarnCluster.stopAfterJob(result.getJobID()); yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect(); yarnCluster.disconnect();
} }


if (!webFrontend) { System.out.println("Job has been submitted with JobID " + result.getJobID());
System.out.println("Job has been submitted with JobID " + result.getJobID());
}


return 0; return 0;
} }
Expand All @@ -816,7 +804,7 @@ protected int executeProgramBlocking(PackagedProgram program, Client client, int


LOG.info("Program execution finished"); LOG.info("Program execution finished");


if (result instanceof JobExecutionResult && !webFrontend) { if (result instanceof JobExecutionResult) {
JobExecutionResult execResult = (JobExecutionResult) result; JobExecutionResult execResult = (JobExecutionResult) result;
System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
Expand Down Expand Up @@ -933,7 +921,6 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
* @param options Command line options which contain JobManager address * @param options Command line options which contain JobManager address
* @param programName Program name * @param programName Program name
* @param userParallelism Given user parallelism * @param userParallelism Given user parallelism
* @return
* @throws Exception * @throws Exception
*/ */
protected Client getClient( protected Client getClient(
Expand Down Expand Up @@ -1035,9 +1022,6 @@ protected Client getClient(
* @return The return code for the process. * @return The return code for the process.
*/ */
private int handleArgException(Exception e) { private int handleArgException(Exception e) {
if (webFrontend) {
throw new RuntimeException(e);
}
LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage())); LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));


System.out.println(e.getMessage()); System.out.println(e.getMessage());
Expand All @@ -1053,9 +1037,6 @@ private int handleArgException(Exception e) {
* @return The return code for the process. * @return The return code for the process.
*/ */
private int handleError(Throwable t) { private int handleError(Throwable t) {
if (webFrontend) {
throw new RuntimeException(t);
}
LOG.error("Error while running the command.", t); LOG.error("Error while running the command.", t);


System.err.println(); System.err.println();
Expand All @@ -1080,9 +1061,7 @@ private int handleError(Throwable t) {


private void logAndSysout(String message) { private void logAndSysout(String message) {
LOG.info(message); LOG.info(message);
if (!webFrontend) { System.out.println(message);
System.out.println(message);
}
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1117,9 +1096,6 @@ public int parseParameters(String[] args) {
if (SecurityUtils.isSecurityEnabled()) { if (SecurityUtils.isSecurityEnabled()) {
String message = "Secure Hadoop environment setup detected. Running in secure context."; String message = "Secure Hadoop environment setup detected. Running in secure context.";
LOG.info(message); LOG.info(message);
if (!webFrontend) {
System.out.println(message);
}


try { try {
return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() { return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
Expand Down Expand Up @@ -1165,14 +1141,6 @@ public Integer run() throws Exception {
} }
} }


public FlinkPlan getFlinkPlan() {
return this.optimizedPlan;
}

public PackagedProgram getPackagedProgram() {
return this.packagedProgram;
}

public void shutdown() { public void shutdown() {
ActorSystem sys = this.actorSystem; ActorSystem sys = this.actorSystem;
if (sys != null) { if (sys != null) {
Expand Down Expand Up @@ -1251,20 +1219,25 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
return location; return location;
} }


public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>(); if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { Map<String, String> properties = new HashMap<>();

String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
for(String propLine : propertyLines) { for (String propLine : propertyLines) {
if(propLine == null) { if (propLine == null) {
continue; continue;
} }

String[] kv = propLine.split("="); String[] kv = propLine.split("=");
if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
ret.add(new Tuple2<String, String>(kv[0], kv[1])); properties.put(kv[0], kv[1]);
} }
} }
return properties;
}
else {
return Collections.emptyMap();
} }
return ret;
} }
} }
Expand Up @@ -22,11 +22,12 @@
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend; import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestBaseUtils;

import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand All @@ -35,7 +36,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;


public class FlinkYarnSessionCliTest { public class FlinkYarnSessionCliTest {
Expand Down Expand Up @@ -69,10 +69,8 @@ public void testDynamicProperties() throws IOException {


Assert.assertNotNull(flinkYarnClient); Assert.assertNotNull(flinkYarnClient);


List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
Assert.assertEquals(1, dynProperties.size()); Assert.assertEquals(1, dynProperties.size());
Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0); Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
Assert.assertEquals("5 min", dynProperties.get(0).f1);
} }

} }
Expand Up @@ -18,14 +18,14 @@


package org.apache.flink.yarn; package org.apache.flink.yarn;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend; import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
Expand All @@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;

import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -358,9 +359,9 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {


// ------------------ Add dynamic properties to local flinkConfiguraton ------ // ------------------ Add dynamic properties to local flinkConfiguraton ------


List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded); Map<String, String> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
for (Tuple2<String, String> dynProperty : dynProperties) { for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
flinkConfiguration.setString(dynProperty.f0, dynProperty.f1); flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
} }


// ------------------ Check if the specified queue exists -------------- // ------------------ Check if the specified queue exists --------------
Expand Down
Expand Up @@ -244,8 +244,8 @@ abstract class ApplicationMasterBase {


import scala.collection.JavaConverters._ import scala.collection.JavaConverters._


for(property <- dynamicProperties.asScala){ for (property <- dynamicProperties.asScala){
output.println(s"${property.f0}: ${property.f1}") output.println(s"${property._1}: ${property._2}")
} }


output.close() output.close()
Expand All @@ -262,8 +262,8 @@ abstract class ApplicationMasterBase {
// add dynamic properties to JobManager configuration. // add dynamic properties to JobManager configuration.
val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
for(property <- dynamicProperties.asScala){ for (property <- dynamicProperties.asScala){
configuration.setString(property.f0, property.f1) configuration.setString(property._1, property._2)
} }


configuration configuration
Expand Down

0 comments on commit 17fa6a9

Please sign in to comment.