
[SPARK-3119] Re-implementation of TorrentBroadcast. #2030

Closed
wants to merge 5 commits into master from rxin:torrentBroadcast

Conversation

rxin
Contributor

@rxin rxin commented Aug 19, 2014

This is a re-implementation of TorrentBroadcast, with the following changes:

  1. Removes most of the mutable, transient state from TorrentBroadcast (e.g. totalBytes, num of blocks fetched).
  2. Removes TorrentInfo and TorrentBlock.
  3. Replaces the BlockManager.getSingle call in readObject with a getLocal, resulting in one less RPC call to the BlockManagerMasterActor to find the location of the block.
  4. Removes the metadata block, resulting in one less block to fetch.
  5. Removes an extra memory copy for deserialization (by using Java's SequenceInputStream).

Basically, for a regular broadcast object with only one block, the number of RPC calls goes from 5+1 to 2+1.

Old TorrentBroadcast, for an object of a single block:
1 RPC to ask for location of the broadcast variable
1 RPC to ask for location of the metadata block
1 RPC to fetch the metadata block
1 RPC to ask for location of the first data block
1 RPC to fetch the first data block
1 RPC to tell the driver we put the first data block in
i.e. 5 + 1

New TorrentBroadcast, for an object of a single block:
1 RPC to ask for location of the first data block
1 RPC to get the first data block
1 RPC to tell the driver we put the first data block in
i.e. 2 + 1
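Change 5 above relies on Java's SequenceInputStream to chain the per-block streams, so the deserializer reads across block boundaries without first concatenating the blocks into one large buffer. The following is a minimal standalone Java sketch of that idea, not the actual Spark code; the class name and the 4-byte block size are purely illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

public class UnblockifySketch {
    // Deserialize an object from several byte-array blocks without merging
    // them into one big buffer first: SequenceInputStream chains the
    // per-block streams, and ObjectInputStream reads straight across the
    // block boundaries.
    static Object unblockify(byte[][] blocks) throws Exception {
        InputStream chained = new SequenceInputStream(
            Collections.enumeration(
                Arrays.stream(blocks)
                      .map(b -> (InputStream) new ByteArrayInputStream(b))
                      .collect(Collectors.toList())));
        try (ObjectInputStream in = new ObjectInputStream(chained)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Serialize a string, split the bytes into tiny 4-byte blocks,
        // then reassemble the object through the chained streams.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject("hello broadcast");
        }
        byte[] all = bos.toByteArray();
        int blockSize = 4;
        int n = (all.length + blockSize - 1) / blockSize;
        byte[][] blocks = new byte[n][];
        for (int i = 0; i < n; i++) {
            blocks[i] = Arrays.copyOfRange(all, i * blockSize,
                Math.min(all.length, (i + 1) * blockSize));
        }
        System.out.println(unblockify(blocks));
    }
}
```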

@rxin
Contributor Author

rxin commented Aug 19, 2014

cc @shivaram @mosharaf

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit c1185cd.

  • This patch merges cleanly.

/** Fetch torrent blocks from the driver and/or other executors. */
private def readBlocks(): Array[Array[Byte]] = {
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these thunks from this executor as well.
Contributor

Typo: thunks -> chunks

@JoshRosen
Contributor

Ran into some task failures when testing this commit on EC2 with the SchedulerThroughputTest:

14/08/19 07:01:24 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 995.0 (TID 9974, ip-172-31-24-16.us-west-2.compute.internal): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
        org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
        org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
        org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
        org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
        org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
        org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:127)
        org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:214)
        org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:151)
        sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
_value = TorrentBroadcast.unBlockifyObject[T](blocks)
// Store the merged copy in BlockManager so other tasks on this executor doesn't
Contributor

nit: doesn't -> don't

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 3670f00.

  • This patch merges cleanly.

@shivaram
Contributor

@rxin -- Nice work in reducing this to 2 RPCs. The patch looks good in terms of maintaining the same functionality as before. I'll wait for the Snappy fix and for Jenkins and then take another look.
Also it'll be cool to have results from spark-perf (Thanks @JoshRosen !).

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit c1185cd.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case Some(x) =>
blocks(pid) = x.asInstanceOf[Array[Byte]]
numBlocksAvailable += 1
SparkEnv.get.blockManager.putBytes(
Contributor Author

note to self - this is a bug

@rxin
Contributor Author

rxin commented Aug 19, 2014

Ok I pushed a new version that should've addressed all the comments and fixed the bug.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 2d6a5fb.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit 3670f00.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 0d8ed5b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit 2d6a5fb.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 5bacb9d.

  • This patch merges cleanly.

@JoshRosen
Contributor

Benchmarked as of 0d8ed5b and the results aren't conclusively faster than master; the good news is that we've narrowed the gap that I saw earlier between master and v1.0.2 for small jobs.

Each bar here represents a test where I ran 100 back-to-back jobs, each with 10 tasks, and varied the size of the task’s closure (each bar is the average of 10 runs, ignoring the first run to allow for JIT / warmup). The closure sizes (x-axis) are empty (well, whatever the minimum size was), 1 megabyte, and 10 megabytes; y-axis is time (seconds). This is running on 10 r3.2xlarge nodes in EC2. The test code is based off of my modified version of spark-perf (JoshRosen/spark-perf@0e768b2)

[image]

Or, in tabular form, the means:

[image]

and standard deviations:

[image]

Keep in mind that this is running 100 back-to-back jobs; for example, v1.0.2 averaged 9ms per job for the small jobs.

I'll run these benchmarks again tomorrow morning when I'm less tired to make sure I haven't inadvertently misconfigured anything.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit 0d8ed5b.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit 5bacb9d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor Author

rxin commented Aug 19, 2014

Jenkins, retest this please.

@rxin
Contributor Author

rxin commented Aug 19, 2014

Testing again to make sure tests pass two times in a row.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 5bacb9d.

  • This patch merges cleanly.

@shivaram
Contributor

@JoshRosen Thanks for testing -- The perf results are a bit surprising (especially that the master branch became faster since the earlier run). I also realized we do 3 RPCs, since after fetching the block we report back to the master. We could see whether this reporting can be made asynchronous in the future.

@JoshRosen
Contributor

@shivaram I'm actually going to re-run these tests this morning after restarting my cluster. I'll test before and after #2028 and after this commit. I can also test with a wider range of configurations if that would help to figure out what's going on.

@shivaram
Contributor

I think just this patch vs. before #2028 vs. 1.0.2 should be fine. I just wanted to make sure the performance regression is minimal due to broadcast -- so as long as we are close or better than 1.0.2 numbers it should be good.

@rxin
Contributor Author

rxin commented Aug 19, 2014

Ok the test failures this time are just because of flaky Python tests....

@rxin
Contributor Author

rxin commented Aug 19, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 2030 at commit 5bacb9d.

  • This patch merges cleanly.

* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
Contributor

Just curious, have we observed master becoming bottleneck for http broadcast ?
We have run with about 600 executor cores concurrently and have not seen this (the tasks themselves finish in sub seconds) - though it is probably a small enough sample set.

Contributor Author

try a larger variable, like a 10MB or 100MB model.

Contributor

To add, the latency of torrent broadcast is "sort of" twice that of http (2 requests instead of 1), assuming a single piece for both, which is why I am curious about the bottleneck.
Currently we run our jobs with http explicitly, since our tasks are low-latency, short-duration tasks.

Contributor

Hmm, our broadcast variables are in the 1 MB - 3 MB range, so they do not qualify, I guess... thanks for the comment.

@rxin
Contributor Author

rxin commented Aug 19, 2014

I tested this with a local cluster (which might not be a representative experiment), but this commit makes dummy short tasks finish in about 1/4 of the time.

Command:

MASTER=local-cluster[2,2,512] bin/spark-shell

Master with this change:

rxin @ rxin-mbp : /scratch/rxin/spark-1 (torrentBroadcast) 
> grep "took" new.txt 
14/08/19 11:06:56 INFO SparkContext: Job finished: count at <console>:13, took 0.169695 s
14/08/19 11:06:58 INFO SparkContext: Job finished: count at <console>:13, took 0.010982 s
14/08/19 11:06:59 INFO SparkContext: Job finished: count at <console>:13, took 0.010995 s
14/08/19 11:07:00 INFO SparkContext: Job finished: count at <console>:13, took 0.009852 s
14/08/19 11:07:01 INFO SparkContext: Job finished: count at <console>:13, took 0.007796 s
14/08/19 11:07:02 INFO SparkContext: Job finished: count at <console>:13, took 0.009224 s
14/08/19 11:07:03 INFO SparkContext: Job finished: count at <console>:13, took 0.007944 s
14/08/19 11:07:03 INFO SparkContext: Job finished: count at <console>:13, took 0.011442 s
14/08/19 11:07:04 INFO SparkContext: Job finished: count at <console>:13, took 0.007945 s
14/08/19 11:07:05 INFO SparkContext: Job finished: count at <console>:13, took 0.008433 s
14/08/19 11:07:06 INFO SparkContext: Job finished: count at <console>:13, took 0.008017 s
14/08/19 11:07:06 INFO SparkContext: Job finished: count at <console>:13, took 0.0085 s
14/08/19 11:07:07 INFO SparkContext: Job finished: count at <console>:13, took 0.008712 s
14/08/19 11:07:08 INFO SparkContext: Job finished: count at <console>:13, took 0.008481 s
14/08/19 11:07:09 INFO SparkContext: Job finished: count at <console>:13, took 0.007848 s
14/08/19 11:07:10 INFO SparkContext: Job finished: count at <console>:13, took 0.008487 s
14/08/19 11:07:10 INFO SparkContext: Job finished: count at <console>:13, took 0.00756 s
14/08/19 11:07:11 INFO SparkContext: Job finished: count at <console>:13, took 0.008702 s
14/08/19 11:07:12 INFO SparkContext: Job finished: count at <console>:13, took 0.007459 s
14/08/19 11:07:13 INFO SparkContext: Job finished: count at <console>:13, took 0.007884 s
14/08/19 11:07:14 INFO SparkContext: Job finished: count at <console>:13, took 0.007632 s

Master without this:

rxin @ rxin-mbp : /scratch/rxin/spark-1 (torrentBroadcast) 
> grep "took" old.txt 
14/08/19 11:12:12 INFO SparkContext: Job finished: count at <console>:13, took 0.316267 s
14/08/19 11:12:14 INFO SparkContext: Job finished: count at <console>:13, took 0.035347 s
14/08/19 11:12:15 INFO SparkContext: Job finished: count at <console>:13, took 0.038807 s
14/08/19 11:12:16 INFO SparkContext: Job finished: count at <console>:13, took 0.071991 s
14/08/19 11:12:16 INFO SparkContext: Job finished: count at <console>:13, took 0.036433 s
14/08/19 11:12:17 INFO SparkContext: Job finished: count at <console>:13, took 0.036449 s
14/08/19 11:12:29 INFO SparkContext: Job finished: count at <console>:13, took 0.076298 s
14/08/19 11:12:29 INFO SparkContext: Job finished: count at <console>:13, took 0.033312 s
14/08/19 11:12:30 INFO SparkContext: Job finished: count at <console>:13, took 0.036079 s
14/08/19 11:12:31 INFO SparkContext: Job finished: count at <console>:13, took 0.033334 s
14/08/19 11:13:15 INFO SparkContext: Job finished: count at <console>:13, took 0.47008 s
14/08/19 11:13:17 INFO SparkContext: Job finished: count at <console>:13, took 0.033396 s
14/08/19 11:13:18 INFO SparkContext: Job finished: count at <console>:13, took 0.036782 s
14/08/19 11:13:19 INFO SparkContext: Job finished: count at <console>:13, took 0.033261 s
14/08/19 11:13:19 INFO SparkContext: Job finished: count at <console>:13, took 0.032565 s
14/08/19 11:13:20 INFO SparkContext: Job finished: count at <console>:13, took 0.033861 s
14/08/19 11:13:54 INFO SparkContext: Job finished: count at <console>:13, took 0.077148 s
14/08/19 11:14:03 INFO SparkContext: Job finished: count at <console>:13, took 0.031572 s

1.0.2 (50% slower)

rxin @ rxin-mbp : /scratch/rxin/spark-1 (torrentBroadcast) 
> grep took 1.0.2
14/08/19 11:26:54 INFO SparkContext: Job finished: count at <console>:13, took 0.125192 s
14/08/19 11:26:55 INFO SparkContext: Job finished: count at <console>:13, took 0.013881 s
14/08/19 11:26:56 INFO SparkContext: Job finished: count at <console>:13, took 0.012316 s
14/08/19 11:26:57 INFO SparkContext: Job finished: count at <console>:13, took 0.013337 s
14/08/19 11:26:58 INFO SparkContext: Job finished: count at <console>:13, took 0.013465 s
14/08/19 11:26:58 INFO SparkContext: Job finished: count at <console>:13, took 0.011614 s
14/08/19 11:26:59 INFO SparkContext: Job finished: count at <console>:13, took 0.013076 s
14/08/19 11:27:00 INFO SparkContext: Job finished: count at <console>:13, took 0.040934 s
14/08/19 11:27:00 INFO SparkContext: Job finished: count at <console>:13, took 0.01185 s
14/08/19 11:27:01 INFO SparkContext: Job finished: count at <console>:13, took 0.01108 s
14/08/19 11:27:02 INFO SparkContext: Job finished: count at <console>:13, took 0.011728 s
14/08/19 11:27:03 INFO SparkContext: Job finished: count at <console>:13, took 0.011679 s
14/08/19 11:27:04 INFO SparkContext: Job finished: count at <console>:13, took 0.011919 s
14/08/19 11:27:04 INFO SparkContext: Job finished: count at <console>:13, took 0.011269 s
14/08/19 11:27:05 INFO SparkContext: Job finished: count at <console>:13, took 0.011338 s
14/08/19 11:27:06 INFO SparkContext: Job finished: count at <console>:13, took 0.012175 s
14/08/19 11:27:08 INFO SparkContext: Job finished: count at <console>:13, took 0.011624 s
14/08/19 11:27:09 INFO SparkContext: Job finished: count at <console>:13, took 0.011493 s
14/08/19 11:27:10 INFO SparkContext: Job finished: count at <console>:13, took 0.011768 s
14/08/19 11:27:16 INFO SparkContext: Job finished: count at <console>:13, took 0.013372 s
14/08/19 11:27:16 INFO SparkContext: Job finished: count at <console>:13, took 0.010667 s
14/08/19 11:27:17 INFO SparkContext: Job finished: count at <console>:13, took 0.012064 s
14/08/19 11:27:18 INFO SparkContext: Job finished: count at <console>:13, took 0.011427 s

@davies
Contributor

davies commented Aug 19, 2014

@rxin I think we can only benefit from broadcasting a closure when it is big enough, such as more than 1 MB. In most cases, though, the closure should be less than 10 KB. How about turning on broadcasting only when the closure is big enough? This may make the code a little more complicated, but it might make things faster in the common case (small closures).
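The size-based switch suggested here could look like the following minimal sketch. This is purely illustrative, not Spark code; the 1 MB threshold and all names are hypothetical:

```java
public class ClosureShippingSketch {
    // Hypothetical cutoff: below it, ship the serialized closure inline
    // with the task message; above it, go through the broadcast path.
    static final int BROADCAST_THRESHOLD = 1 << 20; // 1 MB

    enum Strategy { INLINE, BROADCAST }

    // Decide how to ship a closure based on its serialized size.
    static Strategy choose(int serializedSize) {
        return serializedSize < BROADCAST_THRESHOLD
            ? Strategy.INLINE : Strategy.BROADCAST;
    }

    public static void main(String[] args) {
        System.out.println(choose(10 * 1024));       // typical small closure
        System.out.println(choose(2 * 1024 * 1024)); // large closure
    }
}
```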

@@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
Contributor

Can't we relax the synchronized block to cover just the local block manager interactions?
Or is there a reason to keep it globally synchronized?

Multiple cores per container all would block on this.

Contributor

I am not sure there is a lot of benefit from having multiple cores execute readObject. We'd then need synchronization to coordinate which piece is being fetched by each thread, cap the total number of outstanding requests, etc.

Contributor

The lock is not on a specific instance but on the class globally, if I am not wrong.
A single slow readObject will block all other cores waiting for the lock to be released.

Contributor Author

The only thing we can improve here is to block only tasks fetching the same block. That'd help actually but requires more complexity.

Contributor

How 'bad' is it if we allow multiple cores to fetch the same block? Particularly given that we shuffle the block fetch order now (which is a really neat change, btw!)

Contributor

Note, I am only referring to the case where none of the blocks are available locally (the first fetch of one or more broadcasts).

Contributor

We can explore this in a later PR in case it is not possible to do it right now, since this change is targeted for 1.1 and is obviously much better than what existed before.

Contributor Author

yup agree!

Contributor

Isn't this logic almost exactly what the CacheManager does already, with fine-grained locking on reconstructing specific blocks?

Contributor Author

Maybe we can generalize that to provide a lock manager, similar to what a DBMS would do.
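The per-block locking discussed in this thread can be sketched with one lock object per block id, so that tasks reading different broadcast variables no longer serialize behind a single global lock. This is a hypothetical sketch, not Spark's code; the class and method names are made up:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class BlockLockManager {
    // One lock object per block id: tasks fetching different blocks proceed
    // in parallel, while tasks racing on the same block still wait for the
    // first fetcher to finish.
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    public <T> T withLock(String blockId, Supplier<T> body) {
        // computeIfAbsent is atomic, so concurrent callers for the same
        // block id always get the same lock object.
        Object lock = locks.computeIfAbsent(blockId, id -> new Object());
        synchronized (lock) {
            return body.get();
        }
    }

    public static void main(String[] args) {
        BlockLockManager mgr = new BlockLockManager();
        System.out.println(mgr.withLock("broadcast_0", () -> "fetched"));
    }
}
```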

@rxin
Contributor Author

rxin commented Aug 19, 2014

Actually with this change my local cluster testing didn't see much difference between torrent and http for small blocks.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 2030 at commit 5bacb9d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mosharaf
Contributor

Looks good to me.

Re: one of the earlier comments about broadcasting small objects through TorrentBroadcast. A not-so-intrusive way would be to piggyback the data on top of the first control message when the data is small. Then small blocks will behave like http, but bigger ones will become Torrent.
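The piggybacking idea above can be sketched as a location-request reply that inlines small payloads, saving the follow-up fetch RPC. All names and the threshold here are hypothetical, not an actual Spark message format:

```java
import java.util.Optional;

public class PiggybackSketch {
    static final int INLINE_THRESHOLD = 64 * 1024; // hypothetical cutoff

    // Reply to "where is broadcast X?": for small variables the driver can
    // embed the serialized bytes directly in the reply; for large ones it
    // returns only the locations and the executor fetches separately.
    record LocationReply(String[] locations, Optional<byte[]> inlineData) {}

    static LocationReply reply(String[] locations, byte[] data) {
        return data.length <= INLINE_THRESHOLD
            ? new LocationReply(locations, Optional.of(data))
            : new LocationReply(locations, Optional.empty());
    }

    public static void main(String[] args) {
        // A 100-byte variable is inlined; a 1 MB variable is not.
        System.out.println(
            reply(new String[]{"driver"}, new byte[100]).inlineData().isPresent());
        System.out.println(
            reply(new String[]{"driver"}, new byte[1 << 20]).inlineData().isPresent());
    }
}
```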

@rxin
Contributor Author

rxin commented Aug 19, 2014

That's a great idea. @JoshRosen and I were just talking about this before lunch!

@JoshRosen
Contributor

New results (testing against 8257733, 1f1819b, 1.0.2, and this):

[image]

Means:

[image]

Standard deviations:

[image]

@JoshRosen
Contributor

Currently running more jobs from the spark-perf suite against 5bacb9d (the latest commit) as a stress-test to gain more confidence that this is bug-free.

@shivaram
Contributor

Thanks Josh -- Perf results look good. LGTM. Can you file JIRAs to track the multi-threading things for 1.2?

@JoshRosen
Contributor

SPARK-3115 is an umbrella issue to improve task broadcast latency for small tasks; Reynold has created a bunch of subtasks to track future improvements, including the multi-threading improvements.

@JoshRosen
Contributor

Here are some updated results that show performance for a wider range of closure sizes (all in bytes):

[image]

[image]

@rxin
Contributor Author

rxin commented Aug 20, 2014

Ok merging in master & branch-1.1.

@asfgit asfgit closed this in 8adfbc2 Aug 20, 2014
asfgit pushed a commit that referenced this pull request Aug 20, 2014

Author: Reynold Xin <rxin@apache.org>

Closes #2030 from rxin/torrentBroadcast and squashes the following commits:

5bacb9d [Reynold Xin] Always add the object to driver's block manager.
0d8ed5b [Reynold Xin] Added getBytes to BlockManager and uses that in TorrentBroadcast.
2d6a5fb [Reynold Xin] Use putBytes/getRemoteBytes throughout.
3670f00 [Reynold Xin] Code review feedback.
c1185cd [Reynold Xin] [SPARK-3119] Re-implementation of TorrentBroadcast.

(cherry picked from commit 8adfbc2)
Signed-off-by: Reynold Xin <rxin@apache.org>
@aarondav
Contributor

@JoshRosen Did you ever see a significant difference between pre-PR and post-PR numbers? 1.0.2 was always much worse in all your graphs, but was this PR significant for larger blocks?

By the way, 24 hour turnarounds on critical PRs that are going in at the end of a release cycle seem like a really bad way to preserve stability.

@rxin
Contributor Author

rxin commented Aug 20, 2014

FYI we actually reran all the mllib and scheduler throughput tests for this PR to verify.

@shivaram
Contributor

@aarondav The table and graph in #2030 (comment) compares pre-PR to post-PR. Actually it breaks it down into three runs: pre-PR, after 1st PR (#2028) and after 2nd PR (#2030).

For large closures this PR had little to no effect. For smaller closures we see an improvement from 19.7ms before the PRs, to 17.9ms after the 1st PR, to 13.6ms after the 2nd PR.

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
@davies
Contributor

davies commented Oct 7, 2014

It could be fixed by #2624

It's strange that I can not see this comment on PR #2030.

On Tue, Oct 7, 2014 at 6:28 AM, DB Tsai notifications@github.com wrote:

We had a build against the Spark master on Oct 2, and when we ran our
application with data around 600 GB, we got the following exception. Does
this PR fix this issue, which was also seen by @JoshRosen?

Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 8312, ams03-002.ff): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1004)
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

Driver stacktrace:


@dbtsai
Member

dbtsai commented Oct 7, 2014

I thought it was a closed issue, so I moved my comment to JIRA. I ran into this issue in spark-shell, not a standalone application; does SPARK-3762 apply in this situation? Thanks.

