Skip to content

Commit

Permalink
[FLINK-15488] Obtain the JVM and TM param correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
KarmaGYZ committed Jan 9, 2020
1 parent efb12fe commit 7ba78be
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 22 deletions.
18 changes: 13 additions & 5 deletions flink-dist/src/main/flink-bin/bin/config.sh
Expand Up @@ -606,25 +606,33 @@ runBashJavaUtilsCmd() {
local class_path=$2
local conf_dir=$3

local output="`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> /dev/null`"
local output=`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} | tail -n 1`
if [[ $? -ne 0 ]]; then
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}."
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
# Print the output in case the user redirect the log to console.
echo $output 1>&2
exit 1
fi

echo ${output}
if ! [[ $output =~ ^BASH_JAVA_UTILS_EXEC_RESULT:.* ]]; then
echo "[ERROR] The last line of the output does not start with the certain prefix." 1>&2
echo $output 1>&2
exit 1
fi

echo ${output} | sed 's/BASH_JAVA_UTILS_EXEC_RESULT://g'
}

getTmResourceJvmParams() {
local class_path=`constructFlinkClassPath`
class_path=`manglePathList ${class_path}`

echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR})
runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS ${class_path} ${FLINK_CONF_DIR}
}

getTmResourceDynamicConfigs() {
local class_path=`constructFlinkClassPath`
class_path=`manglePathList ${class_path}`

echo $(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR})
runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${class_path} ${FLINK_CONF_DIR}
}
8 changes: 8 additions & 0 deletions flink-dist/src/main/flink-bin/bin/taskmanager.sh
Expand Up @@ -50,10 +50,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# Startup parameters

jvm_params=$(getTmResourceJvmParams)
if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get JVM parameters properly."
exit 1
fi
export JVM_ARGS="${JVM_ARGS} ${jvm_params}"

IFS=$" "
dynamic_configs=($(getTmResourceDynamicConfigs))
if [[ $? -ne 0 ]]; then
echo "[ERROR] Could not get dynamic configurations properly."
exit 1
fi
ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})
fi

Expand Down
Expand Up @@ -23,6 +23,8 @@

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertNotNull;

/**
Expand All @@ -34,21 +36,30 @@ public class BashJavaUtilsITCase extends JavaBashTestBase {

private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = "src/test/bin/runBashJavaUtilsCmd.sh";

/**
* Executes the given shell script wrapper and returns the last line.
*/
private String executeScriptAndFetchLastLine(final String command) throws IOException {
String[] commands = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, command};
String[] lines = executeScript(commands).split(System.lineSeparator());
if (lines.length == 0) {
return "";
} else {
return lines[lines.length - 1];
}
}

@Test
public void testGetTmResourceDynamicConfigs() throws Exception {
String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()};
String result = executeScript(command);
String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString());

assertNotNull(result);
ConfigurationUtils.parseTmResourceDynamicConfigs(result);
}

@Test
public void testGetTmResourceJvmParams() throws Exception {
String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()};
String result = executeScript(command);
String result = executeScriptAndFetchLastLine(BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString());

assertNotNull(result);
ConfigurationUtils.parseTmResourceJvmParams(result);
Expand Down
Expand Up @@ -21,12 +21,11 @@
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

import org.apache.commons.io.IOUtils;
import org.junit.Assume;
import org.junit.BeforeClass;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* Abstract test class for executing bash scripts.
Expand All @@ -49,12 +48,6 @@ protected String executeScript(final String[] command) throws IOException {
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(true);
Process process = pb.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder sb = new StringBuilder();
String s;
while ((s = reader.readLine()) != null) {
sb.append(s);
}
return sb.toString();
return IOUtils.toString(process.getInputStream());
}
}
Expand Up @@ -33,6 +33,8 @@
*/
public class BashJavaUtils {

private static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";

public static void main(String[] args) throws Exception {
checkArgument(args.length > 0, "Command not specified.");

Expand All @@ -52,13 +54,13 @@ public static void main(String[] args) throws Exception {
private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
}

private static void getTmResourceJvmParams(String[] args) throws Exception {
Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
System.out.println(EXECUTION_PREFIX + TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
}

private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {
Expand Down

0 comments on commit 7ba78be

Please sign in to comment.