From 0e318acd0cc3b42e8be9cb2a53cccfdc4a0805f9 Mon Sep 17 00:00:00 2001 From: Yogesh Garg <1059168+yogeshg@users.noreply.github.com> Date: Sat, 3 Nov 2018 14:03:50 +0800 Subject: [PATCH] [SPARK-25901][CORE] Use only one thread in BarrierTaskContext companion object ## What changes were proposed in this pull request? Now we use only one `timer` (and thus a backing thread) in `BarrierTaskContext` companion object, and the objects can add `timerTasks` to that `timer`. ## How was this patch tested? This was tested manually by generating logs and seeing that they look the same as ones before, namely, that is, a partition waiting on another partition for 5seconds generates 4-5 log messages when the frequency of logging is set to 1second. Closes #22912 from yogeshg/thread. Authored-by: Yogesh Garg <1059168+yogeshg@users.noreply.github.com> Signed-off-by: Xingbo Jiang --- .../main/scala/org/apache/spark/BarrierTaskContext.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 90a5c4130f799..7ce421e5479ee 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -41,14 +41,14 @@ import org.apache.spark.util._ class BarrierTaskContext private[spark] ( taskContext: TaskContext) extends TaskContext with Logging { + import BarrierTaskContext._ + // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls. private val barrierCoordinator: RpcEndpointRef = { val env = SparkEnv.get RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) } - private val timer = new Timer("Barrier task timer for barrier() calls.") - // Local barrierEpoch that identify a barrier() call from current task, it shall be identical // with the driver side epoch. private var barrierEpoch = 0 @@ -234,4 +234,7 @@ object BarrierTaskContext { @Experimental @Since("2.4.0") def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext] + + private val timer = new Timer("Barrier task timer for barrier() calls.") + }