Skip to content

Commit

Permalink
[FLINK-15488][runtime] Obtain JVM and TM params correctly
Browse files Browse the repository at this point in the history
This avoids that log messages to stdout intermingle with BashJavaUtils'
computation result which is also printed to stdout.

This closes #10804.
  • Loading branch information
KarmaGYZ authored and GJL committed Jan 13, 2020
1 parent 2ad4e74 commit d76f216
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
20 changes: 15 additions & 5 deletions flink-dist/src/main/flink-bin/bin/config.sh
Expand Up @@ -605,26 +605,36 @@ runBashJavaUtilsCmd() {
local cmd=$1
local class_path=$2
local conf_dir=$3
local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

local output="`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> /dev/null`"
local output=`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} | tail -n 1`
if [[ $? -ne 0 ]]; then
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}."
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
# Print the output in case the user redirect the log to console.
echo $output 1>&2
exit 1
fi

echo ${output}
if ! [[ $output =~ ^${EXECUTION_PREFIX}.* ]]; then
echo "[ERROR] Unexpected result: $output" 1>&2
echo "[ERROR] The last line of the BashJavaUtils outputs is expected to be the execution result, following the prefix '${EXECUTION_PREFIX}'" 1>&2
echo $output 1>&2
exit 1
fi

echo ${output} | sed "s/$EXECUTION_PREFIX//g"
}

getTmResourceJvmParams() {
local class_path=`constructFlinkClassPath`
class_path=`manglePathList ${class_path}`

echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR})
runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR}
}

getTmResourceDynamicConfigs() {
local class_path=`constructFlinkClassPath`
class_path=`manglePathList ${class_path}`

echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR})
runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR}
}
8 changes: 8 additions & 0 deletions flink-dist/src/main/flink-bin/bin/taskmanager.sh
Expand Up @@ -50,10 +50,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# Startup parameters

jvm_params=$(getTmResourceJvmParams)
if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get JVM parameters properly."
exit 1
fi
export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

IFS=$" "
dynamic_configs=($(getTmResourceDynamicConfigs))
if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get dynamic configurations properly."
exit 1
fi
ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})
fi

Expand Down
Expand Up @@ -23,6 +23,8 @@

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertNotNull;

/**
Expand All @@ -34,21 +36,30 @@ public class BashJavaUtilsITCase extends JavaBashTestBase {

private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = "src/test/bin/runBashJavaUtilsCmd.sh";

/**
* Executes the given shell script wrapper and returns the last line.
*/
private String executeScriptAndFetchLastLine(final String command) throws IOException {
String[] commands = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, command};
String[] lines = executeScript(commands).split(System.lineSeparator());
if (lines.length == 0) {
return "";
} else {
return lines[lines.length - 1];
}
}

@Test
public void testGetTmResourceDynamicConfigs() throws Exception {
String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()};
String result = executeScript(command);
String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString());

assertNotNull(result);
ConfigurationUtils.parseTmResourceDynamicConfigs(result);
}

@Test
public void testGetTmResourceJvmParams() throws Exception {
String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()};
String result = executeScript(command);
String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString());

assertNotNull(result);
ConfigurationUtils.parseTmResourceJvmParams(result);
Expand Down
Expand Up @@ -21,12 +21,11 @@
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

import org.apache.commons.io.IOUtils;
import org.junit.Assume;
import org.junit.BeforeClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* Abstract test class for executing bash scripts.
Expand All @@ -49,12 +48,6 @@ protected String executeScript(final String[] command) throws IOException {
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(true);
Process process = pb.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder sb = new StringBuilder();
String s;
while ((s = reader.readLine()) != null) {
sb.append(s);
}
return sb.toString();
return IOUtils.toString(process.getInputStream());
}
}
Expand Up @@ -33,6 +33,8 @@
*/
public class BashJavaUtils {

private static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";

public static void main(String[] args) throws Exception {
checkArgument(args.length > 0, "Command not specified.");

Expand All @@ -52,13 +54,13 @@ public static void main(String[] args) throws Exception {
private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
}

private static void getTmResourceJvmParams(String[] args) throws Exception {
Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
}

private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {
Expand Down

0 comments on commit d76f216

Please sign in to comment.