From 6c886ade7c02840423552025f5d5da535ae40d02 Mon Sep 17 00:00:00 2001 From: Yangze Guo Date: Thu, 9 Jan 2020 13:16:01 +0800 Subject: [PATCH] [FLINK-15488][runtime] Obtain JVM and TM params correctly This avoids that log messages to stdout intermingle with BashJavaUtils' computation result which is also printed to stdout. This closes #10804. --- flink-dist/src/main/flink-bin/bin/config.sh | 20 ++++++++++++---- .../src/main/flink-bin/bin/taskmanager.sh | 8 +++++++ .../flink/dist/BashJavaUtilsITCase.java | 23 ++++++++++++++----- .../apache/flink/dist/JavaBashTestBase.java | 11 ++------- .../flink/runtime/util/BashJavaUtils.java | 6 +++-- 5 files changed, 46 insertions(+), 22 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index af4b15f374795..10bb8ac4db3e0 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -605,26 +605,36 @@ runBashJavaUtilsCmd() { local cmd=$1 local class_path=$2 local conf_dir=$3 + local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" - local output="`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> /dev/null`" + local output=`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} | tail -n 1` if [[ $? -ne 0 ]]; then - echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." + echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2 + # Print the output in case the user redirect the log to console. + echo $output 1>&2 exit 1 fi - echo ${output} + if ! [[ $output =~ ^${EXECUTION_PREFIX}.* ]]; then + echo "[ERROR] Unexpected result: $output" 1>&2 + echo "[ERROR] The last line of the BashJavaUtils outputs is expected to be the execution result, following the prefix '${EXECUTION_PREFIX}'" 1>&2 + echo $output 1>&2 + exit 1 + fi + + echo ${output} | sed "s/$EXECUTION_PREFIX//g" } getTmResourceJvmParams() { local class_path=`constructFlinkClassPath` class_path=`manglePathList ${class_path}` - echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR}) + runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR} } getTmResourceDynamicConfigs() { local class_path=`constructFlinkClassPath` class_path=`manglePathList ${class_path}` - echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR}) + runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR} } diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index db685cdf21c99..1871b736be776 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -50,10 +50,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then # Startup parameters jvm_params=$(getTmResourceJvmParams) + if [[ $? -ne 0 ]]; then + echo "[ERROR] Could not get JVM parameters properly." + exit 1 + fi export JVM_ARGS="${JVM_ARGS} ${jvm_params}" IFS=$" " dynamic_configs=($(getTmResourceDynamicConfigs)) + if [[ $? -ne 0 ]]; then + echo "[ERROR] Could not get dynamic configurations properly." + exit 1 + fi ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]}) fi diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java index 1766a2655a4e8..5be92a1dd6273 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java @@ -23,6 +23,8 @@ import org.junit.Test; +import java.io.IOException; + import static org.junit.Assert.assertNotNull; /** @@ -34,11 +36,22 @@ public class BashJavaUtilsITCase extends JavaBashTestBase { private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = "src/test/bin/runBashJavaUtilsCmd.sh"; + /** + * Executes the given shell script wrapper and returns the last line. + */ + private String executeScriptAndFetchLastLine(final String command) throws IOException { + String[] commands = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, command}; + String[] lines = executeScript(commands).split(System.lineSeparator()); + if (lines.length == 0) { + return ""; + } else { + return lines[lines.length - 1]; + } + } + @Test public void testGetTmResourceDynamicConfigs() throws Exception { - String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, - BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()}; - String result = executeScript(command); + String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()); assertNotNull(result); ConfigurationUtils.parseTmResourceDynamicConfigs(result); @@ -46,9 +59,7 @@ public void testGetTmResourceDynamicConfigs() throws Exception { @Test public void testGetTmResourceJvmParams() throws Exception { - String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, - BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()}; - String result = executeScript(command); + String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()); assertNotNull(result); ConfigurationUtils.parseTmResourceJvmParams(result); diff --git a/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java index 63faaa201df6a..4d2f0c6683462 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java @@ -21,12 +21,11 @@ import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; +import org.apache.commons.io.IOUtils; import org.junit.Assume; import org.junit.BeforeClass; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; /** * Abstract test class for executing bash scripts. @@ -49,12 +48,6 @@ protected String executeScript(final String[] command) throws IOException { ProcessBuilder pb = new ProcessBuilder(command); pb.redirectErrorStream(true); Process process = pb.start(); - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - StringBuilder sb = new StringBuilder(); - String s; - while ((s = reader.readLine()) != null) { - sb.append(s); - } - return sb.toString(); + return IOUtils.toString(process.getInputStream()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java index cd8b398d857ec..c31e05c52d51e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java @@ -33,6 +33,8 @@ */ public class BashJavaUtils { + private static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:"; + public static void main(String[] args) throws Exception { checkArgument(args.length > 0, "Command not specified."); @@ -52,13 +54,13 @@ public static void main(String[] args) throws Exception { private static void getTmResourceDynamicConfigs(String[] args) throws Exception { Configuration configuration = getConfigurationForStandaloneTaskManagers(args); TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); - System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec)); + System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec)); } private static void getTmResourceJvmParams(String[] args) throws Exception { Configuration configuration = getConfigurationForStandaloneTaskManagers(args); TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); - System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec)); + System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec)); } private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {