
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task). #1452

Closed
rxin wants to merge 5 commits into apache:master from rxin:broadcast-task

Conversation

rxin (Author) commented Jul 16, 2014

Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains closures) over Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, yet we send it to the executors once per task. This is especially bad when a closure references a very large variable. The current design led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and closures to executors, and uses Akka only to send a reference to the broadcast RDD/closure along with the partition-specific information for each task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses a Hadoop input, because the JobConf is large.

The user-facing impact of the change includes:

  1. Users won't need to decide what to broadcast anymore, unless they want to reuse a large object in multiple operations.
  2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).
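To make the mechanism concrete, here is a minimal sketch of the pattern, assuming Spark 1.x internals. The helper names broadcastRdd and rehydrate are illustrative only; the actual patch stores the bytes in a lazy val on RDD itself, shown in the diff hunks below.

```scala
import java.nio.ByteBuffer

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Driver side: serialize the RDD (and its closures) once per stage,
// then broadcast the resulting bytes instead of shipping them per task.
def broadcastRdd[T](sc: SparkContext, rdd: RDD[T]): Broadcast[Array[Byte]] = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  sc.broadcast(ser.serialize(rdd).array())
}

// Executor side: each task pulls the broadcast bytes and deserializes
// its own private copy of the RDD.
def rehydrate[T](rddBinary: Broadcast[Array[Byte]]): RDD[T] = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  ser.deserialize[RDD[T]](ByteBuffer.wrap(rddBinary.value))
}
```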

A simple way to test this (both closures capture the 1 MB array `a`, so without this change every one of the 1000 tasks ships its own serialized copy):

```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2:

```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

```scala
 * task gets a different copy of the RDD. This provides stronger isolation between tasks that
 * might modify state of objects referenced in their closures.
 */
@transient private[spark] lazy val broadcasted = {
```
Contributor: Can we get the type here?

rxin (Author): sure :)

rxin changed the title from "[SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task)." to "[SPARK-2521] Broadcast RDD object (instead of sending it along with every task)." Jul 17, 2014
SparkQA commented Jul 17, 2014

QA tests have started for PR 1452. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16753/consoleFull

```scala
 */
@transient private[spark] lazy val broadcasted = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  sc.broadcast(ser.serialize(this).array())
```
Contributor: Will this be compressed?

rxin (Author): yes, broadcast compresses it.

Contributor: But the byte array isn't compressed. That's probably okay, but it could also be a slight regression compared to the current task caches.
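For context, a hedged sketch (not something this patch does) of how the serialized bytes could be pre-compressed if the uncompressed in-memory copy ever mattered; the broadcast layer itself already compresses what it ships over the wire. This uses plain java.util.zip rather than any Spark API.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, DeflaterOutputStream}

// Compress the serialized RDD bytes before handing them to sc.broadcast.
def compress(bytes: Array[Byte]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val out = new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED))
  out.write(bytes)
  out.close()  // finishes the deflater and flushes everything into bos
  bos.toByteArray
}
```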

mateiz (Contributor) commented Jul 17, 2014

How does this interact with state cleanup? I guess the broadcast var becomes dereferenced when the RDD does? We may want to add some tests for that.

```scala
 * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
 * the serialized copy of the RDD and for each task we will deserialize it, which means each
 * task gets a different copy of the RDD. This provides stronger isolation between tasks that
 * might modify state of objects referenced in their closures.
```
Contributor: We should look into removing this restriction sometime, but it's probably too risky to do in 1.1.
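To make the isolation property concrete, a small illustrative example (this is standard closure-serialization behavior, which the broadcast-plus-deserialize scheme preserves): each task mutates only its own deserialized copy, so the driver-side object is untouched.

```scala
import scala.collection.mutable

val tally = mutable.Map.empty[String, Int]
sc.parallelize(1 to 4, 4).map { x =>
  // Each task deserializes its own copy of the closure, so this mutates
  // task-local state, not the driver's `tally`.
  tally("seen") = tally.getOrElse("seen", 0) + 1
  x
}.count()
println(tally)  // Map() -- the driver's copy was never touched
```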

mateiz (Contributor) commented Jul 17, 2014

Also, this doesn't deal with the closure for an action being large. That can be done as a separate JIRA, but have you looked at it?

rxin (Author) commented Jul 17, 2014

That was actually my main concern with this change from the beginning. From my initial observation everything does seem to work. I intentionally avoided keeping references to the broadcast variables outside of RDDs, so the broadcast variable should have the same scope as the RDD itself, and thus gets cleaned up as long as we properly clean up RDDs and broadcast variables.
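A rough sketch of the kind of cleanup test this suggests (hypothetical; the actual test landed later as commit c3b6f11). The idea: once the RDD is dereferenced and garbage-collected, the ContextCleaner should remove the broadcast holding its serialized form as well.

```scala
// Hypothetical test outline -- `broadcasted` is the patch's private[spark]
// lazy val, so this would have to live in a Spark-internal test suite.
var rdd: org.apache.spark.rdd.RDD[Int] = sc.parallelize(1 to 100, 10).map(_ * 2)
rdd.count()                           // materializes the RDD's broadcast
val broadcastId = rdd.broadcasted.id  // remember which broadcast to check
rdd = null                            // drop the last strong reference
System.gc()                           // prod the JVM so the cleaner's weak refs fire
// ...finally, assert that no broadcast_<broadcastId> blocks remain in the
// block manager (e.g. by polling the BlockManagerMaster for a short while).
```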

rxin (Author) commented Jul 17, 2014

Yes - actions were intentionally not broadcast for now. It makes things more complicated ... let's do that in a separate PR. (There were two TODOs related to this.)

SparkQA commented Jul 18, 2014

QA tests have started for PR 1452. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16813/consoleFull

SparkQA commented Jul 18, 2014

QA tests have started for PR 1452. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16814/consoleFull

```scala
 * where the JobConf/Configuration object is not thread-safe.
 */
@transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
  // TODO: Warn users about very large RDDs.
```
Contributor: It would be nice to add this in this patch; we can just choose a threshold.
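A hedged sketch of one shape that warning could take inside the lazy val. The threshold and message are invented for illustration; the PR's follow-up commit 762e0be added the actual warning.

```scala
// Inside the lazy val: `ser` is the closure serializer instance created
// just above (see the earlier hunk). The 1 MB threshold is illustrative.
val bytes = ser.serialize(this).array()
if (bytes.length >= 1024 * 1024) {
  logWarning(s"Broadcasting RDD $id is large (${bytes.length} bytes); " +
    "closures referencing big objects inflate the task binary.")
}
sc.broadcast(bytes)
```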

mateiz (Contributor) commented Jul 18, 2014

This looks good to me. It might be good for @tdas to look over the test.

ash211 (Contributor) commented Jul 18, 2014

I'm interested in this for the JobConf/Configuration thread-safety issue here: https://issues.apache.org/jira/browse/SPARK-2546, but I agree that should go in a separate commit after this PR is merged.

pwendell (Contributor): @ash211 I created https://issues.apache.org/jira/browse/SPARK-2585 to track this.

SparkQA commented Jul 18, 2014

QA tests have started for PR 1452. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16832/consoleFull

ash211 (Contributor) commented Jul 18, 2014

Thanks Patrick!


```scala
 *
 * @param stageId id of the stage this task belongs to
 * @param rdd input to func
 * @param rddBinary broadcast version of of the serialized RDD
```
Contributor: *of - ha!

Contributor: also past tense -- broadcasted

aarondav (Contributor): Only a set of very minor comments, LGTM.

rxin (Author) commented Jul 19, 2014

Thanks for taking a look. I'm merging this one as is, and will submit a small PR to fix the issues.

asfgit closed this in 7b8cd17 Jul 19, 2014
rxin deleted the broadcast-task branch July 19, 2014 07:57
pwendell (Contributor): @rxin @mateiz this has broken the master build, so we should revert it. If you look here, there was never actually a "success" message from SparkQA - I think the tests are hanging.

rxin (Author) commented Jul 19, 2014

Apparently this broke the build. Reverting and will work on a fix.

mateiz (Contributor) commented Jul 20, 2014

Hah, the new Spark QA messages are really confusing! Is there no timeout on the build?

rxin added a commit to rxin/spark that referenced this pull request Jul 30, 2014
[SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

Author: Reynold Xin <rxin@apache.org>

Closes apache#1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

(cherry picked from commit 7b8cd17)
Signed-off-by: Reynold Xin <rxin@apache.org>
rxin added a commit to rxin/spark that referenced this pull request Jul 30, 2014
[SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

Author: Reynold Xin <rxin@apache.org>

Closes apache#1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

(cherry picked from commit 7b8cd17)
Signed-off-by: Reynold Xin <rxin@apache.org>
asfgit pushed a commit that referenced this pull request Jul 30, 2014
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task)

This is a resubmission of #1452. It was reverted because it broke the build.

Author: Reynold Xin <rxin@apache.org>

Closes #1498 from rxin/broadcast-task and squashes the following commits:

f7364db [Reynold Xin] Code review feedback.
f8535dc [Reynold Xin] Fixed the style violation.
252238d [Reynold Xin] Serialize the final task closure as well as ShuffleDependency in taskBinary.
111007d [Reynold Xin] Fix broadcast tests.
797c247 [Reynold Xin] Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted.
bab1d8b [Reynold Xin] Check for NotSerializableException in submitMissingTasks.
cf38450 [Reynold Xin] Use TorrentBroadcastFactory.
991c002 [Reynold Xin] Use HttpBroadcast.
de779f8 [Reynold Xin] Fix TaskContextSuite.
cc152fc [Reynold Xin] Don't cache the RDD broadcast variable.
d256b45 [Reynold Xin] Fixed unit test failures. One more to go.
cae0af3 [Reynold Xin] [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
[SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

Author: Reynold Xin <rxin@apache.org>

Closes apache#1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task)

This is a resubmission of apache#1452. It was reverted because it broke the build.

Author: Reynold Xin <rxin@apache.org>

Closes apache#1498 from rxin/broadcast-task and squashes the following commits:

f7364db [Reynold Xin] Code review feedback.
f8535dc [Reynold Xin] Fixed the style violation.
252238d [Reynold Xin] Serialize the final task closure as well as ShuffleDependency in taskBinary.
111007d [Reynold Xin] Fix broadcast tests.
797c247 [Reynold Xin] Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted.
bab1d8b [Reynold Xin] Check for NotSerializableException in submitMissingTasks.
cf38450 [Reynold Xin] Use TorrentBroadcastFactory.
991c002 [Reynold Xin] Use HttpBroadcast.
de779f8 [Reynold Xin] Fix TaskContextSuite.
cc152fc [Reynold Xin] Don't cache the RDD broadcast variable.
d256b45 [Reynold Xin] Fixed unit test failures. One more to go.
cae0af3 [Reynold Xin] [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).