Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12920][SQL] Honor "spark.ui.retainedStages" to reduce mem-pressure #10846

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -140,7 +140,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
val toRemove = (stages.size - retainedStages)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
Expand All @@ -152,7 +152,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If jobs is too large, remove and garbage collect old jobs */
private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
if (jobs.size > retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
val toRemove = (jobs.size - retainedJobs)
jobs.take(toRemove).foreach { job =>
// Remove the job's UI data, if it exists
jobIdToData.remove(job.jobId).foreach { removedJob =>
Expand Down
Expand Up @@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
}

test("test LRU eviction of stages") {
  /** Feed 50 completed stages through the listener, then verify no active state remains. */
  def runWithListener(listener: JobProgressListener): Unit = {
    for (i <- 1 to 50) {
      listener.onStageSubmitted(createStageStartEvent(i))
      listener.onStageCompleted(createStageEndEvent(i))
    }
    assertActiveJobsStateIsEmpty(listener)
  }
  val conf = new SparkConf()
  conf.set("spark.ui.retainedStages", 5.toString)
  var listener = new JobProgressListener(conf)

  // Test with 5 retainedStages: only the 5 most recently completed stages survive trimming.
  runWithListener(listener)
  listener.completedStages.size should be (5)
  listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))

  // Test with 0 retainedStages: every completed stage is evicted immediately.
  conf.set("spark.ui.retainedStages", 0.toString)
  listener = new JobProgressListener(conf)
  runWithListener(listener)
  listener.completedStages.size should be (0)
}

test("test clearing of stageIdToActiveJobs") {
Expand All @@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
}

test("test clearing of jobGroupToJobIds") {
  /** Run 51 single-stage jobs (each in its own job group), then verify no active state remains. */
  def runWithListener(listener: JobProgressListener): Unit = {
    // Run 50 jobs, each with one stage
    for (jobId <- 0 to 50) {
      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
      listener.onStageSubmitted(createStageStartEvent(0))
      listener.onStageCompleted(createStageEndEvent(0, failed = false))
      listener.onJobEnd(createJobEndEvent(jobId, false))
    }
    assertActiveJobsStateIsEmpty(listener)
  }
  val conf = new SparkConf()
  conf.set("spark.ui.retainedJobs", 5.toString)
  var listener = new JobProgressListener(conf)
  runWithListener(listener)
  // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
  listener.jobGroupToJobIds.size should be (5)

  // Test with 0 retained jobs: the job-group map is emptied along with the jobs.
  conf.set("spark.ui.retainedJobs", 0.toString)
  listener = new JobProgressListener(conf)
  runWithListener(listener)
  listener.jobGroupToJobIds.size should be (0)
}

test("test LRU eviction of jobs") {
Expand Down