
[SPARK-3721] [PySpark] broadcast objects larger than 2G #2659

Closed
davies wants to merge 13 commits into apache:master from davies:huge

Conversation

davies
Contributor

@davies davies commented Oct 5, 2014

This patch will bring support for broadcasting objects larger than 2G.

pickle, zlib, FrameSerializer, and Array[Byte] cannot handle objects larger than 2G, so this patch introduces LargeObjectSerializer to serialize broadcast objects; the object is serialized and compressed into small chunks. It also changes the type of the broadcast from Broadcast[Array[Byte]] to Broadcast[Array[Array[Byte]]].

Testing broadcasts of objects larger than 2G is slow and memory hungry, so this was tested manually; it could be added to SparkPerf.
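A minimal sketch of the chunk-and-compress idea described above (the chunk size, function names, and use of zlib here are illustrative, not the actual LargeObjectSerializer API):

import zlib

CHUNK_SIZE = 1 << 20  # split the payload into 1 MB pieces (illustrative size)

def serialize_in_chunks(data):
    # Compress each chunk separately so that no single byte string has to
    # exceed 2G; the resulting list corresponds to Broadcast[Array[Array[Byte]]]
    # on the JVM side.
    return [zlib.compress(data[i:i + CHUNK_SIZE])
            for i in range(0, len(data), CHUNK_SIZE)]

def deserialize_chunks(chunks):
    # Reassemble the original bytes by decompressing and concatenating.
    return b"".join(zlib.decompress(c) for c in chunks)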

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have started for PR 2659 at commit 091b107.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have finished for PR 2659 at commit 091b107.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21310/

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have started for PR 2659 at commit 1c2d928.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have finished for PR 2659 at commit 1c2d928.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21311/

@marmbrus
Contributor

marmbrus commented Oct 6, 2014

SQL changes LGTM.

@SparkQA

SparkQA commented Oct 6, 2014

QA tests have started for PR 2659 at commit fda395b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 6, 2014

QA tests have finished for PR 2659 at commit fda395b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21346/

@@ -357,16 +357,23 @@ private[spark] object PythonRDD extends Logging {
}
}

-  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+  def readBroadcastFromFile(sc: JavaSparkContext,
+      filename: String): Broadcast[Array[Array[Byte]]] = {

Review comment (Contributor), suggesting the standard multi-line parameter formatting:

def readBroadcastFromFile(
    sc: JavaSparkContext,
    filename: String): Broadcast[...] = {
  ...
}

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have started for PR 2659 at commit 2514848.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have finished for PR 2659 at commit 2514848.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21406/

"""
Compress the serialized data
Read at most `limit` bytes from underline stream

Review comment (Contributor):

underline -> underlying

@JoshRosen
Contributor

These changes look pretty good to me. Give me some time to try it out locally with a huge broadcast variable and to double-check that the index arithmetic is right.

@JoshRosen
Contributor

Do you have a script that I can run to test this? We should have a test that creates a huge broadcast variable, serializes it, then checks that the deserialized object contains the same data. This would catch any off-by-one errors in the chunking code that could otherwise lead to silent corruption of binary data.
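A rough sketch of the kind of round-trip check being asked for here; the payload size, app name, and choice of SHA-1 are illustrative (the real test would need a payload well past 2 GB):

import hashlib
import os
from pyspark import SparkContext

sc = SparkContext("local[4]", "broadcast-roundtrip-check")

# Pseudo-random binary payload; scale this up past 2 GB for the real test.
data = os.urandom(64 * 1024 * 1024)
expected = hashlib.sha1(data).hexdigest()
large = sc.broadcast(data)

def digest(_):
    # Recompute the digest on the worker to catch silent corruption in transit.
    return hashlib.sha1(large.value).hexdigest()

assert sc.parallelize(range(4), 4).map(digest).collect() == [expected] * 4
sc.stop()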

>>> rio.read()
'Hello world'
"""
MAX_BATCH = 1 << 20

Review comment (Contributor):

Maybe add a comment next to this line saying "1 megabyte"?
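For example, something along these lines (just the suggested inline comment, not the surrounding serializer code):

MAX_BATCH = 1 << 20  # 1 megabyte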

@davies
Contributor Author

davies commented Oct 8, 2014

The code in the JIRA could be used to test this.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21848/

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have started for PR 2659 at commit d94b68f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 17, 2014

Tests timed out for PR 2659 at commit d94b68f after a configured wait of 120m.

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have started for PR 2659 at commit d94b68f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have finished for PR 2659 at commit d94b68f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 7, 2014

Test build #23042 has started for PR 2659 at commit a2f6a02.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 7, 2014

Test build #23042 has finished for PR 2659 at commit a2f6a02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23042/

@davies
Contributor Author

davies commented Nov 12, 2014

@JoshRosen Several people have hit this problem with large broadcasts in Python; could we get this into the 1.2 release?

@JoshRosen
Contributor

Jenkins, retest this please.

@SparkQA

SparkQA commented Nov 12, 2014

Test build #23275 has started for PR 2659 at commit a2f6a02.

  • This patch merges cleanly.

@JoshRosen
Contributor

I guess this seems fine to me, since I think I reviewed it previously and it doesn't look like much has changed. It would be nice if there were more explanatory comments so that someone who knows nothing about this JIRA / code can figure out what's going on. On the other hand, PythonRDD already suffers from poor documentation, so I don't mind fixing this up myself in a subsequent pull request to add lots of comments / developer docs.

Therefore, I'll probably merge this after Jenkins passes.

@JoshRosen
Contributor

Actually, one question: could you check some tests into spark-perf that both verify that large broadcasts don't crash and ensure that the data I get back is correct, byte-for-byte? I imagine that a pretty common use case for large broadcasts is binary data; in these cases, it would be hard for users to spot silent data corruption (since it's unlikely to cause, say, a pickle error), so I think we need to compare the actual data in our tests to make sure we haven't introduced corruption bugs.

@SparkQA

SparkQA commented Nov 12, 2014

Test build #23275 has finished for PR 2659 at commit a2f6a02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23275/

@davies
Contributor Author

davies commented Nov 12, 2014

@JoshRosen I will do that; we can verify it with a checksum.

@davies
Contributor Author

davies commented Nov 13, 2014

@JoshRosen I added one more test for broadcast; I will do more testing at scale in spark-perf.

@SparkQA

SparkQA commented Nov 13, 2014

Test build #23331 has started for PR 2659 at commit 7b57a14.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 14, 2014

Test build #23331 has finished for PR 2659 at commit 7b57a14.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SizeLimitedStream(object):
    • class CompressedStream(object):
    • class LargeObjectSerializer(Serializer):
    • class CompressedSerializer(Serializer):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23331/

@JoshRosen
Contributor

Going to merge this into branch-1.2 and master. Thanks!

(@davies is running large-scale spark-perf tests, so this is going to get a lot of QA before we end up shipping 1.2).

davies pushed a commit to davies/spark that referenced this pull request Nov 19, 2014
This patch will bring support for broadcasting objects larger than 2G.

pickle, zlib, FrameSerializer, and Array[Byte] cannot handle objects larger than 2G, so this patch introduces LargeObjectSerializer to serialize broadcast objects; the object is serialized and compressed into small chunks. It also changes the type of the broadcast from Broadcast[Array[Byte]] to Broadcast[Array[Array[Byte]]].

Testing broadcasts of objects larger than 2G is slow and memory hungry, so this was tested manually; it could be added to SparkPerf.

Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>

Closes apache#2659 from davies/huge and squashes the following commits:

7b57a14 [Davies Liu] add more tests for broadcast
28acff9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
a2f6a02 [Davies Liu] bug fix
4820613 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
5875c73 [Davies Liu] address comments
10a349b [Davies Liu] address comments
0c33016 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
6182c8f [Davies Liu] Merge branch 'master' into huge
d94b68f [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
2514848 [Davies Liu] address comments
fda395b [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
1c2d928 [Davies Liu] fix scala style
091b107 [Davies Liu] broadcast objects larger than 2G
davies pushed a commit that referenced this pull request Nov 19, 2014
This patch will bring support for broadcasting objects larger than 2G.

pickle, zlib, FrameSerializer, and Array[Byte] cannot handle objects larger than 2G, so this patch introduces LargeObjectSerializer to serialize broadcast objects; the object is serialized and compressed into small chunks. It also changes the type of the broadcast from Broadcast[Array[Byte]] to Broadcast[Array[Array[Byte]]].

Testing broadcasts of objects larger than 2G is slow and memory hungry, so this was tested manually; it could be added to SparkPerf.

Author: Davies Liu <davies@databricks.com>
Author: Davies Liu <davies.liu@gmail.com>

Closes #2659 from davies/huge and squashes the following commits:

7b57a14 [Davies Liu] add more tests for broadcast
28acff9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
a2f6a02 [Davies Liu] bug fix
4820613 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
5875c73 [Davies Liu] address comments
10a349b [Davies Liu] address comments
0c33016 [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
6182c8f [Davies Liu] Merge branch 'master' into huge
d94b68f [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
2514848 [Davies Liu] address comments
fda395b [Davies Liu] Merge branch 'master' of github.com:apache/spark into huge
1c2d928 [Davies Liu] fix scala style
091b107 [Davies Liu] broadcast objects larger than 2G

(cherry picked from commit 4a377af)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@davies
Contributor Author

davies commented Nov 19, 2014

merged.

@davies davies closed this Nov 19, 2014
asfgit pushed a commit that referenced this pull request Nov 25, 2014
Re-implement the Python broadcast using files:

1) Serialize the Python object with cPickle and write it to disk.
2) Create a wrapper in the JVM for the dumped file; it reads data from that file during serialization.
3) Use TorrentBroadcast or HttpBroadcast to transfer the (compressed) data to the executors.
4) During deserialization, write the data back to disk.
5) Pass the path to the Python worker, which reads the data from disk and unpickles it into a Python object only on first access.

It fixes the performance regression introduced in #2659: performance is similar to 1.1, but objects larger than 2G are supported, and memory efficiency is improved (only one compressed copy in the driver and in each executor).

Testing with a 500M broadcast and 4 tasks (excluding the benefit from reused worker in 1.2):

name | 1.1 | 1.2 with this patch | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 25.20 | 9.33 | 170.13%
python-broadcast-w-set | 4.13 | 4.50 | -8.35%

Testing with 100 tasks (16 CPUs):

name | 1.1 | 1.2 with this patch | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 38.16 | 8.40 | 353.98%
python-broadcast-w-set | 23.29 | 9.59 | 142.80%

Author: Davies Liu <davies@databricks.com>

Closes #3417 from davies/pybroadcast and squashes the following commits:

50a58e0 [Davies Liu] address comments
b98de1d [Davies Liu] disable gc while unpickle
e5ee6b9 [Davies Liu] support large string
09303b8 [Davies Liu] read all data into memory
dde02dd [Davies Liu] improve performance of python broadcast

(cherry picked from commit 6cf5076)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
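A minimal sketch of the file-based flow described in the commit message above, with illustrative names rather than the actual PythonBroadcast classes:

import os
import pickle
import tempfile

def dump_broadcast_to_file(obj):
    # Driver side: pickle the object straight to disk so it never has to fit
    # into a single in-memory byte string.
    fd, path = tempfile.mkstemp(suffix=".broadcast")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(obj, f, protocol=2)
    return path  # a JVM wrapper would stream and compress this file to executors

class LazyBroadcastValue(object):
    # Worker side: keep only the path and unpickle on first access.
    def __init__(self, path):
        self._path = path
        self._value = None

    @property
    def value(self):
        if self._value is None:
            with open(self._path, "rb") as f:
                self._value = pickle.load(f)
        return self._value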
asfgit pushed a commit that referenced this pull request Nov 25, 2014
Re-implement the Python broadcast using files:

1) Serialize the Python object with cPickle and write it to disk.
2) Create a wrapper in the JVM for the dumped file; it reads data from that file during serialization.
3) Use TorrentBroadcast or HttpBroadcast to transfer the (compressed) data to the executors.
4) During deserialization, write the data back to disk.
5) Pass the path to the Python worker, which reads the data from disk and unpickles it into a Python object only on first access.

It fixes the performance regression introduced in #2659: performance is similar to 1.1, but objects larger than 2G are supported, and memory efficiency is improved (only one compressed copy in the driver and in each executor).

Testing with a 500M broadcast and 4 tasks (excluding the benefit from reused worker in 1.2):

name | 1.1 | 1.2 with this patch | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 25.20 | 9.33 | 170.13%
python-broadcast-w-set | 4.13 | 4.50 | -8.35%

Testing with 100 tasks (16 CPUs):

name | 1.1 | 1.2 with this patch | improvement
---------|--------|---------|--------
python-broadcast-w-bytes | 38.16 | 8.40 | 353.98%
python-broadcast-w-set | 23.29 | 9.59 | 142.80%

Author: Davies Liu <davies@databricks.com>

Closes #3417 from davies/pybroadcast and squashes the following commits:

50a58e0 [Davies Liu] address comments
b98de1d [Davies Liu] disable gc while unpickle
e5ee6b9 [Davies Liu] support large string
09303b8 [Davies Liu] read all data into memory
dde02dd [Davies Liu] improve performance of python broadcast