Conversation

@andrewor14
Contributor

**Bug**: After the following command `sc.parallelize(1 to 1000).persist.map(_ + 1).count()` is run, the persisted RDD is missing from the storage tab of the SparkUI.

**Cause**: The command creates two RDDs in one stage, a `ParallelCollectionRDD` and a `MappedRDD`. However, the existing StageInfo only keeps the RDDInfo of the last RDD associated with the stage (`MappedRDD`), and so all RDD information regarding the first RDD (`ParallelCollectionRDD`) is discarded. In this case, we persist the first RDD, but the StorageTab doesn't know about this RDD because it is not encoded in the StageInfo.

**Fix**: Record information of all RDDs in StageInfo, instead of just the last RDD (i.e. `stage.rdd`). Since stage boundaries are marked by shuffle dependencies, the solution is to traverse the last RDD's dependency tree, visiting only ancestor RDDs related through a sequence of narrow dependencies.


This PR also moves RDDInfo to its own file, includes a few style fixes, and adds a unit test for constructing StageInfos.
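
In code, the intended construction looks roughly like the sketch below. This is a sketch only: `fromStage` and the exact `StageInfo` constructor arguments are assumptions based on the description above.

```scala
// Sketch: record an RDDInfo for the stage's last RDD *and* every narrow
// ancestor, so a persisted upstream RDD still reaches the StorageTab.
// getNarrowAncestors is the traversal described in the fix above.
def fromStage(stage: Stage): StageInfo = {
  val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
  val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
  new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
}
```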

Stage boundaries are marked by shuffle dependencies. When two or more RDDs
are related by narrow dependencies, they should all be associated with the
same Stage. Following narrow dependency pointers backward allows StageInfo
to hold the information of all relevant RDDs, rather than just the last one
associated with the Stage.

This commit also moves RDDInfo to its own file.
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

Contributor Author


Moved from StorageUtils.scala

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14306/

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14310/

@pwendell
Contributor

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14313/

@pwendell
Contributor

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

Contributor


This syntax is a bit strange; is this the preferred approach, instead of:

val narrowDependencies = dependencies.filter(_.isInstanceOf[NarrowDependency[_]])

?
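
For context, the two styles being compared look roughly like this (the `collect` version is an assumption about the code under review):

```scala
// Pattern-match style: filters and downcasts in one step,
// yielding Seq[NarrowDependency[_]].
val viaCollect = dependencies.collect { case n: NarrowDependency[_] => n }

// isInstanceOf style suggested above: selects the same elements,
// but the static type stays Seq[Dependency[_]].
val viaFilter = dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
```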

Contributor Author


OK, I was trying to avoid isInstanceOf, but I guess in this case that's clearer.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14317/

@pwendell
Contributor

@andrewor14 this seems to have legitimate failures...

@andrewor14
Contributor Author

Wait, isn't it still a build timeout?

@andrewor14
Contributor Author

Never mind, I am able to reproduce the infinite loop in the GraphX tests locally.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

Contributor


Nit: it might be nicer if this function didn't expose the ancestors set to the outside world, since there is no real reason to have it in the function's public contract. Could you instead write an inner function and recurse using that, rather than recursing on the top-level function? If you did that, you could probably avoid passing ancestors around at all: define the ancestors set in the outer function, and the inner function will have a direct reference to it.

Contributor Author


Either way you need to pass in the ancestors set to avoid visiting nodes you've already visited. Right now, because of the default argument of mutable.Set.empty, the outsider calls this function this way: rdd.getNarrowAncestors(). I'm not sure I understand what you're suggesting, but do you mean the outsider should call it with rdd.getNarrowAncestors instead (without the parentheses)? In particular, are you suggesting something like

```scala
// Exposed to outsiders (i.e. the rest of Spark)
def getNarrowAncestors: Seq[RDD[_]] = getNarrowAncestors(mutable.Set.empty[RDD[_]])

// Private helper method that carries the visited set
private def getNarrowAncestors(ancestors: mutable.Set[RDD[_]]): Seq[RDD[_]] = {
  // actual recursive logic
}
```

right?
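
For reference, the inner-function variant under discussion would look roughly like the sketch below. The body is an assumption consistent with the commit titles further down ("Hide details of getNarrowAncestors from outsiders", "Deal with cycles in RDD dependency graph"), not necessarily the exact final code.

```scala
import scala.collection.mutable

private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
  val ancestors = new mutable.HashSet[RDD[_]]

  // Inner function: closes over `ancestors` directly, so nothing about the
  // visited set leaks into the method's public signature.
  def visit(rdd: RDD[_]): Unit = {
    val narrowParents = rdd.dependencies
      .filter(_.isInstanceOf[NarrowDependency[_]])
      .map(_.rdd)
    narrowParents.filterNot(ancestors.contains).foreach { parent =>
      ancestors.add(parent)  // mark before recursing so cycles terminate
      visit(parent)
    }
  }

  visit(this)
  ancestors.filterNot(_ == this).toSeq  // exclude the starting RDD itself
}
```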

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14340/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14347/

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14346/

@andrewor14
Contributor Author

@pwendell This should be ready for merge, unless we catch anything else.

@pwendell
Contributor

Thanks Andrew, I've merged this.

@asfgit closed this in 2de5738 Apr 23, 2014
asfgit pushed a commit that referenced this pull request Apr 23, 2014
**Bug**: After the following command `sc.parallelize(1 to 1000).persist.map(_ + 1).count()` is run, the persisted RDD is missing from the storage tab of the SparkUI.

**Cause**: The command creates two RDDs in one stage, a `ParallelCollectionRDD` and a `MappedRDD`. However, the existing StageInfo only keeps the RDDInfo of the last RDD associated with the stage (`MappedRDD`), and so all RDD information regarding the first RDD (`ParallelCollectionRDD`) is discarded. In this case, we persist the first RDD, but the StorageTab doesn't know about this RDD because it is not encoded in the StageInfo.

**Fix**: Record information of all RDDs in StageInfo, instead of just the last RDD (i.e. `stage.rdd`). Since stage boundaries are marked by shuffle dependencies, the solution is to traverse the last RDD's dependency tree, visiting only ancestor RDDs related through a sequence of narrow dependencies.

---

This PR also moves RDDInfo to its own file, includes a few style fixes, and adds a unit test for constructing StageInfos.

Author: Andrew Or <andrewor14@gmail.com>

Closes #469 from andrewor14/storage-ui-fix and squashes the following commits:

07fc7f0 [Andrew Or] Add back comment that was accidentally removed (minor)
5d799fe [Andrew Or] Add comment to justify testing of getNarrowAncestors with cycles
9d0e2b8 [Andrew Or] Hide details of getNarrowAncestors from outsiders
d2bac8a [Andrew Or] Deal with cycles in RDD dependency graph + add extensive tests
2acb177 [Andrew Or] Move getNarrowAncestors to RDD.scala
bfe83f0 [Andrew Or] Backtrace RDD dependency tree to find all RDDs that belong to a Stage
(cherry picked from commit 2de5738)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
rxin pushed a commit to rxin/spark that referenced this pull request Apr 24, 2014
asfgit pushed a commit that referenced this pull request Apr 24, 2014
The two modified tests may fail if the race condition does not go in our favor...

Author: Andrew Or <andrewor14@gmail.com>

Closes #516 from andrewor14/stage-info-test-fix and squashes the following commits:

b4b6100 [Andrew Or] Add/replace missing waitUntilEmpty() calls to listener bus

(cherry picked from commit 4b2bab1)
Signed-off-by: Reynold Xin <rxin@apache.org>
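
The pattern behind that hot fix looks roughly like this sketch (the listener, the timeout value, and the `waitUntilEmpty` drain call are assumptions about the test utilities of the time):

```scala
// Sketch: listener events are delivered asynchronously, so a test must
// drain the bus before asserting on what a listener recorded. Without the
// wait, the assertion races event delivery and the test is flaky.
sc.parallelize(1 to 100).map(_ + 1).count()
assert(sc.listenerBus.waitUntilEmpty(10000))  // wait up to 10s for pending events
assert(listener.stageInfos.nonEmpty)          // `listener` is an illustrative SparkListener
```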
@andrewor14 andrewor14 deleted the storage-ui-fix branch April 29, 2014 21:40
pwendell pushed a commit to pwendell/spark that referenced this pull request May 12, 2014
…in-tests-for-mllib

[MLlib] Use a LocalSparkContext trait in test suites

Replaces the 9 instances of

```scala
class XXXSuite extends FunSuite with BeforeAndAfterAll {
  @transient private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
  }

  override def afterAll() {
    sc.stop()
    System.clearProperty("spark.driver.port")
  }
}
```

with

```scala
class XXXSuite extends FunSuite with LocalSparkContext {
```
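
The trait itself might look something like this (an assumption for illustration, not necessarily the exact MLlib implementation):

```scala
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterAll, Suite}

// Owns the SparkContext lifecycle so test suites only need to mix it in.
trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll() {
    super.beforeAll()
    sc = new SparkContext("local", "test")
  }

  override def afterAll() {
    if (sc != null) {
      sc.stop()
    }
    System.clearProperty("spark.driver.port")
    super.afterAll()
  }
}
```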
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
j-esse pushed a commit to j-esse/spark that referenced this pull request Jan 24, 2019
… broadcast object (apache#469)

## What changes were proposed in this pull request?

This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference. This allows it to be garbage collected even if the Dataset is held in memory. This is ok, because the broadcast object can always be re-read.
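
A minimal sketch of the strong-to-weak pattern described above (illustrative names, not the actual TorrentBroadcast code):

```scala
import java.lang.ref.WeakReference

// Illustrative: cache a value behind a WeakReference so the GC may reclaim
// it under memory pressure, and transparently re-read it when that happens.
class WeaklyCached[T <: AnyRef](reread: () => T) {
  @transient private var ref: WeakReference[T] = _

  def get: T = synchronized {
    val cached = if (ref != null) ref.get else null.asInstanceOf[T]
    if (cached != null) cached
    else {
      val v = reread()              // safe because the value can always be re-read
      ref = new WeakReference[T](v)
      v
    }
  }
}
```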

## How was this patch tested?

Tested in Spark shell by taking a heap dump, full repro steps listed in https://issues.apache.org/jira/browse/SPARK-25998.

Closes apache#22995 from bkrieger/bk/torrent-broadcast-weak.

Authored-by: Brandon Krieger <bkrieger@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
This patch adds a job for the machine learning tests.
GPU support will be added once OpenLab has a GPU resource pool.

Related-Bug: theopenlab/openlab#197