[SPARK-19146][Core]Drop more elements when stageData.taskData.size > retainedTasks #16527

Closed · wants to merge 6 commits
Conversation

@wangyum commented Jan 10, 2017

What changes were proposed in this pull request?

Drop more elements when `stageData.taskData.size > retainedTasks` to reduce the number of times the drop function is called.

How was this patch tested?

Jenkins
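
For context, a minimal sketch of the idea (not the exact patch; the precise drop amount is worked out in the review below). Instead of dropping exactly the overflow — typically a single entry per completed task — the trim drops a larger batch so the next several task-end events skip the drop entirely. The extra `retainedTasks / 10` margin and the surrounding object are illustrative.

```scala
import scala.collection.mutable.LinkedHashMap

// Illustrative sketch: trim an insertion-ordered task map in one batch
// rather than removing a single stale entry on every task-end event.
object TaskDataTrimSketch {
  def trim[K, V](taskData: LinkedHashMap[K, V], retainedTasks: Int): LinkedHashMap[K, V] = {
    if (taskData.size > retainedTasks) {
      // Old behavior: drop(taskData.size - retainedTasks), usually just 1 element.
      // New idea: drop the overflow plus an extra margin in a single call.
      taskData.drop(taskData.size - retainedTasks + retainedTasks / 10)
    } else {
      taskData
    }
  }
}
```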


SparkQA commented Jan 10, 2017

Test build #71118 has finished for PR 16527 at commit acb6d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       // If Tasks is too large, remove and garbage collect old tasks
       if (stageData.taskData.size > retainedTasks) {
-        stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+        stageData.taskData = stageData.taskData.drop(
```
srowen (Member) commented on the diff:

Is it maybe a little clearer to write something like

val targetSize = 0.9 * retainedTasks
stageData.taskData = stageData.taskData.drop(stageData.taskData.size - targetSize)

(And yeah does it make sense to drop more than just 1% extra, to avoid the overhead you're concerned about?)

wangyum (author) replied:

@srowen 1% is only a conservative value, 1/10 is better.


SparkQA commented Jan 10, 2017

Test build #71127 has finished for PR 16527 at commit c306cdc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


srowen commented Jan 12, 2017

@wangyum one more thought. I note that there are actually three instances in this file where a collection is trimmed to match a max retained number of things. See this quite related change: https://github.com/apache/spark/pull/10846/files

Originally, these trims always reduced the collection by 10% of the max retained size (which is quite consistent with your change). That could be too few in some cases, and that issue fixed it. Your change goes further, to always remove at least this amount.

I was going to suggest that all three of these should be consistent. But perhaps the instance you are changing is the only one where repeatedly removing just 1 element is expensive?

CC @rajeshbalamohan and @ajbozarth -- is it fairly safe to delete a bit more than is necessary here -- it seems like it's something that is there to support the UI for informational purposes, and hasn't mattered so much exactly how much is retained.

ajbozarth (Member) commented:

My only concern would be users misunderstanding how spark.ui.retainedTasks works based on its documentation. But I agree that this is a good change to reduce friction, and updating all of the retained-config uses to match, as @srowen mentioned, might be a good addition for consistency.


wangyum commented Jan 13, 2017

I used the following code to log how long trimming stages/jobs takes:

```scala
  /** If stages is too large, remove and garbage collect old stages */
  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
    if (stages.size > retainedStages) {
      val start = System.currentTimeMillis()
      val toRemove = (stages.size - retainedStages)
      stages.take(toRemove).foreach { s =>
        stageIdToData.remove((s.stageId, s.attemptId))
        stageIdToInfo.remove(s.stageId)
      }
      stages.trimStart(toRemove)
      logInfo(s"Trim stages time consuming: ${System.currentTimeMillis() - start}")
    }
  }

  /** If jobs is too large, remove and garbage collect old jobs */
  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
    if (jobs.size > retainedJobs) {
      val start = System.currentTimeMillis()
      val toRemove = (jobs.size - retainedJobs)
      jobs.take(toRemove).foreach { job =>
        // Remove the job's UI data, if it exists
        jobIdToData.remove(job.jobId).foreach { removedJob =>
          // A null jobGroupId is used for jobs that are run without a job group
          val jobGroupId = removedJob.jobGroup.orNull
          // Remove the job group -> job mapping entry, if it exists
          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
            jobsInGroup.remove(job.jobId)
            // If this was the last job in this job group, remove the map entry for the job group
            if (jobsInGroup.isEmpty) {
              jobGroupToJobIds.remove(jobGroupId)
            }
          }
        }
      }
      jobs.trimStart(toRemove)
      logInfo(s"Trim jobs time consuming: ${System.currentTimeMillis() - start}")
    }
  }
```

and the result is:

```
tail -f test-time-consuming.log | grep time
17/01/13 10:03:39 INFO JobProgressListener: Trim stages time consuming: 3
17/01/13 10:03:39 INFO JobProgressListener: Trim jobs time consuming: 4
17/01/13 10:03:39 INFO JobProgressListener: Trim stages time consuming: 0
17/01/13 10:03:47 INFO JobProgressListener: Trim stages time consuming: 0
17/01/13 10:03:47 INFO JobProgressListener: Trim jobs time consuming: 0
17/01/13 10:03:47 INFO JobProgressListener: Trim stages time consuming: 0
17/01/13 10:03:56 INFO JobProgressListener: Trim stages time consuming: 1
17/01/13 10:03:56 INFO JobProgressListener: Trim jobs time consuming: 0
17/01/13 10:03:56 INFO JobProgressListener: Trim stages time consuming: 0
17/01/13 10:04:04 INFO JobProgressListener: Trim stages time consuming: 0
17/01/13 10:04:04 INFO JobProgressListener: Trim jobs time consuming: 0
17/01/13 10:04:04 INFO JobProgressListener: Trim stages time consuming: 0
```

Given those timings, it may be fine to just change retainedTasks.


srowen commented Jan 13, 2017

How about this: change all three instances to trim the retained data to the configured maximum, but remove at least (0.1 * maxRetained) items.

If max is 100 and there are 150 items, it would trim to leave 100 rather than going all the way down to 90.
If max is 100 and there are 101 items, it would trim 10 (0.1 * 100) to leave 91, rather than trimming just 1 to leave 100.

That addresses the concern, maintains current behavior in more cases, and actually restores some behavior from before the previous change.

That and a small note in the docs that this is a target maximum and that fewer elements may be retained in some circumstances.
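
A small self-contained sketch of this proposal, with the two examples above as sanity checks (the function name is hypothetical; the 10% floor is the 0.1 * maxRetained minimum described here):

```scala
object TrimPolicySketch {
  // Trim down to the configured maximum, but remove at least
  // (0.1 * maxRetained) items whenever trimming is triggered at all.
  def numberToRemove(currentSize: Int, maxRetained: Int): Int =
    math.max(maxRetained / 10, currentSize - maxRetained)

  def main(args: Array[String]): Unit = {
    assert(numberToRemove(150, 100) == 50) // trims 50, leaving 100 (not 90)
    assert(numberToRemove(101, 100) == 10) // trims 10, leaving 91 (not dropping just 1)
  }
}
```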


wangyum commented Jan 14, 2017

I don't quite understand what @srowen mentioned, so I simply changed it to drop dataSize - retainedSize + retainedSize / 10 items at a time.

If max is 100 and there are 150 items, it would drop (150 - 100 + 100 / 10 = 60).
If max is 100 and there are 101 items, it would drop (101 - 100 + 100 / 10 = 11).


SparkQA commented Jan 14, 2017

Test build #71373 has finished for PR 16527 at commit ddbec1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


srowen commented Jan 15, 2017

Yes, I'm suggesting the behavior you describe isn't desirable. Right now, if there are 150 items and max is 100, it will delete 50. That's good. There's no good reason to delete 10 more than needed for performance. The problem case is where there is just 101 and the max is 100. Then it makes sense to enforce a minimum number removed.

I'm suggesting dropping math.max((retainedSize * 0.1).toInt, dataSize - retainedSize) in each case.


SparkQA commented Jan 15, 2017

Test build #71395 has finished for PR 16527 at commit c51c514.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ajbozarth (Member) commented:

The latest code looks good to me, but you still need to update the documentation to match the new behavior


srowen commented Jan 20, 2017

Ping @wangyum to update docs


SparkQA commented Jan 21, 2017

Test build #71753 has finished for PR 16527 at commit 89721cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen (Member) left a review:

Except for minor comments, this looks reasonable.

```diff
-How many jobs the Spark UI and status APIs remember before garbage
-collecting.
+How many jobs the Spark UI and status APIs remember before garbage collecting.
+This is a target maximum and that fewer elements may be retained in some circumstances.
```
srowen commented on the documentation change:

maximum and that fewer ... -> maximum, and fewer ...

```diff
@@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }

+  /**
+   * Remove at least (maxRetained / 10) items to reduce friction.
+   */
+  def removedCount(dataSize: Int, retainedSize: Int): Int = {
```
srowen commented on the new method:

private? And since it isn't reporting a number that was removed, but a number to remove, maybe calculateNumberToRemove? Or something like that?
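
For reference, a sketch of how the helper and one call site might look after the renaming and visibility change suggested here. The body assumes the math.max formula from earlier in the thread; the wrapper class and field types are simplified stand-ins for the listener, not the actual Spark code.

```scala
import scala.collection.mutable.LinkedHashMap

// Simplified stand-in for the listener, showing the renamed private helper.
class ListenerTrimSketch(retainedTasks: Int) {
  var taskData: LinkedHashMap[Long, String] = LinkedHashMap.empty

  /** Remove at least (retainedSize / 10) items to reduce friction. */
  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int =
    math.max(retainedSize / 10, dataSize - retainedSize)

  // Example call site, mirroring the diff above.
  def maybeTrimTasks(): Unit = {
    if (taskData.size > retainedTasks) {
      taskData = taskData.drop(
        calculateNumberToRemove(taskData.size, retainedTasks))
    }
  }
}
```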


SparkQA commented Jan 21, 2017

Test build #71769 has finished for PR 16527 at commit 8e4954f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


srowen commented Jan 23, 2017

Merged to master

@asfgit closed this in c994921 on Jan 23, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request on Jan 27, 2017.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request on Feb 15, 2017.