diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index d1b2d47a07099..ae4bed2cf1111 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -35,6 +35,8 @@ private[spark] class BroadcastManager(
     securityManager: SecurityManager)
   extends Logging {
 
+  val cleanQueryBroadcast = conf.getBoolean("spark.broadcast.autoClean.enabled", false)
+
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
   var cachedBroadcast = new ConcurrentHashMap[String, ListBuffer[Long]]()
@@ -65,15 +67,16 @@ private[spark] class BroadcastManager(
   }
 
   def cleanBroadCast(executionId: String): Unit = {
-    if (cachedBroadcast.containsKey(executionId)) {
-      cachedBroadcast.get(executionId).foreach(broadcastId => unbroadcast(broadcastId, true, false))
-      cachedBroadcast.remove(executionId)
-    }
+    if (cachedBroadcast.containsKey(executionId)) {
+      cachedBroadcast.get(executionId)
+        .foreach(broadcastId => unbroadcast(broadcastId, true, false))
+      cachedBroadcast.remove(executionId)
+    }
   }
 
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, executionId: String): Broadcast[T] = {
     val broadcastId = nextBroadcastId.getAndIncrement()
-    if (executionId != null) {
+    if (executionId != null && cleanQueryBroadcast) {
       if (cachedBroadcast.containsKey(executionId)) {
         cachedBroadcast.get(executionId) += broadcastId
       } else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 036fa38ce2407..09fc896d48940 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -692,7 +692,8 @@ private[spark] class DAGScheduler(
       // Return immediately if the job is running 0 tasks
       return new JobWaiter[U](this, jobId, 0, resultHandler)
     }
-
+    val executionId = sc.getLocalProperty("spark.sql.execution.id")
+    logInfo(s"Submitting job $jobId, executionId is $executionId")
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
@@ -1082,6 +1083,7 @@ private[spark] class DAGScheduler(
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
+    logInfo(s"Submitting stage ${stage.id} with jobId $jobId")
 
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e28e5cedaba26..fe1f550fc4467 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -530,18 +530,18 @@ private[spark] class TaskSetManager(
   private def maybeFinishTaskSet() {
     if (isZombie && runningTasks == 0) {
       sched.taskSetFinished(this)
-      val broadcastId = taskSet.tasks.head match {
-        case resultTask: ResultTask[Any, Any] =>
-          resultTask.taskBinary.id
-        case shuffleMapTask: ShuffleMapTask =>
-          shuffleMapTask.taskBinary.id
-      }
-      SparkEnv.get.broadcastManager.unbroadcast(broadcastId, true, false)
       if (tasksSuccessful == numTasks) {
-        blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
-          taskSet.stageId,
-          taskSet.stageAttemptId,
-          taskSetBlacklistHelperOpt.get.execToFailures))
+        val broadcastId = taskSet.tasks.head match {
+          case resultTask: ResultTask[Any, Any] =>
+            resultTask.taskBinary.id
+          case shuffleMapTask: ShuffleMapTask =>
+            shuffleMapTask.taskBinary.id
+        }
+        SparkEnv.get.broadcastManager.unbroadcast(broadcastId, true, false)
+        blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
+          taskSet.stageId,
+          taskSet.stageAttemptId,
+          taskSetBlacklistHelperOpt.get.execToFailures))
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index a463536c3c5f6..f5ecd7ce5d6f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,10 +21,12 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 
-object SQLExecution {
+
+object SQLExecution extends Logging {
 
   val EXECUTION_ID_KEY = "spark.sql.execution.id"
 
@@ -62,6 +64,7 @@ object SQLExecution {
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
+    logInfo(s"Execution id is $executionId")
     sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
     executionIdToQueryExecution.put(executionId, queryExecution)
     try {
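
For context, a minimal sketch of how the flag introduced by this patch would be exercised end to end against a patched build. Only the config key "spark.broadcast.autoClean.enabled" comes from the BroadcastManager hunk above; the demo object, dataset names, and join are illustrative assumptions, not part of the patch.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    // Hypothetical demo, not part of the patch.
    object BroadcastAutoCleanDemo {
      def main(args: Array[String]): Unit = {
        // Opt in to the per-query broadcast cleanup added by this patch; with
        // the flag left at its default (false), BroadcastManager skips the
        // executionId -> broadcastId bookkeeping entirely.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("broadcast-auto-clean-demo")
          .config("spark.broadcast.autoClean.enabled", "true")
          .getOrCreate()
        import spark.implicits._

        val small = Seq((1L, "a"), (2L, "b")).toDF("id", "tag")
        val big = spark.range(0, 100000L).toDF("id")

        // A broadcast hash join runs under a spark.sql.execution.id, so the
        // patched BroadcastManager records the broadcast's id against that
        // execution and can unbroadcast it once the query finishes.
        big.join(broadcast(small), "id").count()

        spark.stop()
      }
    }

With the flag enabled, the new logInfo lines in DAGScheduler and SQLExecution let the job, its stages, and the query's broadcast variables be correlated with the same execution id in the driver log.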