Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
boolean isExample = false;
List<String> submitArgs = args;
this.userArgs = Collections.emptyList();
isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));

if (args.size() > 0) {
switch (args.get(0)) {
Expand Down Expand Up @@ -549,11 +550,15 @@ protected boolean handle(String opt, String value) {
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
conf.put(setConf[0], setConf[1]);
// If both spark.remote and spark.master are set, the error will be thrown later when
// the application is started.
isRemote |= conf.containsKey("spark.remote");
isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));
if (setConf[0].equals("spark.remote")) {
isRemote = true;
} else if (setConf[0].equals(SparkLauncher.SPARK_API_MODE)) {
// Respect the API mode if it is explicitly set.
isRemote = setConf[1].equalsIgnoreCase("connect");
}
conf.put(setConf[0], setConf[1]);
}
case CLASS -> {
// The special classes require some special command line handling, since they allow
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def launch_gateway(conf=None, popen_kwargs=None):
os.unlink(conn_info_file)

env = dict(os.environ)
env["SPARK_CONNECT_MODE"] = "0"
env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

# Launch the Java gateway.
Expand Down
4 changes: 0 additions & 4 deletions python/pyspark/sql/connect/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,13 +1074,11 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
overwrite_conf["spark.connect.grpc.binding.port"] = "0"

origin_remote = os.environ.get("SPARK_REMOTE", None)
origin_connect_mode = os.environ.get("SPARK_CONNECT_MODE", None)
try:
# So SparkSubmit thinks no remote is set in order to
# start the regular PySpark session.
if origin_remote is not None:
del os.environ["SPARK_REMOTE"]
os.environ["SPARK_CONNECT_MODE"] = "0"

# The regular PySpark session is registered as an active session
# so would not be garbage-collected.
Expand All @@ -1098,8 +1096,6 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
finally:
if origin_remote is not None:
os.environ["SPARK_REMOTE"] = origin_remote
if origin_connect_mode is not None:
os.environ["SPARK_CONNECT_MODE"] = origin_connect_mode
else:
raise PySparkRuntimeError(
errorClass="SESSION_OR_CONTEXT_EXISTS",
Expand Down