diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 5e7e33712ec..4e4a940d295 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -44,17 +44,15 @@ class SQLOperationListener( spark: SparkSession) extends StatsReportListener with Logging { private val operationId: String = operation.getHandle.identifier.toString - - private lazy val activeJobs = new ConcurrentHashMap[Int, JobInfo]() - private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]() - + private lazy val activeJobs = new java.util.HashSet[Int]() + private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]() private var executionId: Option[Long] = None + private val conf: KyuubiConf = operation.getSession.sessionManager.getConf private lazy val consoleProgressBar = if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) { Some(new SparkConsoleProgressBar( operation, - activeJobs, activeStages, conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL), conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT))) @@ -84,7 +82,6 @@ class SQLOperationListener( override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId - val stageIds = jobStart.stageInfos.map(_.stageId).toList val stageSize = jobStart.stageInfos.size if (executionId.isEmpty) { executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) @@ -97,10 +94,7 @@ class SQLOperationListener( } } withOperationLog { - activeJobs.put( - jobId, - new JobInfo(stageSize, stageIds) - ) + activeJobs.add(jobId) info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + s" 
${activeJobs.size()} active jobs running") } @@ -109,7 +103,7 @@ class SQLOperationListener( override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { val jobId = jobEnd.jobId - if (activeJobs.remove(jobId) != null ) { + if (activeJobs.remove(jobId)) { val hint = jobEnd.jobResult match { case JobSucceeded => "succeeded" case _ => "failed" // TODO: Handle JobFailed(exception: Exception) @@ -140,13 +134,7 @@ class SQLOperationListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageInfo = stageCompleted.stageInfo - val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) - activeJobs.forEach((_, jobInfo) => { - if (jobInfo.stageIds.contains(stageInfo.stageId)) { - jobInfo.numCompleteStages += 1 - } - }) - + val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { withOperationLog(super.onStageCompleted(stageCompleted)) @@ -154,7 +142,6 @@ class SQLOperationListener( } } - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized { val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId) if (activeStages.containsKey(stageAttempt)) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 148427214fa..dc8b493cc04 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -29,8 +29,7 @@ import org.apache.kyuubi.operation.Operation class SparkConsoleProgressBar( operation: Operation, - liveJobs: ConcurrentHashMap[Int, JobInfo], - liveStages: ConcurrentHashMap[StageAttempt, 
StageInfo], + liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], updatePeriodMSec: Long, timeFormat: String) extends Logging { @@ -81,13 +80,6 @@ class SparkConsoleProgressBar( private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = { val width = TerminalWidth / stages.size val bar = stages.map { s => - // build job log info - val jobId: Option[Int] = liveJobs.asScala.find { - case (jobId, jobInfo) => jobInfo.stageIds.contains(s.stageId) - }.map(_._1) - val jobInfoHeader = s"[Job ${jobId} " + - s"(${liveJobs.get(jobId).numCompleteStages} / ${liveJobs.get(jobId).numStages}) Stages] " - // build stage log info val total = s.numTasks val header = s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" @@ -101,7 +93,7 @@ class SparkConsoleProgressBar( } else { "" } - jobInfoHeader + header + bar + tailer + header + bar + tailer }.mkString("") // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 1e78e1c2f4a..2ea9c3fdae6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -23,11 +23,7 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" } -class JobInfo(val numStages: Int, val stageIds: Seq[Int]) { - var numCompleteStages = 0 - } - -class StageInfo(val stageId: Int, val numTasks: Int) { - var numActiveTasks = 0 - var numCompleteTasks = 0 +class SparkStageInfo(val stageId: Int, val numTasks: Int) { + val numActiveTasks = new AtomicInteger(0) + val numCompleteTasks = new AtomicInteger(0) }