diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 1a57fcf2994..00bcf591389 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -44,15 +44,15 @@ class SQLOperationListener( spark: SparkSession) extends StatsReportListener with Logging { private val operationId: String = operation.getHandle.identifier.toString - private lazy val activeJobs = new java.util.HashSet[Int]() + private lazy val activeJobs = new ConcurrentHashMap[Int, JobInfo]() private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]() private var executionId: Option[Long] = None - private val conf: KyuubiConf = operation.getSession.sessionManager.getConf private lazy val consoleProgressBar = if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) { Some(new SparkConsoleProgressBar( operation, + activeJobs, activeStages, conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL), conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT))) @@ -82,6 +82,7 @@ class SQLOperationListener( override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId + val stageIds = jobStart.stageInfos.map(_.stageId).toList val stageSize = jobStart.stageInfos.size if (executionId.isEmpty) { executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) @@ -94,7 +95,10 @@ class SQLOperationListener( } } withOperationLog { - activeJobs.add(jobId) + activeJobs.put( + jobId, + new JobInfo(stageSize, stageIds) + ) info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + s" ${activeJobs.size()} active jobs running") } @@ -103,7 +107,7 @@ class SQLOperationListener( override 
def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { val jobId = jobEnd.jobId - if (activeJobs.remove(jobId)) { + if (activeJobs.remove(jobId) != null) { val hint = jobEnd.jobResult match { case JobSucceeded => "succeeded" case _ => "failed" // TODO: Handle JobFailed(exception: Exception) @@ -135,6 +139,11 @@ class SQLOperationListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageInfo = stageCompleted.stageInfo val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) + activeJobs.forEach((_, jobInfo) => { + if (jobInfo.stageIds.contains(stageInfo.stageId)) { + jobInfo.numCompleteStages += 1 + } + }) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { withOperationLog(super.onStageCompleted(stageCompleted)) @@ -142,6 +151,7 @@ class SQLOperationListener( } } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized { val stageAttempt = StageAttempt(taskStart.stageId, taskStart.stageAttemptId) if (activeStages.containsKey(stageAttempt)) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index fc2ebd5f8c8..3fb859617a6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation class SparkConsoleProgressBar( operation: Operation, + liveJobs: ConcurrentHashMap[Int, JobInfo], liveStages: ConcurrentHashMap[StageAttempt, StageInfo], updatePeriodMSec: Long, timeFormat: String) @@ -80,6 +81,13 @@ class SparkConsoleProgressBar( private def show(now: Long, stages: Seq[StageInfo]): Unit = { val width = TerminalWidth / stages.size
val bar = stages.map { s => + // find the job that owns this stage, if any (header is empty when none matches) + val jobEntry: Option[(Int, JobInfo)] = liveJobs.asScala.find { + case (_, jobInfo) => jobInfo.stageIds.contains(s.stageId) + } + val jobInfoHeader = jobEntry.map { case (id, info) => + s"[Job $id (${info.numCompleteStages} / ${info.numStages}) Stages] " }.getOrElse("") + // build stage log info val total = s.numTasks val header = s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" @@ -93,7 +101,7 @@ class SparkConsoleProgressBar( } else { "" } - header + bar + tailer + jobInfoHeader + header + bar + tailer }.mkString("") // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 14457086254..ae9f2ff9d82 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -21,6 +21,9 @@ case class StageAttempt(stageId: Int, stageAttemptId: Int) { override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" } +class JobInfo(val numStages: Int, val stageIds: Seq[Int]) { + var numCompleteStages = 0 +} class StageInfo(val stageId: Int, val numTasks: Int) { var numActiveTasks = 0 var numCompleteTasks = 0