Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Fix spark/flink starter script error on windows #6435

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8e08b31
New code commit
GangLiCN Feb 1, 2024
5ac6f7e
Add more comments
GangLiCN Feb 1, 2024
7334d51
fix issue6386
GangLiCN Mar 2, 2024
8216b25
Merge branch 'apache:dev' into dev
GangLiCN Mar 2, 2024
2c084e3
fix issue6386 bases on review comments
GangLiCN Mar 6, 2024
fb1dead
fix issue6386 again: resync SparkStarter.java from base repo's dev br…
GangLiCN Mar 6, 2024
283f37d
Merge pull request #1 from GangLiCN/issue6386-fix
GangLiCN Mar 6, 2024
4a76471
New code commit
GangLiCN Feb 1, 2024
394f2fd
Add more comments
GangLiCN Feb 1, 2024
951028a
fix issue6386
GangLiCN Mar 2, 2024
655ae6b
fix issue6386 bases on review comments
GangLiCN Mar 6, 2024
c5fd966
fix issue6386 again: resync SparkStarter.java from base repo's dev br…
GangLiCN Mar 6, 2024
9e1a42c
Merge branch 'dev' of https://github.com/GangLiCN/seatunnel-win-port …
hailin0 Mar 11, 2024
7bad23d
Revert "Merge branch 'dev' of https://github.com/GangLiCN/seatunnel-w…
hailin0 Mar 11, 2024
ca4c88f
fix issue6386 and windows specific launch cmd script error
GangLiCN Mar 11, 2024
37994c9
Merge branch 'dev' into issue6386_fix_with_unit_test
GangLiCN Mar 11, 2024
b1d904c
Merge pull request #2 from GangLiCN/issue6386_fix_with_unit_test
GangLiCN Mar 11, 2024
88eaa44
fix incorrect version issue from [pom.xml]
GangLiCN Mar 12, 2024
6b5429e
Merge pull request #3 from GangLiCN/fix_pom_xml_version_issue
GangLiCN Mar 12, 2024
d1199de
Merge remote-tracking branch 'upstream/dev' into dev
GangLiCN Mar 13, 2024
f364600
merge dev branch doc fix
GangLiCN Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.apache.seatunnel.core.starter.utils;

import org.apache.commons.lang3.SystemUtils;

public class SystemUtil {

public static String GetOsType() {
String os_type="";

if (SystemUtils.IS_OS_WINDOWS) {
os_type="Windows";
} else if (SystemUtils.IS_OS_MAC) {
os_type="Mac";
} else if (SystemUtils.IS_OS_LINUX) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean change the code to the code style "switch ... case" ? If this is true, I'll modify as you suggested.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think "switch... case" is needed here, because there are many different properties from SystemUtils(e.g. IS_OS_LINUX, IS_OS_WINDOWS,,,etc), if SystemUtils can return unique property "OS_TYPE" then we can change to Switch... case style easily.

os_type="Linux";
} else if (SystemUtils.IS_OS_SOLARIS) {
os_type="Solaris";
} else {
os_type="Unknown";
}

return os_type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

import org.apache.seatunnel.core.starter.utils.SystemUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -44,6 +46,7 @@ public class FlinkStarter implements Starter {
this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
}

@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) {
FlinkStarter flinkStarter = new FlinkStarter(args);
System.out.println(String.join(" ", flinkStarter.buildCommands()));
Expand All @@ -52,8 +55,33 @@ public static void main(String[] args) {
@Override
public List<String> buildCommands() {
List<String> command = new ArrayList<>();
String local_os_type="";

SystemUtil my_system_util=new SystemUtil();
local_os_type=my_system_util.GetOsType();
// debug
// System.out.println("OS type:"+local_os_type);

String cmd_flink="";

// Nothe that "flink.cmd” or "flink.bat" can be retrieved from lower version of flink (e.g. 1.0.9)
// We do not check if this file exists on the box, user needs to make sure this file exists or not.
if (local_os_type.toLowerCase().equals("windows")) {
cmd_flink="%FLINK_HOME%/bin/flink.cmd";
} else if (local_os_type.toLowerCase().equals("linux")) {
cmd_flink="${FLINK_HOME}/bin/flink";
} else if (local_os_type.toLowerCase().equals("unknown")) {
cmd_flink="error";
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed .


// set start command
command.add("${FLINK_HOME}/bin/flink");
if ( ! (cmd_flink.equals("error"))) {
command.add(cmd_flink);
} else {
System.out.println("Error: Can not determine OS type, abort run !");
System.exit(-1);
}

// set deploy mode, run or run-application
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set submitted target master
Expand Down Expand Up @@ -91,6 +119,8 @@ public List<String> buildCommands() {
.filter(Objects::nonNull)
.map(String::trim)
.forEach(variable -> command.add("-D" + variable));
// debug
// System.out.println("Whole command string:" + command.toString());
return command;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.seatunnel.core.starter.utils.SystemUtil;

/** A Starter to generate spark-submit command for SeaTunnel job on spark. */
public class SparkStarter implements Starter {

Expand All @@ -80,6 +82,7 @@ private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
this.commandArgs = commandArgs;
}

@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) throws IOException {
SparkStarter starter = getInstance(args);
List<String> command = starter.buildCommands();
Expand Down Expand Up @@ -169,9 +172,9 @@ static Map<String, String> getSparkConf(String configFile) throws FileNotFoundEx
Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
}

/** return connector's jars, which located in 'connectors/*'. */
/** return connector's jars, which located in 'connectors/spark/*'. */
private List<Path> getConnectorJarDependencies() {
Path pluginRootDir = Common.connectorDir();
Path pluginRootDir = Common.connectorJarDir("seatunnel");
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
return Collections.emptyList();
}
Expand All @@ -195,7 +198,30 @@ private List<Path> getConnectorJarDependencies() {
/** build final spark-submit commands */
protected List<String> buildFinal() {
List<String> commands = new ArrayList<>();
commands.add("${SPARK_HOME}/bin/spark-submit");
String local_os_type="";

SystemUtil my_system_util=new SystemUtil();
local_os_type=my_system_util.GetOsType();
// debug
// System.out.println("OS type:"+local_os_type);

String cmd_spark="";

if (local_os_type.toLowerCase().equals("windows")) {
cmd_spark="%SPARK_HOME%/bin/spark-submit.cmd";
} else if (local_os_type.toLowerCase().equals("linux")) {
cmd_spark="${SPARK_HOME}/bin/spark-submit";
} else if (local_os_type.toLowerCase().equals("unknown")) {
cmd_spark="error";
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.


if ( ! (cmd_spark.equals("error"))) {
commands.add(cmd_spark);
} else {
System.out.println("Error: Can not determine OS type, abort run !");
System.exit(-1);
}

appendOption(commands, "--class", SeaTunnelSpark.class.getName());
appendOption(commands, "--name", this.commandArgs.getJobName());
appendOption(commands, "--master", this.commandArgs.getMaster());
Expand All @@ -217,6 +243,9 @@ protected List<String> buildFinal() {
if (this.commandArgs.isCheckConfig()) {
commands.add("--check");
}

// debug
// System.out.println("Whole spark job command string:" + commands.toString());
return commands;
}

Expand Down Expand Up @@ -259,6 +288,7 @@ protected void appendAppJar(List<String> commands) {
Common.appStarterDir().resolve(EngineType.SPARK3.getStarterJarName()).toString());
}

@SuppressWarnings("checkstyle:Indentation")
private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
return Arrays.stream(pluginTypes)
.flatMap(
Expand Down
Loading