From 1387af7847cb4e40b82ac978b991bf5ffedf6ed4 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 29 Jun 2022 18:09:10 -0500 Subject: [PATCH] [SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This pr add a `shuffleStatus != null` condition to `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing NPE when using Scala 2.13. Ensure that no NPE is thrown when `o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called by multiple threads, this pr is only for Scala 2.13. `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method will be called concurrently by the following two paths: - BlockManagerStorageEndpoint: https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62 - ContextCleaner: https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241 When test with Scala 2.13, for example `sql/core` module, there are many log as follows,although these did not cause UTs failure: ``` 17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to remove shuffle 87 - null java.lang.NullPointerException at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882) at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881) at scala.Option.foreach(Option.scala:437) at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881) at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17) at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678) at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 94 java.lang.NullPointerException at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882) at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881) at scala.Option.foreach(Option.scala:437) at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881) at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241) at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202) at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195) at scala.Option.foreach(Option.scala:437) at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189) at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79) ``` I think this is a bug of Scala 2.13.8 and already submit an issue to https://github.com/scala/bug/issues/12613, this PR is only for protection, we should remove this protection after Scala 2.13(maybe https://github.com/scala/scala/pull/9957) fixes this issue. No - Pass GA - Add new test `SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE` to `MapOutputTrackerSuite`, we can test manually as follows: ``` dev/change-scala-version.sh 2.13 mvn clean install -DskipTests -pl core -am -Pscala-2.13 mvn clean test -pl core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.MapOutputTrackerSuite ``` **Before** ``` - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** FAILED *** 3 did not equal 0 (MapOutputTrackerSuite.scala:971) Run completed in 17 seconds, 505 milliseconds. Total number of tests run: 25 Suites: completed 2, aborted 0 Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0 *** 1 TEST FAILED *** ``` **After** ``` - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE Run completed in 17 seconds, 996 milliseconds. Total number of tests run: 25 Suites: completed 2, aborted 0 Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0 All tests passed. ``` Closes #37024 from LuciferYang/SPARK-39553. Authored-by: yangjie01 Signed-off-by: Sean Owen (cherry picked from commit 29258964cae45cea43617ade971fb4ea9fe2902a) Signed-off-by: Sean Owen --- .../org/apache/spark/MapOutputTracker.scala | 8 +++-- .../apache/spark/MapOutputTrackerSuite.scala | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index af26abc09892f..e469c9989f2cd 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -832,8 +832,12 @@ private[spark] class MapOutputTrackerMaster( /** Unregister shuffle data */ def unregisterShuffle(shuffleId: Int): Unit = { shuffleStatuses.remove(shuffleId).foreach { shuffleStatus => - shuffleStatus.invalidateSerializedMapOutputStatusCache() - shuffleStatus.invalidateSerializedMergeOutputStatusCache() + // SPARK-39553: Add protection for Scala 2.13 due to https://github.com/scala/bug/issues/12613 + // We should revert this if Scala 2.13 solves this issue. + if (shuffleStatus != null) { + shuffleStatus.invalidateSerializedMapOutputStatusCache() + shuffleStatus.invalidateSerializedMergeOutputStatusCache() + } } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 0ee2c77997973..980d0835661d9 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark +import java.util.concurrent.atomic.LongAdder + import scala.collection.mutable.ArrayBuffer import org.mockito.ArgumentMatchers.any @@ -910,4 +912,36 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { rpcEnv.shutdown() slaveRpcEnv.shutdown() } + + test("SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE") { + val rpcEnv = createRpcEnv("test") + val tracker = newTrackerMaster() + tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf)) + val shuffleIdRange = 0 until 100 + shuffleIdRange.foreach { shuffleId => + tracker.registerShuffle(shuffleId, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) + } + val npeCounter = new LongAdder() + // More threads will help to reproduce the problem + val threads = new Array[Thread](5) + threads.indices.foreach { i => + threads(i) = new Thread() { + override def run(): Unit = { + shuffleIdRange.foreach { shuffleId => + try { + tracker.unregisterShuffle(shuffleId) + } catch { + case _: NullPointerException => npeCounter.increment() + } + } + } + } + } + threads.foreach(_.start()) + threads.foreach(_.join()) + tracker.stop() + rpcEnv.shutdown() + assert(npeCounter.intValue() == 0) + } }