[SPARK-6307][Core] Coalesce RDDs before RDD.cartesian #6454
Conversation
Test build #33656 has finished for PR 6454 at commit
Have you tried not doing the coalesce before cartesian? It seems to me that we may slow down due to these two extra shuffles.
Without the coalesce it is slightly faster. However, coalescing to fewer partitions may be beneficial, as we discussed in #5572. Another approach is to remove the coalesce and only add a note to cartesian's documentation recommending that users do the coalesce themselves.
What type of cluster did you test this on? I expect this will perform poorly if your cluster has few machines with a lot of cores, for example two machines with 32 cores each. Ideally we would like the product of the number of partitions in the two RDDs to be equal to the default parallelism (that is, the total number of cores). If we do a shuffle anyway, we should be able to calculate the number of partitions fairly exactly.
What will happen if we coalesce with shuffle set to false? For example, if an RDD has 16 partitions on 4 nodes and we coalesce to six partitions?
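For reference, a minimal sketch of the two coalesce modes being discussed; the 16-partition RDD below is hypothetical and only illustrates the API, not anything in this PR:

```scala
// Hypothetical 16-partition RDD used only to illustrate the two coalesce modes.
val rdd = sc.parallelize(1 to 1000, 16)

// shuffle = false (the default): narrow dependency, each new partition is a
// union of existing partitions and no shuffle stage is run.
val narrow = rdd.coalesce(6)

// shuffle = true: a full shuffle that redistributes the data evenly across
// the requested number of partitions.
val shuffled = rdd.coalesce(6, shuffle = true)
```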
Sorry, I think I was vague when I originally brought up coalesce. I had intended for it to only be a suggestion to the user, included in the docs, as you note. I think it's really hard to guess how to do the coalescing. In fact, I just realized there is a downside to coalescing that I hadn't thought of earlier -- the more you coalesce, the more likely it becomes that you cannot store all of rdd2.iterator in memory.
We want to coalesce to reduce the number of times a given partition is fetched. Each partition is fetched once for every partition in the other RDD. The Cartesian RDD has a number of partitions equal to the product of the number of partitions in the two source RDDs. For each partition in the Cartesian RDD we will do (asymptotically) one fetch.
RDDs typically have a number of partitions equal to the default parallelism (the number of cores), and ideally the Cartesian RDD should have the same. If we don't coalesce, we should expect it to have a number of partitions equal to the square of the default parallelism.
If, for example, we have 10 machines with 16 cores each, then the default parallelism will be 160. If we don't coalesce, the Cartesian RDD will have 25600 partitions. 2560 of those partitions can be computed without a remote fetch, but the rest will involve one. This will result in a lot of redundant fetches if we don't cache the partitions; to be exact, each partition will be fetched 144 times. If we coalesce, we can bring that down to 10 or 16.
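To make the arithmetic above concrete, a small sketch of the same numbers (the variable names are mine, not anything in this PR):

```scala
// 10 machines with 16 cores each, as in the example above.
val machines = 10
val coresPerMachine = 16
val defaultParallelism = machines * coresPerMachine                  // 160
val cartesianPartitions = defaultParallelism * defaultParallelism    // 25600
// Pairs whose two parent partitions live on the same machine need no remote fetch.
val localPairs = machines * coresPerMachine * coresPerMachine        // 2560
// Each partition is paired with every partition of the other RDD; only the
// co-located ones avoid a remote fetch.
val remoteFetchesPerPartition = defaultParallelism - coresPerMachine // 144
```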
I agree it can be difficult to predict the number of partitions for all possible cases, but we can make a pretty good guess that would work well for most applications. I think the best solution is to make that guess and let the user override it if he or she has the domain knowledge to change the default.
I would suggest something like:
val numPartitions1 = numExecutors
val numPartitions2 = max(numExecutors, defaultParallelism / numExecutors)
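Spelled out as user code, the heuristic might look something like the sketch below; rdd1, rdd2, and the way numExecutors is obtained are assumptions made for illustration, not part of this PR:

```scala
// Sketch of coalescing both inputs before cartesian using the suggested heuristic.
// getExecutorMemoryStatus includes the driver, hence the "- 1"; this is only an
// approximation of the executor count.
val numExecutors = math.max(sc.getExecutorMemoryStatus.size - 1, 1)
val defaultParallelism = sc.defaultParallelism
val numPartitions1 = numExecutors
val numPartitions2 = math.max(numExecutors, defaultParallelism / numExecutors)
val product = rdd1.coalesce(numPartitions1, shuffle = true)
  .cartesian(rdd2.coalesce(numPartitions2, shuffle = true))
```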
jenkins, test this please
Test build #33657 has finished for PR 6454 at commit
I'm confused about how this actually makes things any faster. It seems like even if you do unroll rdd2.iterator into an array, you just get an iterator back and you don't hold on to the array. Doesn't this still lead to rdd2.iterator getting called for every element in rdd1.iterator? I'd think you would need to do something like:
// Try to unroll rdd2's partition into memory once; None means it did not fit.
val arrayValues: Option[Array[U]] = {
  SparkEnv.get.blockManager.memoryStore.unrollSafely(key,
      rdd2.iterator(currSplit.s2, context), updatedBlocks) match {
    case Left(arr) => Some(arr.asInstanceOf[Array[U]])
    case Right(it) => None
  }
}
// Reuse the unrolled array when we have it; otherwise recompute/fetch rdd2's partition.
def getRdd2Values(): Iterator[U] =
  arrayValues.map(_.iterator).getOrElse(rdd2.iterator(currSplit.s2, context))
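For context, a hedged sketch of how getRdd2Values might then be used inside CartesianRDD.compute; this is my reading of the suggestion above, not the PR's actual code:

```scala
// rdd1's partition is traversed once; for each element, rdd2's partition is re-read,
// preferably from the unrolled in-memory array rather than via a fresh remote fetch.
for (x <- rdd1.iterator(currSplit.s1, context); y <- getRdd2Values()) yield (x, y)
```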
I think we also need to call MemoryStore.releasePendingUnrollMemoryForThisThread after we're done with this task ... I guess we'd add that with TaskContext.get.addTaskCompletionListener.
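A minimal sketch of what that could look like, assuming it runs inside CartesianRDD.compute where `context` is the TaskContext; the exact shape of the MemoryStore method is an assumption here:

```scala
// Release any unroll memory still reserved by this thread once the task finishes.
context.addTaskCompletionListener { _ =>
  SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisThread()
}
```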
@squito Yes, I noticed that, but I have not committed updates yet. Interestingly, the performance gap before and after applying the remote-caching approach from #5572 seems to have narrowed in the latest codebase. Because this issue has been open for a while, I wonder if other improvements have already made this change unimportant for performance. I will test it again to see if that is the case; if so, these two PRs can be closed.
@tbertelsen Can you try the latest Spark codebase for RDD.cartesian performance too?
@viirya Sadly I don't have access to our compute cluster for the next couple of weeks, so I can't try it out right now.
Guys, any update on this one?
Closing this PR for now, as I don't know whether it is still needed.
JIRA: https://issues.apache.org/jira/browse/SPARK-6307 and https://issues.apache.org/jira/browse/SPARK-6922
Alternative approach to this issue. Ref. #5572.
This PR tries to coalesce RDDs before RDD.cartesian. It also implements idea 1 discussed in #5572, so in cartesian we will fetch each partition in RDD2 once for each partition in RDD1. For the performance test, using the same example code as in #5572, it takes about 4s.
cc @tbertelsen, @squito, @cloud-fan.