Skip to content

Commit

Permalink
Merge pull request #237 from edbalogh/bugfix/fix_broken_unit_test
Browse files Browse the repository at this point in the history
Unit tests for ApplicationStats
  • Loading branch information
dafreels committed May 20, 2021
2 parents 343efdd + 650b729 commit 0c2e291
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,44 +54,60 @@ case class ApplicationStats(jobs: mutable.Map[Int, JobDetails]) {
val duration = if(j._2.end.isDefined) {
j._2.end.get - j._2.start
} else {
""
-1
}

Map(
"jobId" -> j._1, "pipelineId" -> j._2.pipelineId, "stepId" -> j._2.stepId, "groupId" -> j._2.groupId, "status" -> j._2.status,
"start" -> j._2.start, "end" -> j._2.end, "durationMs" -> duration, "stages" -> stageStats.toList
"jobId" -> j._1, "pipelineId" -> j._2.pipelineId.getOrElse(""), "stepId" -> j._2.stepId.getOrElse(""),
"groupId" -> j._2.groupId.getOrElse(""), "status" -> j._2.status.getOrElse(""), "start" -> j._2.start,
"end" -> j._2.end.getOrElse(""), "durationMs" -> duration, "stages" -> stageStats.toList
)
}).toList
results
}

private def stageStatsToMap(stage: StageInfo): Map[String, Any] = {
val task = stage.taskMetrics
val in = task.inputMetrics
val out= task.outputMetrics
val clockTime = if (stage.completionTime.isDefined && stage.submissionTime.isDefined) {
val duration = if (stage.completionTime.isDefined && stage.submissionTime.isDefined) {
stage.completionTime.get - stage.submissionTime.get
} else { -1 }
val duration = if(stage.submissionTime.isDefined && stage.completionTime.isDefined) {
stage.completionTime.get - stage.submissionTime.get
} else { "" }
Map(
}

val basicStats = Map(
"stageId" -> stage.stageId, "stageName" -> stage.name, "attemptNumber" -> stage.attemptNumber,
"startTime" -> stage.submissionTime, "endTime" -> stage.completionTime, "clockTime" -> clockTime,
"bytesRead" -> in.bytesRead, "recordsRead" -> in.recordsRead,
"bytesWritten" -> out.bytesWritten, "recordsWritten" -> out.recordsWritten,
"cpuTime" -> task.executorCpuTime, "gcTime" -> task.jvmGCTime,
"executorRunTime" -> task.executorRunTime, "executorCpuTime" -> task.executorCpuTime,
"peakExecutorMemory" -> task.peakExecutionMemory, "failureReason" -> stage.failureReason,
"startTime" -> stage.submissionTime, "endTime" -> stage.completionTime, "failureReason" -> stage.failureReason,
"tasks" -> stage.numTasks, "parentIds" -> stage.parentIds, "start" -> stage.submissionTime,
"end" -> stage.completionTime, "durationMs" -> duration
)

// add task stats if available
if (stage.taskMetrics == null) {
basicStats
} else {
val task = stage.taskMetrics

val inMetrics = if (task.inputMetrics != null) {
Map("bytesRead" -> task.inputMetrics.bytesRead, "recordsRead" -> task.inputMetrics.recordsRead)
} else {
Map()
}

val outMetrics = if (task.outputMetrics != null) {
val out = task.outputMetrics
Map(
"bytesWritten" -> out.bytesWritten, "recordsWritten" -> out.recordsWritten
)
} else {
Map()
}

Map(
"cpuTime" -> task.executorCpuTime, "gcTime" -> task.jvmGCTime,
"executorRunTime" -> task.executorRunTime, "executorCpuTime" -> task.executorCpuTime,
"peakExecutorMemory" -> task.peakExecutionMemory
) ++ inMetrics ++ outMetrics ++ basicStats
}
}
}

case class ExecutorDetails(executorId: String, active: Boolean, start: Long, host: String, totalCores: Int,
end: Option[Long], removedReason: Option[String], updates: Option[Map[Long, Any]])

case class JobDetails(jobId: Int, start: Long, end: Option[Long], status: Option[String], pipelineId: Option[String],
stepId: Option[String], groupId: Option[String], stages: mutable.Map[Int, StageInfo])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package com.acxiom.pipeline

import org.apache.spark.scheduler._
import org.joda.time.DateTime
import org.scalatest.FunSpec
import scala.collection.mutable


class ApplicationStatsTests extends FunSpec {
describe("ApplicationStats - Basic Tests") {
it("Should create and manage ApplicationStats object") {
val stats = ApplicationStats(mutable.Map())
assert(stats.jobs.isEmpty)
assert(!stats.isActive)

// shared execInfo
val currTime = new DateTime()
val execInfo = PipelineExecutionInfo(Some("step-1"), Some("pipeline-1"), None, Some("group-1"))

// create stages for job 0
val initStagesJob0 = (0 to 4).map(x => {
new StageInfo(x, 1, s"test-stage-$x", 1, Seq(), Seq(), s"init-details"
)
})
val jobStart = SparkListenerJobStart(0, currTime.getMillis, initStagesJob0)

stats.startJob(jobStart, execInfo)
// make sure job is added
assert(stats.jobs.size == 1)
assert(stats.isActive)
assert(stats.jobs(0).stages.size == 5)
stats.jobs(0).stages.foreach(s => {
assert(s._2.details == "init-details")
})

// create stages for job 1
val initStagesJob1 = (5 to 7).map(x => {
new StageInfo(x, 2, s"test-stage-$x", 1, Seq(), Seq(), s"init-details"
)
})

stats.startJob(jobStart.copy(jobId = 1, stageInfos = initStagesJob1), execInfo.copy(stepId=Some("step-2")))
// make sure job is added
assert(stats.jobs.size == 2)
assert(stats.jobs(1).stages.size == 3)

// create stages for job 2 (for step-1)
val initStagesJob2 = (8 to 9).map(x => {
new StageInfo(x, 2, s"test-stage-$x", 1, Seq(), Seq(), s"init-details"
)
})

stats.startJob(jobStart.copy(jobId = 2, stageInfos = initStagesJob2), execInfo.copy(stepId=Some("step-2")))
// make sure job is added
assert(stats.jobs.size == 3)
assert(stats.jobs(2).stages.size == 2)

// update the stages
(0 to 6).foreach(x => {
val updatedStageInfo = new StageInfo(x, 1, s"test-stage-$x", 1, Seq(), Seq(), s"updated-details")
stats.endStage(SparkListenerStageCompleted(updatedStageInfo))
})

val endTime = currTime.getMillis + 1000L
// end the job
stats.endJob(SparkListenerJobEnd(0, endTime, org.apache.spark.scheduler.JobSucceeded))
stats.endJob(SparkListenerJobEnd(1, endTime + 1000L, org.apache.spark.scheduler.JobSucceeded))
stats.endJob(SparkListenerJobEnd(2, endTime + 2000L, org.apache.spark.scheduler.JobSucceeded))

val job0 = stats.jobs(0)
assert(job0.start == currTime.getMillis)
assert(job0.pipelineId == execInfo.pipelineId)
assert(job0.stepId == execInfo.stepId)
assert(job0.groupId == execInfo.groupId)
assert(job0.stages.size == 5)
assert(job0.end.contains(endTime))
assert(job0.status.nonEmpty)
(0 to 4).foreach(x => {
assert(job0.stages.contains(x))
assert(job0.stages(x).details == "updated-details")
})

// verify the summary for step 1
val step1 = stats.getSummary(Some("pipeline-1"), Some("step-1"), Some("group-1"))
assert(step1.exists(s => {
s.getOrElse("jobId", -1).asInstanceOf[Int] == 0
s.getOrElse("stepId", "").asInstanceOf[String] == "step-1"
s.getOrElse("groupId", "").asInstanceOf[String] == "group-1"
s.getOrElse("durationMs", 0L).asInstanceOf[Long] == 1000L
s.getOrElse("stages", List()).asInstanceOf[List[Any]].length == 5
}))

val job1 = stats.jobs(1)
assert(job1.start == currTime.getMillis)
assert(job1.pipelineId == execInfo.pipelineId)
assert(job1.stepId.contains("step-2"))
assert(job1.groupId == execInfo.groupId)
assert(job1.end.contains(endTime + 1000L))
assert(job1.stages.size == 3)
assert(job1.status.nonEmpty)

(5 to 7).foreach(x => {
assert(job1.stages.contains(x))
if (x == 7) {
assert(job1.stages(x).details == "init-details")
} else {
assert(job1.stages(x).details == "updated-details")
}
})

val job2 = stats.jobs(2)
assert(job2.start == currTime.getMillis)
assert(job2.pipelineId == execInfo.pipelineId)
assert(job2.stepId.contains("step-2"))
assert(job2.groupId == execInfo.groupId)
assert(job2.end.contains(endTime + 2000L))
assert(job2.stages.size == 2)
assert(job2.status.nonEmpty)

(8 to 9).foreach(x => {
assert(job2.stages.contains(x))
})

// verify summary for step-2
val step2 = stats.getSummary(Some("pipeline-1"), Some("step-2"), Some("group-1"))
assert(step2.exists(s => {
s.getOrElse("jobId", -1).asInstanceOf[Int] == 1
s.getOrElse("stepId", "").asInstanceOf[String] == "step-2"
s.getOrElse("groupId", "").asInstanceOf[String] == "group-1"
s.getOrElse("durationMs", 0L).asInstanceOf[Long] == 1000L
s.getOrElse("stages", List()).asInstanceOf[List[Any]].length == 3
}))

assert(step2.exists(s => {
s.getOrElse("jobId", -1).asInstanceOf[Int] == 2
s.getOrElse("stepId", "").asInstanceOf[String] == "step-2"
s.getOrElse("groupId", "").asInstanceOf[String] == "group-1"
s.getOrElse("durationMs", 0L).asInstanceOf[Long] == 2000L
s.getOrElse("stages", List()).asInstanceOf[List[Any]].length == 2
}))

stats.reset()
assert(!stats.isActive)
}
}
}

0 comments on commit 0c2e291

Please sign in to comment.