New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19426][SQL] Custom coalescer for Dataset #18861
Conversation
Test build #80306 has finished for PR 18861 at commit
|
Test build #80307 has finished for PR 18861 at commit
|
@gatorsmile ok |
@@ -571,7 +570,8 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { | |||
* current upstream partitions will be executed in parallel (per whatever | |||
* the current partitioning is). | |||
*/ | |||
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { | |||
case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add the parm description of coalescer
? also update function descriptions? Thanks~!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok!
addPartition(partition, splitSize) | ||
index += 1 | ||
} else { | ||
updateGroups | ||
updateGroups() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the above changes are not related to this PR, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I just left the changes of the original author (probably refactoring stuffs?) ..., better remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine about this, but it might confuse the others. Maybe just remove them in this PR? You can submit a separate PR later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I'll drop these from this pr.
* supplying a `PartitionCoalescer` to control the behavior of the partitioning. | ||
*/ | ||
case class PartitionCoalesce(numPartitions: Int, coalescer: PartitionCoalescer, child: LogicalPlan) | ||
extends UnaryNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding new logical nodes also needs the updates in multiple different components. (e.g., Optimizer).
Is that possible to reuse the existing node Repartition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, I think so. I'll try and plz give me days to do so.
Test build #80321 has finished for PR 18861 at commit
|
Test build #80320 has finished for PR 18861 at commit
|
Test build #80327 has finished for PR 18861 at commit
|
Test build #80328 has finished for PR 18861 at commit
|
@gatorsmile ok, could you check again? |
numPartitions: Int, | ||
shuffle: Boolean, | ||
child: LogicalPlan, | ||
coalescer: Option[PartitionCoalescer] = None) | ||
extends RepartitionOperation { | ||
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a new require here?
require(!shuffle || coalescer.isEmpty, "Custom coalescer is not allowed for repartition(shuffle=true)")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
* @since 2.3.0 | ||
*/ | ||
def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]) | ||
: Dataset[T] = withTypedPlan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def coalesce(
numPartitions: Int,
userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan {
case (false, true) => if (r.numPartitions >= child.numPartitions) child else r | ||
case _ => r.copy(child = child.child) | ||
} | ||
case r @ Repartition(_, _, child: RepartitionOperation, None) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, my bad. Fixed.
Test build #80373 has finished for PR 18861 at commit
|
@gatorsmile ok, fixed. |
@gatorsmile ping |
@@ -596,14 +596,17 @@ object CollapseProject extends Rule[LogicalPlan] { | |||
object CollapseRepartition extends Rule[LogicalPlan] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add new test cases to CollapseRepartitionSuite
for the changes in this rule.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@maropu any update on this issue? |
@gatorsmile better to close this pr and the jira? |
Yeah, maybe close it first. Thanks! |
ok, thanks! |
What changes were proposed in this pull request?
This pr added a new API for
coalesce
inDataset
; users can specify the custom coalescer which reduces an input Dataset into fewer partitions. This coalescer implementation is the same with the one inRDD#coalesce
added in #11865 (SPARK-14042).This is the rework of #16766.
How was this patch tested?
Added tests in
DatasetSuite
.