[SPARK-44325][SQL] Use PartitionEvaluator API in SortMergeJoinExec #41884
Looks okay, but I'm wondering: is there a closure issue in `SortMergeJoinExec` that requires changing it to `PartitionEvaluator`? I can't seem to find one.
@viirya I think the benefit is that we make it clearer what gets serialized and sent to the executor side.
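To illustrate the serialization point, here is a minimal sketch that does not depend on Spark. `SketchEvaluator`, `SketchEvaluatorFactory`, and `PairUpFactory` are hypothetical stand-ins, not Spark classes; they only mirror the general shape of a factory that creates a per-partition evaluator. The idea shown is that when the factory is a plain serializable class, the state shipped to the executor is exactly its constructor fields, rather than whatever a lambda happens to capture.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for the evaluator and factory roles (not Spark classes).
trait SketchEvaluator[T, U] {
  def eval(partitionIndex: Int, inputs: Iterator[T]*): Iterator[U]
}

trait SketchEvaluatorFactory[T, U] extends java.io.Serializable {
  def createEvaluator(): SketchEvaluator[T, U]
}

// Everything that travels with the factory is an explicit constructor field.
final case class PairUpFactory(separator: String) extends SketchEvaluatorFactory[Int, String] {
  override def createEvaluator(): SketchEvaluator[Int, String] =
    new SketchEvaluator[Int, String] {
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[String] = {
        val left = inputs(0)
        val right = inputs(1)
        // Pair up elements of the two inputs positionally.
        left.zip(right).map { case (l, r) => s"$l$separator$r" }
      }
    }
}

object SerializationSketch {
  // Java-serializes a value and reads it back, roughly what shipping to an executor implies.
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

In this sketch only the factory is serialized; the evaluator is created after deserialization, so it never needs to be serializable itself.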
LGTM +1
Thanks @vinodkc and @cloud-fan @viirya @beliefer. Merged to master.
    } else {
      left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
        val evaluator = evaluatorFactory.createEvaluator()
        evaluator.eval(0, leftIter, rightIter)
A `zipPartitionsWithIndex` method does not currently exist, so index 0 is passed to `evaluator.eval(0, ...)`. Once `spark.sql.execution.useTaskEvaluator` is set to true by default, this block will no longer be executed.
Can we use `TaskContext.getPartitionId`?
nvm, it's different from the index. Maybe we should just leave a note here explaining why we always use `0`.
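The constant index discussed in this thread can be sketched without Spark. In the snippet below, `IndexedEvaluator` and the local `zipPartitions` helper are hypothetical stand-ins that operate on plain Scala collections; they are not the Spark API. The helper mimics a zip-style callback that receives only the two iterators, so the caller has no partition index to forward and passes 0 instead.

```scala
// Hypothetical stand-in for an evaluator that takes a partition index (not a Spark class).
trait IndexedEvaluator[T, U] {
  def eval(partitionIndex: Int, inputs: Iterator[T]*): Iterator[U]
}

object ZipIndexSketch {
  // Tags each zipped pair with the partition index the evaluator was given.
  val tagging: IndexedEvaluator[Int, String] = new IndexedEvaluator[Int, String] {
    override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[String] =
      inputs(0).zip(inputs(1)).map { case (l, r) => s"p$partitionIndex:$l-$r" }
  }

  // Mimics a zipPartitions-style API: the callback sees two iterators and no index.
  def zipPartitions[T, U](left: Seq[Seq[T]], right: Seq[Seq[T]])(
      f: (Iterator[T], Iterator[T]) => Iterator[U]): Seq[Seq[U]] =
    left.zip(right).map { case (l, r) => f(l.iterator, r.iterator).toList }

  // Two "partitions" on each side.
  val left: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3))
  val right: Seq[Seq[Int]] = Seq(Seq(10, 20), Seq(30))

  // No index is available inside the callback, so a constant 0 is passed.
  val viaZip: Seq[Seq[String]] =
    zipPartitions(left, right)((l, r) => tagging.eval(0, l, r))
}
```

In this sketch every element, including those from the second partition, is tagged `p0`. That is harmless only if the evaluator does not rely on the index, which is the kind of assumption a code note at the call site would record.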
### What changes were proposed in this pull request?

SQL operator `SortMergeJoinExec` updated to use the `PartitionEvaluator` API to do execution.

### Why are the changes needed?

To avoid the use of lambda during distributed execution. Ref: SPARK-43061 for more details.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated 1 test case. Once all the SQL operators are migrated, the flag `spark.sql.execution.useTaskEvaluator` will be enabled by default to avoid running the tests with and without this TaskEvaluator.

Closes apache#41884 from vinodkc/br_refactorSortMergeJoinEvaluatorFactory.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>