From 741d1bdfeeeebe14aeb8e9150cf4927418f08460 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sun, 24 May 2026 04:12:41 +0000 Subject: [PATCH 1/2] [SPARK-57037][CORE][TESTS] Force GC before allocating large array in SorterSuite to fix flaky OOM --- .../spark/util/collection/SorterSuite.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 7551327d704b4..585d2941f4c8d 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,13 +17,26 @@ package org.apache.spark.util.collection +import java.lang.ref.WeakReference import java.util.Arrays +import java.util.concurrent.TimeUnit import org.apache.spark.SparkFunSuite import org.apache.spark.util.random.XORShiftRandom class SorterSuite extends SparkFunSuite { + /** Run GC and wait until it has actually run, to free memory used by prior tests. */ + private def runGC(): Unit = { + val weakRef = new WeakReference(new Object()) + val startTimeNs = System.nanoTime() + System.gc() + while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(10) && weakRef.get != null) { + System.gc() + Thread.sleep(200) + } + } + test("equivalent to Arrays.sort") { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() } @@ -71,7 +84,6 @@ class SorterSuite extends SparkFunSuite { } test("java.lang.ArrayIndexOutOfBoundsException in TimSort") { - System.gc() // scalastyle:off val runLengths = Array(76405736, 74830360, 1181532, 787688, 1575376, 2363064, 3938440, 6301504, 1181532, 393844, 15753760, 1575376, 787688, 393844, 1969220, 3150752, 1181532,787688, 5513816, 3938440, @@ -140,7 +152,16 @@ class SorterSuite extends SparkFunSuite { 21, 20, 22, 18, 452, 114, 95, 18, 17, 21, 36, 18, 17, 115, 76, 144, 44, 38, 61,20, 19, 21, 17) // scalastyle:on val arrayToSortSize = 1091482190 - val arrayToSort = new Array[Byte](arrayToSortSize) + // Retry once after forcing GC: memory held by the previous test (e.g. the ~256 MB + // int array in "SPARK-5984 TimSort bug") may not be reclaimed before this >1 GB + // allocation, causing flaky OOM in CI. + val arrayToSort = try { + new Array[Byte](arrayToSortSize) + } catch { + case _: OutOfMemoryError => + runGC() + new Array[Byte](arrayToSortSize) + } var sum: Int = -1 for (i <- runLengths) { sum += i From b29d2cad86f261ace58501879e56f3cfff428122 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 May 2026 16:50:35 +0000 Subject: [PATCH 2/2] Address review: lift runGC to SparkFunSuite, add retryOnOOM helper --- .../apache/spark/ContextCleanerSuite.scala | 15 ----------- .../org/apache/spark/SparkFunSuite.scala | 25 +++++++++++++++++ .../spark/util/collection/SorterSuite.scala | 27 +++---------------- 3 files changed, 29 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 05709c9bdd756..813de4132ab2d 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -17,9 +17,6 @@ package org.apache.spark -import java.lang.ref.WeakReference -import java.util.concurrent.TimeUnit - import scala.collection.mutable.HashSet import scala.util.Random @@ -96,18 +93,6 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So rdd } - /** Run GC and make sure it actually has run */ - protected def runGC(): Unit = { - val weakRef = new WeakReference(new Object()) - val startTimeNs = System.nanoTime() - System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. - // Wait until a weak reference object has been GCed - while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(10) && weakRef.get != null) { - System.gc() - Thread.sleep(200) - } - } - protected def cleaner = sc.cleaner.get } diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 15e150ab8b933..a0f17f8af3f33 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark +import java.lang.ref.WeakReference +import java.util.concurrent.TimeUnit + import scala.annotation.tailrec import org.scalactic.source.Position @@ -97,4 +100,26 @@ abstract class SparkFunSuite test(testNamePrefix + s" ${param._1}", testTags: _*)(testFun(param._2)) } } + + /** Run GC and make sure it actually has run. */ + protected def runGC(): Unit = { + val weakRef = new WeakReference(new Object()) + val startTimeNs = System.nanoTime() + System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. + // Wait until a weak reference object has been GCed + while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(10) && weakRef.get != null) { + System.gc() + Thread.sleep(200) + } + } + + /** Run `body`; if it throws OutOfMemoryError, force a GC and retry once. */ + protected def retryOnOOM[T](body: => T): T = { + try body + catch { + case _: OutOfMemoryError => + runGC() + body + } + } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 585d2941f4c8d..2767769924bc8 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,26 +17,13 @@ package org.apache.spark.util.collection -import java.lang.ref.WeakReference import java.util.Arrays -import java.util.concurrent.TimeUnit import org.apache.spark.SparkFunSuite import org.apache.spark.util.random.XORShiftRandom class SorterSuite extends SparkFunSuite { - /** Run GC and wait until it has actually run, to free memory used by prior tests. */ - private def runGC(): Unit = { - val weakRef = new WeakReference(new Object()) - val startTimeNs = System.nanoTime() - System.gc() - while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(10) && weakRef.get != null) { - System.gc() - Thread.sleep(200) - } - } - test("equivalent to Arrays.sort") { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() } @@ -152,16 +139,10 @@ class SorterSuite extends SparkFunSuite { 21, 20, 22, 18, 452, 114, 95, 18, 17, 21, 36, 18, 17, 115, 76, 144, 44, 38, 61,20, 19, 21, 17) // scalastyle:on val arrayToSortSize = 1091482190 - // Retry once after forcing GC: memory held by the previous test (e.g. the ~256 MB - // int array in "SPARK-5984 TimSort bug") may not be reclaimed before this >1 GB - // allocation, causing flaky OOM in CI. - val arrayToSort = try { - new Array[Byte](arrayToSortSize) - } catch { - case _: OutOfMemoryError => - runGC() - new Array[Byte](arrayToSortSize) - } + // Memory held by the previous test (e.g. the ~256 MB int array in "SPARK-5984 + // TimSort bug") may not be reclaimed before this >1 GB allocation, causing flaky + // OOM in CI. Force a GC and retry once on OOM. + val arrayToSort = retryOnOOM(new Array[Byte](arrayToSortSize)) var sum: Int = -1 for (i <- runLengths) { sum += i