From 53ad810087fa6b1564af4092a74c87ee45c23885 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 26 Nov 2025 21:00:11 +0800 Subject: [PATCH 1/2] Close URLClassLoader eagerly to avoid OOM --- .../org/apache/spark/executor/Executor.scala | 31 +++++++++++++++++-- .../spark/internal/config/package.scala | 9 ++++++ .../sql/connect/test/RemoteSparkSession.scala | 2 ++ .../spark/sql/artifact/ArtifactManager.scala | 15 ++++++++- 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index fc22107e008b..72f7a98e2188 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -20,7 +20,7 @@ package org.apache.spark.executor import java.io.{File, NotSerializableException} import java.lang.Thread.UncaughtExceptionHandler import java.lang.management.ManagementFactory -import java.net.{URI, URL} +import java.net.{URI, URL, URLClassLoader} import java.nio.ByteBuffer import java.util.{Locale, Properties} import java.util.concurrent._ @@ -212,7 +212,7 @@ private[spark] class Executor( val defaultSessionState: IsolatedSessionState = newSessionState(JobArtifactState("default", None)) val isolatedSessionCache: Cache[String, IsolatedSessionState] = CacheBuilder.newBuilder() - .maximumSize(100) + .maximumSize(conf.get(EXECUTOR_ISOLATED_SESSION_CACHE_SIZE)) .expireAfterAccess(30, TimeUnit.MINUTES) .removalListener(new RemovalListener[String, IsolatedSessionState]() { override def onRemoval( @@ -220,6 +220,33 @@ private[spark] class Executor( val state = notification.getValue // Cache is always used for isolated sessions. assert(!isDefaultState(state.sessionUUID)) + // Close classloaders to release resources. + try { + state.replClassLoader match { + case urlClassLoader: URLClassLoader => + urlClassLoader.close() + logInfo(log"Closed replClassLoader (URLClassLoader) for evicted session " + + log"${MDC(SESSION_ID, state.sessionUUID)}") + case _ => + } + } catch { + case NonFatal(e) => + logWarning(log"Failed to close replClassLoader for session " + + log"${MDC(SESSION_ID, state.sessionUUID)}", e) + } + try { + state.urlClassLoader match { + case urlClassLoader: URLClassLoader => + urlClassLoader.close() + logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted session " + + log"${MDC(SESSION_ID, state.sessionUUID)}") + case _ => + } + } catch { + case NonFatal(e) => + logWarning(log"Failed to close urlClassLoader for session " + + log"${MDC(SESSION_ID, state.sessionUUID)}", e) + } val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), state.sessionUUID) if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { Utils.deleteRecursively(sessionBasedRoot) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 1e5b2e8018f0..331d798a3d76 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -358,6 +358,15 @@ package object config { .intConf .createWithDefault(60) + private[spark] val EXECUTOR_ISOLATED_SESSION_CACHE_SIZE = + ConfigBuilder("spark.executor.isolatedSessionCache.size") + .doc("Maximum number of isolated sessions to cache in the executor. Each cached session " + + "maintains its own classloader for artifact isolation.") + .version("4.1.0") + .intConf + .checkValue(_ > 0, "The cache size must be positive.") + .createWithDefault(100) + private[spark] val EXECUTOR_PROCESS_TREE_METRICS_ENABLED = ConfigBuilder("spark.executor.processTreeMetrics.enabled") .doc("Whether to collect process tree metrics (from the /proc filesystem) when collecting " + diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala index 6d8d2edcf082..efac3bc7561f 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala @@ -135,6 +135,8 @@ object SparkConnectServerUtils { "spark.connect.execute.reattachable.senderMaxStreamSize=123", // Testing SPARK-49673, setting maxBatchSize to 10MiB s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}", + // Cache less sessions to save memory. + "spark.executor.isolatedSessionCache.size=5", // Disable UI "spark.ui.enabled=false").flatMap(v => "--conf" :: v :: Nil) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala index 5889fe581d4e..346cdb832c3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala @@ -450,7 +450,20 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging pythonIncludeList.clear() sparkContextRelativePaths.clear() - // Removed cached classloader + // Close and remove cached classloader + cachedClassLoader.foreach { + case urlClassLoader: URLClassLoader => + try { + urlClassLoader.close() + logDebug(log"Closed URLClassLoader for session " + + log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}") + } catch { + case e: IOException => + logWarning(log"Failed to close URLClassLoader for session " + + log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}", e) + } + case _ => + } cachedClassLoader = None } From 3d2b9e38905ecf01f4b58673af9d7421dfecbe89 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 26 Nov 2025 23:08:17 +0800 Subject: [PATCH 2/2] Update Executor.scala --- .../org/apache/spark/executor/Executor.scala | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 72f7a98e2188..04e966294336 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -220,20 +220,7 @@ private[spark] class Executor( val state = notification.getValue // Cache is always used for isolated sessions. assert(!isDefaultState(state.sessionUUID)) - // Close classloaders to release resources. - try { - state.replClassLoader match { - case urlClassLoader: URLClassLoader => - urlClassLoader.close() - logInfo(log"Closed replClassLoader (URLClassLoader) for evicted session " + - log"${MDC(SESSION_ID, state.sessionUUID)}") - case _ => - } - } catch { - case NonFatal(e) => - logWarning(log"Failed to close replClassLoader for session " + - log"${MDC(SESSION_ID, state.sessionUUID)}", e) - } + // Close the urlClassLoader to release resources. try { state.urlClassLoader match { case urlClassLoader: URLClassLoader =>