Skip to content

Commit

Permalink
Small changes to make code more consistent.
Browse files Browse the repository at this point in the history
Change System.err to System.out calls for regular flow messages in FlinkYarnSessionCli.
Code style add spaces between if-else and open parentheses and curly braces.
Wrap very long lines in some classes.
Remove unnecessary return statement at the end of a method.
  • Loading branch information
hsaputra committed Mar 20, 2015
1 parent 79000c8 commit 3838dd1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 46 deletions.
Expand Up @@ -100,11 +100,11 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {

AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
if(flinkYarnClient == null) {
if (flinkYarnClient == null) {
return null;
}

if(!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
LOG.error("Missing required argument " + CONTAINER.getOpt());
printUsage();
return null;
Expand All @@ -113,7 +113,7 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {

// Jar Path
Path localJarPath;
if(cmd.hasOption(FLINK_JAR.getOpt())) {
if (cmd.hasOption(FLINK_JAR.getOpt())) {
String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
if(!userPath.startsWith("file://")) {
userPath = "file://" + userPath;
Expand All @@ -139,7 +139,7 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
flinkYarnClient.setConfigurationDirectory(confDirPath);
File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
if(!confFile.exists()) {
if (!confFile.exists()) {
LOG.error("Unable to locate configuration file in "+confFile);
return null;
}
Expand All @@ -149,10 +149,10 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {

List<File> shipFiles = new ArrayList<File>();
// path to directory to ship
if(cmd.hasOption(SHIP_PATH.getOpt())) {
if (cmd.hasOption(SHIP_PATH.getOpt())) {
String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
File shipDir = new File(shipPath);
if(shipDir.isDirectory()) {
if (shipDir.isDirectory()) {
shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
Expand All @@ -165,19 +165,19 @@ public boolean accept(File dir, String name) {
}

//check if there is a logback or log4j file
if(confDirPath.length() > 0) {
if (confDirPath.length() > 0) {
File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
if(logback.exists()) {
if (logback.exists()) {
shipFiles.add(logback);
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
}
File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
if(log4j.exists()) {
if (log4j.exists()) {
shipFiles.add(log4j);
if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
// this means there is already a logback configuration file --> fail
LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and Logback configuration files." +
"Please delete or rename one of them.");
LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
} // else
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
}
Expand All @@ -186,36 +186,37 @@ public boolean accept(File dir, String name) {
flinkYarnClient.setShipFiles(shipFiles);

// queue
if(cmd.hasOption(QUEUE.getOpt())) {
if (cmd.hasOption(QUEUE.getOpt())) {
flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
}

// JobManager Memory
if(cmd.hasOption(JM_MEMORY.getOpt())) {
if (cmd.hasOption(JM_MEMORY.getOpt())) {
int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
flinkYarnClient.setJobManagerMemory(jmMemory);
}

// Task Managers memory
if(cmd.hasOption(TM_MEMORY.getOpt())) {
if (cmd.hasOption(TM_MEMORY.getOpt())) {
int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
flinkYarnClient.setTaskManagerMemory(tmMemory);
}

if(cmd.hasOption(SLOTS.getOpt())) {
if (cmd.hasOption(SLOTS.getOpt())) {
int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
flinkYarnClient.setTaskManagerSlots(slots);
}

String[] dynamicProperties = null;
if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
}
String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties,
CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);

flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);

if(cmd.hasOption(DETACHED.getOpt())) {
if (cmd.hasOption(DETACHED.getOpt())) {
detachedMode = true;
flinkYarnClient.setDetachedMode(detachedMode);
}
Expand Down Expand Up @@ -248,11 +249,14 @@ private void printUsage() {
public static AbstractFlinkYarnClient getFlinkYarnClient() {
AbstractFlinkYarnClient yarnClient = null;
try {
Class<? extends AbstractFlinkYarnClient> yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
Class<? extends AbstractFlinkYarnClient> yarnClientClass =
Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
}
catch (ClassNotFoundException e) {
System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage());
System.err.println("Unable to locate the Flink YARN Client. " +
"Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " +
e.getMessage());
e.printStackTrace(System.err);
return null; // make it obvious
}
Expand Down Expand Up @@ -281,21 +285,21 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
// ------------------ check if there are updates by the cluster -----------

FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
if(status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
System.err.println("Number of connected TaskManagers changed to "+status.getNumberOfTaskManagers()+". "
+ "Slots available: "+status.getNumberOfSlots());
if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
System.err.println("Number of connected TaskManagers changed to " +
status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots());
numTaskmanagers = status.getNumberOfTaskManagers();
}

List<String> messages = yarnCluster.getNewMessages();
if(messages != null && messages.size() > 0) {
if (messages != null && messages.size() > 0) {
System.err.println("New messages from the YARN cluster: ");
for(String msg : messages) {
System.err.println(msg);
}
}

if(yarnCluster.hasFailed()) {
if (yarnCluster.hasFailed()) {
System.err.println("The YARN cluster has failed");
yarnCluster.shutdown();
}
Expand All @@ -310,22 +314,21 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {

if (in.ready()) {
String command = in.readLine();
if(command.equals("quit") || command.equals("stop")) {
if (command.equals("quit") || command.equals("stop")) {
break; // leave loop, cli will stop cluster.
} else if(command.equals("help")) {
} else if (command.equals("help")) {
System.err.println(HELP);
} else {
System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP);
}
}
if(yarnCluster.hasBeenStopped()) {
if (yarnCluster.hasBeenStopped()) {
LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
break;
}
}
} catch(Exception e) {
LOG.warn("Exception while running the interactive command line interface", e);
return;
}
}

Expand Down Expand Up @@ -366,7 +369,7 @@ public int run(String[] args) {
}

// Query cluster for metrics
if(cmd.hasOption(QUERY.getOpt())) {
if (cmd.hasOption(QUERY.getOpt())) {
AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
String description = null;
try {
Expand All @@ -381,7 +384,7 @@ public int run(String[] args) {
} else {
AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);

if(flinkYarnClient == null) {
if (flinkYarnClient == null) {
System.err.println("Error while starting the YARN Client. Please check log output!");
return 1;
}
Expand All @@ -395,32 +398,36 @@ public int run(String[] args) {
return 1;
}
//------------------ Cluster deployed, handle connection details
String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" +yarnCluster.getJobManagerAddress().getPort();
System.err.println("Flink JobManager is now running on " + jobManagerAddress);
System.err.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" + yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// file that we write into the conf/ dir containing the jobManager address and the dop.
String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE);

Properties yarnProps = new Properties();
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
if(flinkYarnClient.getTaskManagerSlots() != -1) {
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()) );
if (flinkYarnClient.getTaskManagerSlots() != -1) {
String degreeOfParallelism =
Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, degreeOfParallelism);
}
// add dynamic properties
if(flinkYarnClient.getDynamicPropertiesEncoded() != null) {
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, flinkYarnClient.getDynamicPropertiesEncoded());
if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
flinkYarnClient.getDynamicPropertiesEncoded());
}
writeYarnProperties(yarnProps, yarnPropertiesFile);

//------------------ Cluster running, let user control it ------------

if(detachedMode) {
if (detachedMode) {
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop" +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.", flinkYarnClient.getSessionFilesDir());
"Please also note that the temporary files of the YARN session in {} will not be removed.",
flinkYarnClient.getSessionFilesDir());
} else {
runInteractiveCli(yarnCluster);

Expand All @@ -443,10 +450,9 @@ public int run(String[] args) {
* Utility method for tests.
*/
public void stop() {
if(yarnCluster != null) {
if (yarnCluster != null) {
LOG.info("Command line interface is shutting down the yarnCluster");
yarnCluster.shutdown();
}
}
}

Expand Up @@ -513,7 +513,8 @@ protected void run() throws Exception {
if (this.stub != null) {
// collect the counters from the stub
if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
Map<String, Accumulator<?, ?>> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
Map<String, Accumulator<?, ?>> accumulators =
FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
}
}
Expand Down
Expand Up @@ -311,10 +311,10 @@ trait ApplicationMasterActor extends ActorLogMessages {
}

private def runningContainerIds(): mutable.MutableList[ContainerId] = {
return runningContainersList map { runningCont => runningCont.getId}
runningContainersList map { runningCont => runningCont.getId}
}
private def allocatedContainerIds(): mutable.MutableList[ContainerId] = {
return allocatedContainersList map { runningCont => runningCont.getId}
allocatedContainersList map { runningCont => runningCont.getId}
}

private def startYarnSession(conf: Configuration,
Expand Down

0 comments on commit 3838dd1

Please sign in to comment.