
[SPARK-2316] Avoid O(blocks) operations in listeners #1679

Closed
wants to merge 20 commits

Conversation

andrewor14
Contributor

The existing code in StorageUtils is not the most efficient. Every time we want to update an RDDInfo we end up iterating through all blocks on all block managers just to discard most of them. The symptoms manifest themselves in the bountiful UI bugs observed in the wild. Many of these bugs are caused by the slow consumption of events in LiveListenerBus, which frequently leads to the event queue overflowing and SparkListenerEvents being dropped on the floor. The changes made in this PR avoid this by first filtering out only the blocks relevant to us before computing storage information from them.

It's worth mentioning that this corner of the Spark code is also not very well tested. The bulk of the changes in this PR (more than 60%) actually consists of test cases for the various logic in StorageUtils.scala as well as StorageTab.scala. These will eventually be extended to cover the various listeners that constitute the SparkUI.

This commit refactors storage status to keep around a set of RDD
IDs which have blocks stored in the status' block manager. The
purpose is to avoid linearly scanning through every single storage
status' blocks when it doesn't even contain any blocks for the RDD
we're interested in.
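
For illustration, the indexing scheme looks roughly like the following (a minimal, self-contained sketch; the class, field, and method names are illustrative stand-ins, not the exact ones in StorageStatus.scala):

```scala
import scala.collection.mutable

// Illustrative stand-ins for Spark's block ID / block status types.
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

class IndexedStorageStatus {
  // Blocks grouped by RDD ID, so a query only touches the relevant RDD's blocks.
  private val _rddBlocks =
    mutable.HashMap[Int, mutable.HashMap[RDDBlockId, BlockStatus]]()

  def addBlock(id: RDDBlockId, status: BlockStatus): Unit = {
    val blocks = _rddBlocks.getOrElseUpdate(id.rddId, mutable.HashMap.empty)
    blocks(id) = status
  }

  def removeBlock(id: RDDBlockId): Unit = {
    _rddBlocks.get(id.rddId).foreach { blocks =>
      blocks -= id
      // Drop the RDD ID entirely once its last block is gone, so the set of
      // RDD IDs with blocks on this block manager stays accurate.
      if (blocks.isEmpty) _rddBlocks -= id.rddId
    }
  }

  /** Only the blocks of the RDD we care about; no scan over other RDDs' blocks. */
  def rddBlocksById(rddId: Int): Map[RDDBlockId, BlockStatus] =
    _rddBlocks.get(rddId).map(_.toMap).getOrElse(Map.empty)

  /** Cheap membership test used to skip irrelevant block managers. */
  def containsRdd(rddId: Int): Boolean = _rddBlocks.contains(rddId)
}
```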

This commit also adds a bunch of tests for StorageStatus and
StorageUtils methods. There were previously a few minor bugs in
StorageUtils.blockLocationsFromStorageStatus and
StorageUtils.filterStorageStatusByRDD that are now fixed and tested.

Going forward, we need to first clean up the method signatures to
reflect what they actually do. Then we will make things more
efficient now that we've set the stage.
This just makes it easier to create a StorageStatus with a source
of blocks.

The existing implementation of blockLocationsFromStorageStatus relies
on a groupBy, which is somewhat expensive. The new code creates a map
from the get-go and adds the block locations by iterating through the
storage statuses' blocks.

This commit also cleans up StorageUtils method signatures by removing
unnecessary methods and renaming others with long-winded names.
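
Roughly, the difference looks like this (a sketch with simplified, made-up types; the real code works with BlockId and BlockManagerId):

```scala
import scala.collection.mutable

// Simplified stand-in: each storage status is a block manager location
// plus the IDs of the blocks it holds.
case class SimpleStatus(location: String, blockIds: Seq[String])

// groupBy-based style (sketch of the old approach): materializes a large
// intermediate collection of (blockId, location) pairs before grouping it.
def blockLocationsViaGroupBy(statuses: Seq[SimpleStatus]): Map[String, Seq[String]] =
  statuses
    .flatMap(s => s.blockIds.map(id => (id, s.location)))
    .groupBy(_._1)
    .map { case (id, pairs) => id -> pairs.map(_._2) }

// One-pass style (sketch of the new approach): build the map from the get-go.
def blockLocationsOnePass(statuses: Seq[SimpleStatus]): Map[String, Seq[String]] = {
  val locations = mutable.HashMap[String, mutable.ListBuffer[String]]()
  for (s <- statuses; id <- s.blockIds) {
    locations.getOrElseUpdate(id, mutable.ListBuffer()) += s.location
  }
  locations.map { case (id, locs) => id -> locs.toSeq }.toMap
}
```
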
This particular commit is the whole point of this PR. In the existing
code we unconditionally iterate through all blocks in all block managers
whenever we want to update an RDDInfo. Now, we filter out only the
blocks of interest to us in advance, so we don't end up constructing
a huge map and doing a groupBy on it.
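
In code, the shape of the change is roughly the following (a hypothetical sketch building on the IndexedStorageStatus types from the snippet above; RddInfoLite and its fields are made up for illustration, not Spark's real RDDInfo):

```scala
// Aggregate storage numbers for each RDD of interest, touching only the
// block managers and blocks that are actually relevant to it.
case class RddInfoLite(id: Int, var memSize: Long = 0L, var diskSize: Long = 0L)

def updateRddInfo(rddInfos: Seq[RddInfoLite],
                  statuses: Seq[IndexedStorageStatus]): Unit = {
  rddInfos.foreach { info =>
    // Skip block managers that hold nothing for this RDD.
    val relevant = statuses.filter(_.containsRdd(info.id))
    // Only this RDD's blocks are iterated, not every block in the cluster.
    val blocks = relevant.flatMap(_.rddBlocksById(info.id).values)
    info.memSize = blocks.map(_.memSize).sum
    info.diskSize = blocks.map(_.diskSize).sum
  }
}
```
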
@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17547/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1679:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17547/consoleFull

@aarondav
Contributor

Were you able to test the performance characteristics of this versus the old stuff? Was this indeed the main cause of the live listener bus overflowing, or is that still a problem?

@andrewor14
Contributor Author

I still need to do some benchmarking, but this seems like the most expensive operation the listeners have to carry out, especially when there are thousands of blocks on each executor.

@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17596/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17596/consoleFull

Chaining filter / map / flatMap etc. can be expensive if the
underlying collection is huge, because each of these operations
creates a copy of the collection and applies a function to it.

In terms of actual code, updateRddInfo is rewritten to iterate
through each collection only once. We used to do a filter, then
a flatMap, then another filter on all existing blocks. Now, we
move the filter into the flatMap, and replace it with filterKeys,
which does not copy the underlying map.

Unfortunately this sacrifices code readability a little bit, but
the performance gain should be worth it.
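
A toy illustration of the difference (the names below are made up; Spark's real maps are keyed by BlockId rather than plain strings):

```scala
// Blocks known per RDD ID, keyed by block name with a block size as the value.
val blocksByRdd: Map[Int, Map[String, Long]] = Map(
  1 -> Map("rdd_1_0" -> 100L, "rdd_1_1" -> 200L),
  2 -> Map("rdd_2_0" -> 300L))
val rddIdsOfInterest = Set(1)

// Chained style: filter and flatMap each materialize a new collection.
val chained = blocksByRdd
  .filter { case (rddId, _) => rddIdsOfInterest.contains(rddId) }
  .flatMap { case (_, blocks) => blocks.values }
  .sum

// filterKeys style: filterKeys returns a non-strict view over the original
// map, so no copy is made; entries are filtered on the fly as we traverse
// the collection once for the sum.
val singlePass = blocksByRdd
  .filterKeys(rddIdsOfInterest.contains)
  .values
  .flatMap(_.values)
  .sum
```
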
@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17607/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17608/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17607/consoleFull

@SparkQA

SparkQA commented Jul 31, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17608/consoleFull

@concretevitamin
Contributor

This patch is much appreciated -- thanks for working on this!


Previously we were still linearly traversing all the blocks held
by each storage status. Now we index by the RDD ID and return only
the blocks of interest to us.
This tests just about every single method in StorageStatus. In
addition, a few methods in StorageStatus are changed to return
Map instead of Seq. This commit also adds a few comments indicating
that StorageStatus#blocks is expensive and there are cheaper
alternatives.
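
For example (hypothetical usage in terms of the IndexedStorageStatus sketch earlier in this thread; Spark's real StorageStatus exposes analogous accessors):

```scala
val status = new IndexedStorageStatus
status.addBlock(RDDBlockId(1, 0), BlockStatus(memSize = 100L, diskSize = 0L))
status.addBlock(RDDBlockId(2, 0), BlockStatus(memSize = 50L, diskSize = 0L))

// Cheap: goes straight to RDD 1's bucket and returns a Map of its blocks.
val rdd1Blocks: Map[RDDBlockId, BlockStatus] = status.rddBlocksById(1)

// The expensive alternative the comments warn about is flattening every
// RDD's blocks into one big map (the StorageStatus#blocks style) and then
// filtering most of it away again.
```
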
@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17636/consoleFull

@SparkQA

SparkQA commented Aug 1, 2014

QA results for PR 1679:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17636/consoleFull

@andrewor14
Contributor Author

test this please

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17644/consoleFull

@andrewor14 changed the title from "[SPARK-2316] Avoid O(blocks) operations in listeners" to "[WIP][SPARK-2316] Avoid O(blocks) operations in listeners" on Aug 1, 2014
@SparkQA

SparkQA commented Aug 1, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17644/consoleFull

Prior to this commit, the changes in this PR actually demonstrated
little performance improvement under any workload. This is because
we update all RDDInfos, rather than only the ones whose blocks are
being updated. Thus, even though the new filter logic in StorageStatus
is correct, we still iterate through all the RDD blocks every time
a task has an updated block.

This commit avoids this by only calling StorageUtils.updateRddInfo
on the RDDs that need to be updated.
@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17659/consoleFull

@andrewor14
Contributor Author

I did some benchmarking by running the following job 100 times, one immediately after another. Each job launches many short-lived tasks, each of which persists a single block. The minimality of each task allows the listener bus to keep posting events very quickly while placing a lot of stress on the listeners consuming them.

sc.parallelize(1 to 20000, 100).persist().count()

Before: The max queue length observed reaches 10000 at around the 65th job, and finally reaches 16730 after the last job. Before this PR, this is enough to cause the queue to start dropping events. The average time spent in StorageUtils.updateRddInfo (this was renamed) is 176.25ms.

After: The max queue length never went above 130, and the average time spent in StorageUtils.updateRddInfo is 15.47ms, more than 10 times faster than before. As a baseline for comparison, running the same job 100 times without persisting anything yields a max queue length of 55.

The dark side of the story (there is always a dark side), however, is that this improvement is only observed for RDDs with not too many partitions. Although the new code iterates through only a few RDDs' blocks instead of all RDD blocks known to mankind, it is still slow if say a single RDD contains all the blocks, in which case we still have to iterate through all the RDD blocks. For instance, this will be just as slow as before if we had executed sc.parallelize(1 to 20000, 10000).persist().count() once.

Long story short, we have reduced this from O(blocks) to O(blocks in the updated RDDs), which is a significant decrease for many but not all workloads. Maybe we can do better.


/** Return the memory used by this block manager. */
def memUsed: Long =
  _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).fold(0L)(_ + _)
Contributor

We can replace all of these aggregations with sum
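
Something like this, reusing the names from the snippet above (a sketch of the suggested change, not necessarily the exact final code):

```scala
/** Return the memory used by this block manager. */
def memUsed: Long =
  _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
```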

@pwendell
Contributor

pwendell commented Aug 2, 2014

@shivaram did your thing finish alright?

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17731/consoleFull

@pwendell
Contributor

pwendell commented Aug 2, 2014

If you fix the sum thing I think this is ready to go

@shivaram
Contributor

shivaram commented Aug 2, 2014

@pwendell @andrewor14

Yes - the run went fine. I didn't see any listener bus overflows and the UI was fine. Also, I previously saw 1 CPU fully occupied by the StorageStatus stuff -- this time the CPU usage remained lower.

@andrewor14
Contributor Author

Great to know @shivaram. Thanks for testing.

@pwendell
Contributor

pwendell commented Aug 2, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1679. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17755/consoleFull

@pwendell
Contributor

pwendell commented Aug 2, 2014

LGTM pending tests.

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1679:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17755/consoleFull

@pwendell
Contributor

pwendell commented Aug 2, 2014

I've merged this, thanks. It could be worth back-porting into branch-1.0 as well, but I didn't do that yet.

@asfgit closed this in d934801 on Aug 2, 2014
asfgit pushed a commit that referenced this pull request Aug 3, 2014
Minor fixes on top of #1679.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1736 from andrewor14/amend-#1679 and squashes the following commits:

3b46f5e [Andrew Or] Minor fixes
(cherry picked from commit 3dc55fd)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
asfgit pushed a commit that referenced this pull request Aug 3, 2014
Minor fixes on top of #1679.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1736 from andrewor14/amend-#1679 and squashes the following commits:

3b46f5e [Andrew Or] Minor fixes
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#1679 from andrewor14/fix-drop-events and squashes the following commits:

f80c1fa [Andrew Or] Rewrite fold and reduceOption as sum
e132d69 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
14fa1c3 [Andrew Or] Simplify some code + update a few comments
a91be46 [Andrew Or] Make ExecutorsPage blazingly fast
bf6f09b [Andrew Or] Minor changes
8981de1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
af19bc0 [Andrew Or] *UsedByRDD -> *UsedByRdd (minor)
6970bc8 [Andrew Or] Add extensive tests for StorageListener and the new code in StorageUtils
e080b9e [Andrew Or] Reduce run time of StorageUtils.updateRddInfo to near constant
2c3ef6a [Andrew Or] Actually filter out only the relevant RDDs
6fef86a [Andrew Or] Add extensive tests for new code in StorageStatus
b66b6b0 [Andrew Or] Use more efficient underlying data structures for blocks
6a7b7c0 [Andrew Or] Avoid chained operations on TraversableLike
a9ec384 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
b12fcd7 [Andrew Or] Fix tests + simplify sc.getRDDStorageInfo
da8e322 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
8e91921 [Andrew Or] Iterate through a filtered set of blocks when updating RDDInfo
7b2c4aa [Andrew Or] Rewrite blockLocationsFromStorageStatus + clean up method signatures
41fa50d [Andrew Or] Add a legacy constructor for StorageStatus
53af15d [Andrew Or] Refactor StorageStatus + add a bunch of tests
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Minor fixes on top of apache#1679.

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#1736 from andrewor14/amend-#1679 and squashes the following commits:

3b46f5e [Andrew Or] Minor fixes