[SPARK-2568] RangePartitioner should run only one job if data is balanced #1562
Conversation
QA tests have started for PR 1562. This patch merges cleanly.

QA results for PR 1562:
```scala
val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
if (rddSample.length == 0) {
  Array()

// This is the sample size we need to have roughly balanced output partitions.
```
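The snippet above is the old two-pass path: sample, sort, then cut the sorted sample into range bounds. As a rough illustration of that last step, here is a minimal, hypothetical bounds picker (`pickBounds` is an illustrative name, not Spark's actual `determineBounds`), which takes `partitions - 1` evenly spaced cut points from a sorted sample:

```scala
// Hypothetical helper: pick partitions - 1 evenly spaced cut points
// from a sorted sample. Keys below bounds(0) go to partition 0, etc.
def pickBounds(sortedSample: Array[Int], partitions: Int): Array[Int] = {
  if (sortedSample.isEmpty) Array.empty[Int]
  else (1 until partitions).map { i =>
    // Index into the sample proportionally to the target partition boundary.
    sortedSample(math.min(sortedSample.length - 1,
      i * sortedSample.length / partitions))
  }.toArray
}

val bounds = pickBounds((1 to 100).toArray, 4)
// bounds is Array(26, 51, 76): three cut points for four partitions
```

The real implementation additionally weights samples by how much of each partition they represent, which matters once partitions are sampled at different rates.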
It would be great to break this down into a couple of different functions that we can unit test.
Let me break it down.
Allow small errors in comparison. @dbtsai, this unit test blocks #1562. I may need to merge this one first. We can change it to use the tools in #1425 after that PR gets merged.

Author: Xiangrui Meng <meng@databricks.com>

Closes #1576 from mengxr/fix-binary-metrics-unit-tests and squashes the following commits:

5076a7f [Xiangrui Meng] fix binary metrics unit tests
```scala
var numItems = 0L
sketch.foreach { case (_, n, _) =>
  numItems += n
}
```
You can replace this with `val numItems = sketch.map(_._2).sum`.
(It would probably also be more efficient than doing a pattern match here)
`.sum` will return an Int instead of a Long. I will remove the pattern matching.
`val numItems = sketch.map(_._2.toLong).sum`
Done. My previous concern was that `sketch.map` generates a temporary array. But since the number of partitions is small, it is not a big deal, and this reads better.
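To make the overflow concern above concrete, here is a small standalone sketch (the `sketch` data is made up for illustration; each tuple is `(partitionId, count, sample)` with an Int count): summing the Int counts directly wraps around, while widening to Long first does not.

```scala
// Illustrative sketch entries: (partition id, item count, sample).
// Two partitions of 2 billion items each overflow Int arithmetic.
val sketch = Array(
  ("p0", 2000000000, Array.empty[Int]),
  ("p1", 2000000000, Array.empty[Int])
)

val overflowed = sketch.map(_._2).sum        // Int sum: wraps to a negative value
val correct    = sketch.map(_._2.toLong).sum // widen each count to Long first
// overflowed == -294967296, correct == 4000000000L
```

This is why the final version maps with `_._2.toLong` before summing.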
I think it's fine to make it random. Actually it would be better to do something like

Actually I meant
QA tests have started for PR 1562. This patch merges cleanly.

QA results for PR 1562:
```scala
@@ -103,26 +107,49 @@ class RangePartitioner[K : Ordering : ClassTag, V](
    private var ascending: Boolean = true)
```
It'd be great to update the documentation on when this results in two passes vs one pass. We should probably update the documentation for sortByKey and various other sorts that use this too. Let's do that in another PR.
LGTM. Merging in master.
[SPARK-2568] RangePartitioner should run only one job if data is balanced

As of Spark 1.0, RangePartitioner goes through data twice: once to compute the count and once to do sampling. As a result, to do sortByKey, Spark goes through data 3 times (once to count, once to sample, and once to sort).

`RangePartitioner` should go through data only once, collecting samples from input partitions as well as counting. If the data is balanced, this should give us a good sketch. If we see big partitions, we re-sample from them in order to collect enough items. The downside is that we need to collect more from each partition in the first pass. An alternative solution is caching the intermediate result and deciding whether to fetch the data afterwards.

Author: Xiangrui Meng <meng@databricks.com>
Author: Reynold Xin <rxin@apache.org>

Closes apache#1562 from mengxr/range-partitioner and squashes the following commits:

6cc2551 [Xiangrui Meng] change foreach to for
eb39b08 [Xiangrui Meng] Merge branch 'master' into range-partitioner
eb95dd8 [Xiangrui Meng] separate sketching and determining bounds impl
c436d30 [Xiangrui Meng] fix binary metrics unit tests
db58a55 [Xiangrui Meng] add unit tests
a6e35d6 [Xiangrui Meng] minor update
60be09e [Xiangrui Meng] remove importance sampler
9ee9992 [Xiangrui Meng] update range partitioner to run only one job on roughly balanced data
cc12f47 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into range-part
06ac2ec [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into range-part
17bcbf3 [Reynold Xin] Added seed.
badf20d [Reynold Xin] Renamed the method.
6940010 [Reynold Xin] Reservoir sampling implementation.
As of Spark 1.0, RangePartitioner goes through data twice: once to compute the count and once to do sampling. As a result, to do sortByKey, Spark goes through data 3 times (once to count, once to sample, and once to sort).

`RangePartitioner` should go through data only once, collecting samples from input partitions as well as counting. If the data is balanced, this should give us a good sketch. If we see big partitions, we re-sample from them in order to collect enough items. The downside is that we need to collect more from each partition in the first pass. An alternative solution is caching the intermediate result and deciding whether to fetch the data afterwards.
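The single-pass idea described above can be sketched without Spark: per partition, one reservoir-sampling scan yields both the partition's size and a fixed-size sample, so no separate counting pass is needed. This is a minimal, Spark-free illustration (partitions are plain `Seq`s and `reservoirSample` is an illustrative helper, not the actual Spark implementation):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// One scan over an iterator: returns a reservoir sample of at most k
// items plus the total number of items seen (the "count" comes for free).
def reservoirSample(items: Iterator[Int], k: Int, seed: Long): (Array[Int], Long) = {
  val rng = new Random(seed)
  val reservoir = new ArrayBuffer[Int](k)
  var n = 0L
  for (x <- items) {
    n += 1
    if (reservoir.length < k) {
      reservoir += x
    } else {
      // Keep x with probability k / n, replacing a random reservoir slot.
      val j = (rng.nextDouble() * n).toLong
      if (j < k) reservoir(j.toInt) = x
    }
  }
  (reservoir.toArray, n)
}

// Simulated partitions: 100 items and 50 items.
val partitions = Seq(Seq.tabulate(100)(identity), Seq.tabulate(50)(_ + 100))

// Single pass: each partition reports (id, size, sample).
val sketch = partitions.zipWithIndex.map { case (part, idx) =>
  val (sample, n) = reservoirSample(part.iterator, 10, seed = idx)
  (idx, n, sample)
}

// Total item count is just the sum of per-partition counts.
val numItems = sketch.map(_._2).sum // 150L
```

In the real patch, partitions whose counts reveal them to be much larger than average are re-sampled in a second, targeted pass, so balanced data pays for only one job.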