[SPARK-25998] [CORE] Change TorrentBroadcast to hold weak reference of broadcast object #22995

Closed
bkrieger wants to merge 8 commits from bkrieger/bk/torrent-broadcast-weak

bkrieger commented Nov 9, 2018

What changes were proposed in this pull request?

This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference. This allows it to be garbage collected even if the Dataset is held in memory. This is ok, because the broadcast object can always be re-read.
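
As a rough illustration of the idea (a sketch only, not the TorrentBroadcast code), the value can sit behind a `java.lang.ref.WeakReference` and be reloaded whenever the reference has been cleared. The names `ReReadableValue`, `load`, `loads`, and `simulateGc` are hypothetical; `load` stands in for re-reading the broadcast blocks.

```scala
import java.lang.ref.WeakReference

// Hypothetical holder; `load` stands in for re-reading the broadcast blocks.
class ReReadableValue[T <: AnyRef](load: () => T) {
  // Null until the first read, mirroring a `_value == null` check.
  private var ref: WeakReference[T] = null
  // Number of times `load` ran; kept only for illustration.
  var loads = 0

  def get: T = {
    val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get
    if (cached != null) {
      cached
    } else {
      // Never loaded, or the collector cleared the reference: read again.
      loads += 1
      val fresh = load()
      ref = new WeakReference[T](fresh)
      fresh
    }
  }

  // Simulates the collector clearing the reference; a real GC does this on its own.
  def simulateGc(): Unit = if (ref != null) ref.clear()
}
```

This sketch is single-threaded on purpose; it only shows that a cleared reference costs one extra call to `load`.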

How was this patch tested?

Tested in Spark shell by taking a heap dump, full repro steps listed in https://issues.apache.org/jira/browse/SPARK-25998.

Contributor

mridulm commented Nov 16, 2018

ok to test

@@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private var checksums: Array[Int] = _
 
   override protected def getValue() = {
-    _value
+    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get

Member

I suppose there is a race condition here, in that several threads could end up simultaneously setting the reference. It won't be incorrect as the data ought to be the same. I am not sure of the access pattern for this object; maybe it's always single-threaded. But if lots are reading, you can imagine them all causing a call to readBroadcastBlock() simultaneously.

Introducing another object to lock on is safe and not too much extra legwork. Might be worth it.

Isn't WeakReference cleared on any GC? Would SoftReference be better, to hold out until memory is exhausted and avoid re-reading? There's a tradeoff there.

Good idea, just surprisingly full of possible gotchas.

Nit: isn't val memoized = if (_value == null) null else _value.get sufficient?
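
The locking suggestion above could look roughly like the following sketch, assuming a dedicated lock object and a `java.lang.ref.SoftReference`. The names `LockedSoftValue`, `load`, and `LockedSoftValueDemo` are hypothetical; `load` plays the role of readBroadcastBlock(), and this is not the code in the PR.

```scala
import java.lang.ref.SoftReference
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the getValue() logic; `load` plays the role of
// readBroadcastBlock().
class LockedSoftValue[T <: AnyRef](load: () => T) {
  private val lock = new Object                 // dedicated lock object
  private var ref: SoftReference[T] = null      // guarded by `lock`

  def get: T = lock.synchronized {
    val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get
    if (cached != null) {
      cached
    } else {
      // Only one thread at a time reaches this branch.
      val fresh = load()
      ref = new SoftReference[T](fresh)
      fresh
    }
  }
}

object LockedSoftValueDemo {
  // Starts `threads` readers at once and returns how many times `load` ran.
  def loadsUnderContention(threads: Int): Int = {
    val loads = new AtomicInteger(0)
    val value = new LockedSoftValue[String](() => {
      loads.incrementAndGet()
      Thread.sleep(20) // widen the window in which readers overlap
      "payload"
    })
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { value.get; () }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    loads.get
  }
}
```

Without the lock, each overlapping reader could see an empty reference and call `load` itself. The result would still be correct, just wasteful.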

Author

Good catch. I'll make it synchronized, so it only loads one at a time.

Re: WeakReference, sure, I can change it to SoftReference. That'll be closer to the original behavior, and should still give the improvement we want.

When I try without .asInstanceOf[T] it fails to compile with:

[error] /Users/bkrieger/Documents/git/spark/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala:98: type mismatch;
[error]  found   : Null(null)
[error]  required: T
[error]     val memoized: T = if (_value == null) null else _value.get
[error]                                           ^
[info] Null(null) <: T?
[info] false
[error] one error found

Member

Hm, weird, I thought it would work based on a little local example, but yeah leave the cast in of course.
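
For context, a small standalone sketch of why the cast is needed: with an unbounded type parameter the compiler cannot assume `Null <: T`, since `T` could be a value type such as `Int`. The object and method names here are hypothetical.

```scala
object NullForGenericT {
  // Does not compile, with the same "found: Null(null), required: T" mismatch:
  //   def empty[T]: T = null

  // Compiles: the cast tells the compiler to accept null as a T.
  def emptyOf[T]: T = null.asInstanceOf[T]

  // Compiles without a cast when T is declared to admit null.
  def emptyRef[T >: Null]: T = null
}
```

Adding a `>: Null` bound is not an option for TorrentBroadcast as shown in the diff, because its type parameter is declared as `T: ClassTag` with no such bound, so the cast stays.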

Author

@srowen @mridulm for some reason it looks like tests aren't being triggered, can one of you trigger?

Member

srowen left a comment

I'll trigger tests

@@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   /** The checksum for all the blocks. */
   private var checksums: Array[Int] = _
 
-  override protected def getValue() = {
-    _value
+  override protected def getValue() = synchronized {

Member

I noticed that below, the readBroadcastBlock() method is synchronized on the companion object, which makes me nervous. However a lazy val is also implemented with this.synchronized, so I suspect this is fine. We pay the cost of synchronization on every call now, but I think that is OK here, as a simple flag won't help us figure out whether the SoftReference has been cleared.

Author

I think we can remove the TorrentBroadcast.synchronized in readBroadcastBlock, since we're already synchronizing in its only caller? Though I'm not sure why it was necessary in the first place, as readBroadcastBlock should only have been called once before this PR.

Regardless, I agree that the perf hit should be ok. Let me know if you want any of this changed.

Member

Hm, that TorrentBroadcast.synchronized is from a very old version of the code in 2013, when more things used that lock. I'm pretty certain it's obsolete. However this code accesses broadcastCache and that needs synchronization. (It's kind of unfortunate where this object is). I think we could actually improve this by locking on broadcastCache for basically the whole block. I think that's a safe improvement as nothing else uses broadcastCache, nothing else is synchronized here, and should behave just the same.

Author

Do you mean switching TorrentBroadcast.synchronized to broadcastCache.synchronized inside readBroadcastBlock, or changing this.synchronized to broadcastCache.synchronized inside getValue() (and getting rid of the lock in readBroadcastBlock)?

Member

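We need two locks, one to protect the local reference to the block, and one to protect the shared cache object. I was thinking of the former option: switching TorrentBroadcast.synchronized to broadcastCache.synchronized inside readBroadcastBlock.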
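
A minimal sketch of the two-lock arrangement, under loud assumptions: `SharedCache`, `TwoLockBroadcast`, `fetch`, and `fetches` are hypothetical names, and the plain mutable map below holds strong references for brevity, which the real broadcastCache need not do.

```scala
import java.lang.ref.SoftReference
import scala.collection.mutable

// Hypothetical shared cache standing in for broadcastCache; shared by all
// instances. A plain map is an assumption made to keep the sketch short.
object SharedCache {
  val values = mutable.Map.empty[Long, AnyRef]
  var fetches = 0 // counts simulated remote reads; guarded by `values`
}

class TwoLockBroadcast[T <: AnyRef](id: Long, fetch: () => T) {
  private var ref: SoftReference[T] = null // guarded by `this`

  // Lock 1: protects this instance's local reference.
  def getValue: T = synchronized {
    val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get
    if (cached != null) {
      cached
    } else {
      val fresh = readBlock()
      ref = new SoftReference[T](fresh)
      fresh
    }
  }

  // Lock 2: protects the cache shared across instances.
  private def readBlock(): T = SharedCache.values.synchronized {
    SharedCache.values.getOrElseUpdate(id, {
      SharedCache.fetches += 1
      fetch()
    }).asInstanceOf[T]
  }
}
```

The locks are always taken in the same order (instance first, then cache), so two instances sharing an id cannot deadlock on each other in this sketch.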

Author

done

SparkQA commented Nov 27, 2018

Test build #4443 has finished for PR 22995 at commit 09ae762.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 27, 2018

Test build #4444 has finished for PR 22995 at commit 09ae762.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

srowen commented Nov 28, 2018

Merged to master

@asfgit asfgit closed this in 87bd9c7 Nov 28, 2018
vinooganesh pushed a commit to palantir/spark that referenced this pull request Jan 15, 2019
… broadcast object

## What changes were proposed in this pull request?

This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference. This allows it to be garbage collected even if the Dataset is held in memory. This is ok, because the broadcast object can always be re-read.

## How was this patch tested?

Tested in Spark shell by taking a heap dump, full repro steps listed in https://issues.apache.org/jira/browse/SPARK-25998.

Closes apache#22995 from bkrieger/bk/torrent-broadcast-weak.

Authored-by: Brandon Krieger <bkrieger@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
bulldozer-bot bot pushed a commit to palantir/spark that referenced this pull request Jan 16, 2019
… broadcast object (#469)

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
… broadcast object

4 participants