From 20cee2583921b09613df73caf90d91944bcb3794 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon <gurwls223@apache.org>
Date: Fri, 9 May 2025 16:44:27 +0900
Subject: [PATCH 1/2] followup

---
 .../org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 8eaefc6364a9..bd057e947122 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -126,6 +126,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     boolean isExample = false;
     List<String> submitArgs = args;
     this.userArgs = Collections.emptyList();
+    isRemote = "connect".equalsIgnoreCase(getApiMode(conf));
 
     if (args.size() > 0) {
       switch (args.get(0)) {

From f9ba6ee6011a5fc40e62ce1a1c0908ef84a5c672 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon <gurwls223@apache.org>
Date: Fri, 9 May 2025 18:55:26 +0900
Subject: [PATCH 2/2] fixup

---
 .../spark/launcher/SparkSubmitCommandBuilder.java  | 12 ++++++++----
 python/pyspark/java_gateway.py                     |  1 +
 python/pyspark/sql/connect/session.py              |  4 ----
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index bd057e947122..5efa3bef78bc 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -126,7 +126,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     boolean isExample = false;
     List<String> submitArgs = args;
     this.userArgs = Collections.emptyList();
-    isRemote = "connect".equalsIgnoreCase(getApiMode(conf));
+    isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));
 
     if (args.size() > 0) {
       switch (args.get(0)) {
@@ -550,11 +550,15 @@ protected boolean handle(String opt, String value) {
           checkArgument(value != null, "Missing argument to %s", CONF);
           String[] setConf = value.split("=", 2);
           checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
-          conf.put(setConf[0], setConf[1]);
           // If both spark.remote and spark.mater are set, the error will be thrown later when
           // the application is started.
-          isRemote |= conf.containsKey("spark.remote");
-          isRemote |= "connect".equalsIgnoreCase(getApiMode(conf));
+          if (setConf[0].equals("spark.remote")) {
+            isRemote = true;
+          } else if (setConf[0].equals(SparkLauncher.SPARK_API_MODE)) {
+            // Respects if the API mode is explicitly set.
+            isRemote = setConf[1].equalsIgnoreCase("connect");
+          }
+          conf.put(setConf[0], setConf[1]);
         }
         case CLASS -> {
           // The special classes require some special command line handling, since they allow
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 3dd8123d581c..6303a4361857 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -83,6 +83,7 @@ def launch_gateway(conf=None, popen_kwargs=None):
             os.unlink(conn_info_file)
 
             env = dict(os.environ)
+            env["SPARK_CONNECT_MODE"] = "0"
             env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
 
             # Launch the Java gateway.
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index efa9ce7c2c43..303b9c9aac12 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -1074,13 +1074,11 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
             overwrite_conf["spark.connect.grpc.binding.port"] = "0"
 
             origin_remote = os.environ.get("SPARK_REMOTE", None)
-            origin_connect_mode = os.environ.get("SPARK_CONNECT_MODE", None)
             try:
                 # So SparkSubmit thinks no remote is set in order to
                 # start the regular PySpark session.
                 if origin_remote is not None:
                     del os.environ["SPARK_REMOTE"]
-                os.environ["SPARK_CONNECT_MODE"] = "0"
 
                 # The regular PySpark session is registered as an active session
                 # so would not be garbage-collected.
@@ -1098,8 +1096,6 @@ def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
             finally:
                 if origin_remote is not None:
                     os.environ["SPARK_REMOTE"] = origin_remote
-                if origin_connect_mode is not None:
-                    os.environ["SPARK_CONNECT_MODE"] = origin_connect_mode
         else:
             raise PySparkRuntimeError(
                 errorClass="SESSION_OR_CONTEXT_EXISTS",