Merge pull request #11 from davidyuan1223/fix_4186
fix_4186
davidyuan1223 committed Sep 26, 2023
2 parents e244029 + 360d183 commit c83836b
Showing 3 changed files with 36 additions and 11 deletions.
@@ -44,15 +44,17 @@ class SQLOperationListener(
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

private lazy val activeJobs = new ConcurrentHashMap[Int, JobInfo]()
private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]()

private var executionId: Option[Long] = None
private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
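The job bookkeeping moves from a plain HashSet[Int] of job ids to a ConcurrentHashMap[Int, JobInfo], and the map is now handed to SparkConsoleProgressBar so the progress-bar thread can read per-job stage progress while the listener callbacks update it. A minimal sketch of that sharing pattern (simplified, with illustrative ids, not the full classes):

    import java.util.concurrent.ConcurrentHashMap

    class JobInfo(val numStages: Int, val stageIds: Seq[Int]) {
      var numCompleteStages = 0
    }

    val activeJobs = new ConcurrentHashMap[Int, JobInfo]()

    // listener side: register a job when it starts
    activeJobs.put(7, new JobInfo(numStages = 2, stageIds = Seq(11, 12)))

    // progress-bar side: iterate a weakly consistent snapshot without extra locking
    activeJobs.forEach((id, info) =>
      println(s"Job $id: ${info.numCompleteStages} / ${info.numStages} stages complete"))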
@@ -82,6 +84,7 @@ class SQLOperationListener(
override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageIds = jobStart.stageInfos.map(_.stageId).toList
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
@@ -94,7 +97,10 @@
}
}
withOperationLog {
activeJobs.add(jobId)
activeJobs.put(
jobId,
new JobInfo(stageSize, stageIds)
)
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
@@ -103,7 +109,7 @@

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
val jobId = jobEnd.jobId
if (activeJobs.remove(jobId)) {
if (activeJobs.remove(jobId) != null ) {
val hint = jobEnd.jobResult match {
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
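Unlike HashSet.remove, which returns a Boolean, ConcurrentHashMap.remove returns the previously mapped value, or null when the key was absent, which is why the guard above compares against null. A small illustration (hypothetical job id):

    val jobs = new java.util.concurrent.ConcurrentHashMap[Int, String]()
    jobs.put(7, "running")

    val first = jobs.remove(7)   // returns "running", the value that was tracked
    val second = jobs.remove(7)  // returns null, the job is no longer present

    if (first != null) {
      // only reached when the job was actually being tracked by this listener
    }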
@@ -134,14 +140,21 @@ class SQLOperationListener(

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeJobs.forEach((_, jobInfo) => {
if (jobInfo.stageIds.contains(stageInfo.stageId)) {
jobInfo.numCompleteStages += 1
}
})

activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
}


override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
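Because each JobInfo now carries the stage ids reported at job start, onStageCompleted can credit a finished stage to every tracked job that owns it. A worked example of that counting, reusing the JobInfo type introduced by this change (hypothetical ids):

    val activeJobs = new java.util.concurrent.ConcurrentHashMap[Int, JobInfo]()
    activeJobs.put(1, new JobInfo(numStages = 2, stageIds = Seq(4, 5)))
    activeJobs.put(2, new JobInfo(numStages = 1, stageIds = Seq(6)))

    val completedStageId = 5
    activeJobs.forEach((_, jobInfo) => {
      if (jobInfo.stageIds.contains(completedStageId)) {
        jobInfo.numCompleteStages += 1 // job 1 is now at 1 of 2 stages; job 2 is untouched
      }
    })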
@@ -29,7 +29,8 @@ import org.apache.kyuubi.operation.Operation

class SparkConsoleProgressBar(
operation: Operation,
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
liveJobs: ConcurrentHashMap[Int, JobInfo],
liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
updatePeriodMSec: Long,
timeFormat: String)
extends Logging {
@@ -80,6 +81,13 @@ class SparkConsoleProgressBar(
private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
// build job log info
val jobId: Option[Int] = liveJobs.asScala.find {
case (jobId, jobInfo) => jobInfo.stageIds.contains(s.stageId)
}.map(_._1)
val jobInfoHeader = s"[Job ${jobId} " +
s"(${liveJobs.get(jobId).numCompleteStages} / ${liveJobs.get(jobId).numStages}) Stages] "
// build stage log info
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
@@ -93,7 +101,7 @@
} else {
""
}
header + bar + tailer
jobInfoHeader + header + bar + tailer
}.mkString("")

// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
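The job header prepended to each stage bar is found by locating the tracked job whose stageIds contain the stage being rendered. One way to spell that lookup with explicit Option handling is sketched below (a simplified sketch over the same types, not the code in this commit):

    import scala.collection.JavaConverters._ // provides asScala on the Java map

    // s: the stage being rendered; liveJobs: ConcurrentHashMap[Int, JobInfo]
    val jobInfoHeader = liveJobs.asScala
      .find { case (_, info) => info.stageIds.contains(s.stageId) }
      .map { case (id, info) =>
        s"[Job $id (${info.numCompleteStages} / ${info.numStages}) Stages] "
      }
      .getOrElse("")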
@@ -23,7 +23,11 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}

class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
class JobInfo(val numStages: Int, val stageIds: Seq[Int]) {
var numCompleteStages = 0
}

class StageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = 0
var numCompleteTasks = 0
}
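A quick usage sketch of the two holders defined here, as the listener and the progress bar drive them (values are illustrative):

    val job = new JobInfo(numStages = 3, stageIds = Seq(0, 1, 2))
    job.numCompleteStages += 1            // one of its three stages has finished

    val stage = new StageInfo(stageId = 1, numTasks = 40)
    stage.numActiveTasks += 4             // four tasks currently running
    stage.numCompleteTasks += 10          // ten tasks finished

    // rendered by the progress bar roughly as
    // "[Job <id> (1 / 3) Stages] [Stage 1: ===>   (10 + 4) / 40]"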
