From acb6d68d94ae4576633c67341926b6a0a8e570a4 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 10 Jan 2017 16:34:04 +0800
Subject: [PATCH] Drop more elements when stageData.taskData.size > retainedTasks

---
 .../spark/ui/jobs/JobProgressListener.scala   |  3 ++-
 .../ui/jobs/JobProgressListenerSuite.scala    | 22 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d874589e..a759bd6bdd4af 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
     // If Tasks is too large, remove and garbage collect old tasks
     if (stageData.taskData.size > retainedTasks) {
-      stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+      stageData.taskData = stageData.taskData.drop(
+        stageData.taskData.size - retainedTasks + (retainedTasks * 0.01).toInt)
     }
 
     for (
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1be8b95..f4afea2945164 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,26 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
   }
+
+  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedTasks", "100")
+    val listener = new JobProgressListener(conf)
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.mergeShuffleReadMetrics()
+
+    val task = new ShuffleMapTask(0)
+    val taskType = Utils.getFormattedClassName(task)
+
+    for (t <- 1 to 101) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+
+    // 101 - (101 - 100 + 100 * 0.01) = 99
+    assert(listener.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 99)
+  }
+
 }
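
Note (not part of the patch): a minimal, self-contained sketch of the eviction rule the first hunk introduces, using a plain LinkedHashMap in place of Spark's StageUIData.taskData. The names retainedTasks and taskData mirror the patch; RetainedTasksSketch, trim, and the String values are hypothetical, added only for illustration. The idea is that once the map exceeds the cap, the listener drops the overflow plus an extra 1% of the cap, so the linear-time drop is not re-triggered by every subsequent task completion.

    import scala.collection.mutable.LinkedHashMap

    object RetainedTasksSketch {
      // Stand-in for spark.ui.retainedTasks = 100, as in the new test.
      val retainedTasks = 100
      // Stand-in for stageData.taskData; insertion order lets drop() evict the oldest entries.
      var taskData = LinkedHashMap[Long, String]()

      // The rule from the patch: drop the overflow plus (retainedTasks * 0.01) extra entries.
      def trim(): Unit = {
        if (taskData.size > retainedTasks) {
          taskData = taskData.drop(
            taskData.size - retainedTasks + (retainedTasks * 0.01).toInt)
        }
      }

      def main(args: Array[String]): Unit = {
        (1L to 101L).foreach { id =>
          taskData(id) = s"task-$id"
          trim()
        }
        // 101 - (101 - 100 + 100 * 0.01) = 99, matching the assertion in the new test.
        println(taskData.size)
      }
    }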