From 6ce0822f76e11447487d5f6b3cce94a894f2ceef Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Fri, 7 Apr 2023 10:14:07 +0900
Subject: [PATCH] [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums

### What changes were proposed in this pull request?

This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. It is not known to cause issues on Scala 2.12, but on Scala 2.13 ~due to this change https://github.com/scala/scala/pull/9258~ (LuciferYang bisected this to first cause failures in Scala 2.13.7; one possible reason could be https://github.com/scala/scala/pull/9786) it leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing heartbeats.

The fix of using `CopyOnWriteArrayList` is cherry-picked from https://github.com/apache/spark/pull/37206, where it was suggested as a fix by LuciferYang, since `TaskMetrics.externalAccums` is also accessed from outside the `TaskMetrics` class. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here https://github.com/apache/spark/pull/37206#issuecomment-1189930626 that there should be no such race, because all accumulators should be deserialized as part of task deserialization here:
https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508
and therefore no writes should occur while the heartbeat thread reads the accumulators. But my understanding is that this is incorrect, since accumulators are also deserialized as part of the taskBinary here:
https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88
which happens while the heartbeater thread is potentially reading the accumulators. This can occur both because of user code using accumulators (see the new test case) and when using the DataFrame/Dataset API, since SQL metrics are also `externalAccums`. One way metrics end up in the taskBinary is when the dep is a `ShuffleDependency`:
https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85
with a `ShuffleWriteProcessor` that comes from
https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?

The current code has a data race.

### Does this PR introduce _any_ user-facing change?

It fixes an uncaught exception in the `executor-heartbeater` thread when using Scala 2.13.

### How was this patch tested?

This patch adds a new test case which, before the fix was applied, consistently produced the uncaught exception in the heartbeater thread when using Scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.
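For illustration, below is a minimal standalone Scala sketch, not part of this patch and not using Spark classes, of the access pattern the fix relies on: one thread keeps adding entries to a `CopyOnWriteArrayList` while another thread repeatedly iterates it, mimicking accumulators being registered during taskBinary deserialization while the `executor-heartbeater` thread reads `TaskMetrics.externalAccums`. The object and variable names are made up for the example.

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

// Hypothetical, self-contained sketch; names are illustrative only.
object ExternalAccumsRaceSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for TaskMetrics._externalAccums after this patch.
    val accums = new CopyOnWriteArrayList[Long]

    // "Task" thread: keeps registering values, mimicking accumulators being
    // deserialized and registered while the task is already running.
    val writer = new Thread(() => (1L to 100000L).foreach(i => accums.add(i)))

    // "executor-heartbeater" thread: repeatedly iterates the list. Because
    // CopyOnWriteArrayList copies its backing array on every write, each
    // iteration sees an immutable snapshot, so concurrent additions cannot
    // make the traversal fail the way a plain ArrayBuffer can.
    val reader = new Thread(() => (1 to 1000).foreach { _ =>
      val snapshotSum = accums.asScala.sum // safe concurrent read
      assert(snapshotSum >= 0L)
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
    println(s"registered ${accums.size()} values; reader never failed")
  }
}
```

`CopyOnWriteArrayList` fits this access pattern: accumulators are registered only a handful of times per task, while the heartbeat thread reads much more often, so paying the copy cost on each write is cheap relative to making every read safe without locking.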
Lead-authored-by: Emil Ejbyfeldt
Co-authored-by: yangjie01
Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/executor/TaskMetrics.scala   | 10 ++++++---
 .../apache/spark/executor/ExecutorSuite.scala | 22 +++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1ca8590b1c90c..78b39b0cbda68 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import java.util.concurrent.CopyOnWriteArrayList
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
@@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
+
+  private[spark] def externalAccums = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-    externalAccums += a
+    _externalAccums.add(a)
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
         tmAcc.metadata = acc.metadata
         tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
       } else {
-        tm.externalAccums += acc
+        tm._externalAccums.add(acc)
       }
     }
     tm
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index bef36d08e8aee..46f41195ebd87 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.concurrent.duration._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.logging.log4j._
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
@@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
+    val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
+    conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
+    sc = new SparkContext(conf)
+
+    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i"))
+    val input = sc.parallelize(1 to 10, 10)
+    var testRdd = input.map(i => (i, i))
+    (0 to 10).foreach( i =>
+      testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) }).reduceByKey(_ + _)
+    )
+
+    val logAppender = new LogAppender("heartbeat thread should not die")
+    withLogAppender(logAppender, level = Some(Level.ERROR)) {
+      val _ = testRdd.count()
+    }
+    val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+      .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+    assert(logs.isEmpty)
+  }
+
   private def withMockHeartbeatReceiverRef(executor: Executor)
       (func: RpcEndpointRef => Unit): Unit = {
     val executorClass = classOf[Executor]