Skip to content

Commit

Permalink
fix_4186
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyuan1223 committed Oct 10, 2023
1 parent e1d213e commit 56b91a3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class SQLOperationListener(
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

Expand All @@ -53,6 +53,7 @@ class SQLOperationListener(
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
Expand All @@ -79,37 +80,45 @@ class SQLOperationListener(
}
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
.map(_.toLong)
consoleProgressBar
operation match {
case executeStatement: ExecuteStatement =>
executeStatement.setCompiledStateIfNeeded()
case _ =>
/**
 * Tracks a newly started Spark job that belongs to this operation's job group.
 *
 * On the first job of the statement this also captures the SQL execution id from
 * the job properties and invokes `setCompiledStateIfNeeded()` for
 * `ExecuteStatement` operations. The job is then registered in `activeJobs`
 * (keyed by job id) so the console progress bar can report per-job stage progress.
 */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  activeJobs.synchronized {
    // Ignore jobs fired by other operations sharing this SparkSession.
    if (sameGroupId(jobStart.properties)) {
      val jobId = jobStart.jobId
      val stageIds = jobStart.stageInfos.map(_.stageId)
      val stageSize = jobStart.stageInfos.size
      if (executionId.isEmpty) {
        // First job observed for this statement: remember its SQL execution id.
        executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
          .map(_.toLong)
        // Touch the lazy val so the progress bar (if enabled) starts rendering.
        consoleProgressBar
        operation match {
          case executeStatement: ExecuteStatement =>
            executeStatement.setCompiledStateIfNeeded()
          case _ =>
        }
      }
      activeJobs.put(jobId, new SparkJobInfo(stageSize, stageIds))
      withOperationLog {
        info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
          s" ${activeJobs.size()} active jobs running")
      }
    }
  }
}

/**
 * Removes a finished job from `activeJobs` and logs its outcome.
 *
 * Jobs that were never tracked by this listener (other operations' jobs) are
 * ignored implicitly: `ConcurrentHashMap.remove` returns null for unknown keys.
 */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
  val jobId = jobEnd.jobId
  activeJobs.synchronized {
    if (activeJobs.remove(jobId) != null) {
      val hint = jobEnd.jobResult match {
        case JobSucceeded => "succeeded"
        case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
      }
      withOperationLog {
        info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
      }
    }
  }
}
Expand All @@ -134,9 +143,17 @@ class SQLOperationListener(

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val stageId = stageInfo.stageId
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
activeJobs.synchronized {
activeJobs.forEach((jobId, sparkJobInfo) => {
if (sparkJobInfo.stageIds.contains(stageId)) {
sparkJobInfo.numCompleteStages.getAndIncrement()
}
})
}
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation

class SparkConsoleProgressBar(
operation: Operation,
liveJobs: ConcurrentHashMap[Int, SparkJobInfo],
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
Expand Down Expand Up @@ -71,7 +72,14 @@ class SparkConsoleProgressBar(
show(now, stages.take(3)) // display at most 3 stages in same time
}
}

/**
 * Finds the id of the live job owning the given stage, or -1 when no tracked
 * job claims it.
 *
 * Implemented as an explicit iterator scan instead of `return` inside a
 * `forEach` lambda: a nonlocal return from a lambda is compiled to throwing
 * `NonLocalReturnControl`, which broad exception handlers can swallow and
 * which is dropped in Scala 3.
 */
private def findJobId(stageId: Int): Int = {
  var foundJobId = -1
  val entries = liveJobs.entrySet().iterator()
  while (foundJobId == -1 && entries.hasNext) {
    val entry = entries.next()
    if (entry.getValue.stageIds.contains(stageId)) {
      foundJobId = entry.getKey
    }
  }
  foundJobId
}
/**
* Show progress bar in console. The progress bar is displayed in the next line
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
Expand All @@ -81,7 +89,13 @@ class SparkConsoleProgressBar(
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val jobId = findJobId(s.stageId)
var jobHeader = s"[There is no job about this stage]"
if (jobId != -1) {
jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
s"/ ${liveJobs.get(jobId).numStages}) Stages] "
}
val header = jobHeader + s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
val w = width - header.length - tailer.length
val bar =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
}


/**
 * Bookkeeping for one active Spark job of the current statement.
 *
 * @param numStages total number of stages the job was submitted with
 * @param stageIds  ids of all stages belonging to the job, used to attribute
 *                  stage-completion events back to their job
 */
class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) {
  // Count of completed stages. `val`, not `var`: the AtomicInteger itself is
  // the mutable cell; the reference is never reassigned by any caller in view.
  val numCompleteStages = new AtomicInteger(0)
}

0 comments on commit 56b91a3

Please sign in to comment.