-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23707][SQL] Don't need shuffle exchange with single partition for 'spark.range' #20844
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@cloud-fan pls take a look, this is a small change. Thanks a lot. |
| val initRangeFuncName = ctx.addNewFunction(initRange, | ||
| s""" | ||
| | private void initRange(int idx) { | ||
| | private void ${initRange}(int idx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a test? with this bug seems we can't join 2 ranges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan thanks for reviewing. Both BroadCastExchange and ShuffleExchange don't support CodegenSupport, so there should be two WholeStageCodegen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whole-stage codegen is pull-model. I don't think we will have 2 leaf nodes in one stage. Maybe we can just add some comments to explain it and don't change the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I can just some comments and keep the code unchanged. I changed it here just for better code robustness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @cloud-fan , before adding the comments, I have a question about why we still need exchange if we join two spark.range(1, 10, 1, 1). Because of both of the range are only one partition, does the exchange really needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can apply #20726 to the RangeExec operator and fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, let me take a try.
|
@cloud-fan, pls take a look, thanks a lot. |
|
ok to test |
|
Actually, |
|
Test build #88474 has finished for PR 20844 at commit
|
|
This change is very simple, and just make it consistent with other |
| "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) | ||
|
|
||
| /** Specifies how data is partitioned across different nodes in the cluster. */ | ||
| override def outputPartitioning: Partitioning = if (numSlices == 1 && numElements != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why numElements != 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This related to the UT error. spark.range(-10, -9, -20, 1).count() faild when codegen set to true and RangeExec.outputPartitioning set to SinglePartition. I try to found the root reason, but failed.
|
Test build #88500 has finished for PR 20844 at commit
|
| val n = 10000 | ||
| // Trigger a sort | ||
| val data = spark.range(0, n, 1, 1).sort('id) | ||
| val data = spark.range(0, n, 1, 2).sort('id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
: ) Know this now
|
Can one of the admins verify this patch? |
|
I think it has been fixed? |
|
Yup, the regression tests pass at the very least. |
|
thanks for all. Closes it. |
What changes were proposed in this pull request?
Just like #20726. There is no need 'Exchange' when
spark.rangeproduce only one partition.How was this patch tested?
New UT.