[SPARK-37369][SQL] Avoid redundant ColumnarToRow transition on InMemoryTableScan #34642
viirya wants to merge 14 commits into apache:master from
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
Test build #145370 has finished for PR 34642 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #145373 has finished for PR 34642 at commit
attilapiros
left a comment
I think we need an explicit unit test to validate that the to-row transition is missing from the plan in this case.
  conf.cacheVectorizedReaderEnabled &&
  !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
- relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema)
+ relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema) && outputColumnar
As evaluating the outputColumnar flag is one of the fastest checks in this expression (where only && operators are used), I would move it before the isTooManyFields call (which uses a recursive function).
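A sketch of the suggested ordering, reusing the identifiers from the diff above (this is only a fragment; the surrounding declaration is omitted and the exact final code may differ):

```scala
// Cheapest check first: when outputColumnar is false, the recursive
// isTooManyFields walk and the serializer capability check are skipped.
outputColumnar &&
  conf.cacheVectorizedReaderEnabled &&
  !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
  relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema)
```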
Thanks @attilapiros. Yea, I will add some tests later.
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #145440 has finished for PR 34642 at commit
Kubernetes integration test starting
Test build #145444 has finished for PR 34642 at commit
Kubernetes integration test status failure
Kubernetes integration test status failure
Test build #145447 has finished for PR 34642 at commit
@viirya
So we might abuse supportsColumnar == false and take it as supportsRowBased. What about introducing a new flag right beside the old one, something like supportsRowBased? This would be a more generic solution, as any node which supports both can avoid the unneeded to-row transition.
Ideally, yes, it is more general to have another flag. In practice, I wonder whether there will be more such nodes that could choose to output row-based or columnar output under some conditions. For the in-memory relation scan here, adding a new flag
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146017 has finished for PR 34642 at commit
Test build #146021 has finished for PR 34642 at commit
sunchao
left a comment
So the motivation of this PR is to improve from the existing:
CachedBatch -> ColumnarBatch -> InternalRow
transition to:
CachedBatch -> InternalRow
Is that right? This makes sense to me.
  val id: Int = SparkPlan.newPlanId()

  /**
   * Return true if this stage of the plan supports row-based execution.
Maybe add some explanation of why we need both this and supportsColumnar? It's a bit confusing when reading this code.
Also I'm wondering if something like prefersColumnar is better, so that we have:
- supportsColumnar: this plan can support columnar output, alongside the default row-based output which every plan supports.
- prefersColumnar: this plan prefers to output columnar batches even if it is not explicitly requested (e.g., outputsColumnar is false).
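A hypothetical sketch of how the two flags in this suggestion could be documented (prefersColumnar is only being proposed in this thread; it is not an existing Spark API):

```scala
// Illustrative stand-in only, not the real org.apache.spark.sql.execution.SparkPlan.
abstract class SparkPlanLike {
  /** This plan can produce ColumnarBatch output, alongside the default row-based output. */
  def supportsColumnar: Boolean = false

  /** This plan prefers to emit columnar batches even when columnar output is not requested. */
  def prefersColumnar: Boolean = false
}
```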
supportsColumnar: this plan can support columnar output, alongside the default row-based output which every plan supports.
Hmm, seems not exactly? Not every plan supports row-based output.
prefersColumnar seems redundant? As I see, we usually prefer columnar output already.
Not every plan supports row-based output.
Hmm, any example? Every physical plan node has to implement doExecute, which outputs rows, while doExecuteColumnar throws an exception by default.
As I see, we usually prefer columnar output already.
I'm not sure about this part. To my understanding, at the moment it appears we prefer columnar output because 1) vectorized readers for ORC/Parquet yield much better performance so we always want to use that over the default row-based impls, and 2) supportsColumnar defaults to false as most operators don't support columnar execution yet, so we'll do the columnar-row conversion and switch back to whole-stage codegen.
However, this may not hold true if we add columnar support for more operators like filter/project etc. in the future. Do we want to prefer columnar execution over the whole-stage codegen approach? I'm not sure yet and maybe some evaluation is required. prefersColumnar could give us a knob to control this.
Hmm, any example? Every physical plan node has to implement doExecute, which outputs rows, while doExecuteColumnar throws an exception by default.
I think there is no guarantee that a physical node must implement a working doExecute. For a columnar node, it can just throw an exception saying it is not implemented (like the default doExecuteColumnar) if it is not designed to be executed under row-based execution.
I also don't see a need to implement both (working) row-based and columnar execution for a node in general. But because we don't actually have official columnar execution nodes in Spark, maybe I cannot give an example from Spark itself. Hopefully I convey the idea clearly.
prefersColumnar: this plan prefers to output columnar batches even if it is not explicitly requested (e.g., outputsColumnar is false).
BTW, outputsColumnar is not a preference option, I think (at least for its current usage in the rule). It actually indicates whether the output should be columnar or not. Once outputsColumnar is false, the plan should produce row-based output, which is why we add ColumnarToRowExec for that case.
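A minimal, self-contained mini-model of this decision (all names here are assumed for illustration; this is not the real ApplyColumnarRulesAndInsertTransitions rule):

```scala
// Toy model: when rows are required (outputsColumnar == false) and the child can
// already produce rows, no transition is needed; otherwise a to-row conversion is added.
object TransitionSketch {
  sealed trait PlanSketch {
    def supportsColumnar: Boolean
    def supportsRowBased: Boolean
  }
  case class ScanSketch(supportsColumnar: Boolean, supportsRowBased: Boolean) extends PlanSketch
  case class ColumnarToRowSketch(child: PlanSketch) extends PlanSketch {
    def supportsColumnar: Boolean = false
    def supportsRowBased: Boolean = true
  }

  def ensureRows(child: PlanSketch, outputsColumnar: Boolean): PlanSketch =
    if (outputsColumnar || child.supportsRowBased) child
    else ColumnarToRowSketch(child)
}
```

Under this toy model, a scan that supports both formats (like the in-memory scan discussed here) is returned unchanged when rows are requested, while a columnar-only scan still gets wrapped in the to-row conversion.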
Yea, the preference I mentioned is pretty limited so far. I agree that we may need to have a preference rule (or something) in the future. As we don't have real built-in columnar operators in Spark, currently the situation seems to be that some columnar extensions/libraries replace row-based operators with columnar operators during planning. I'm not sure if we can estimate which one is preferred during planning.
BTW, IMHO, if we add columnar support for more operators in the future, I guess it already implicitly indicates we "prefer" it over the current execution (whole-stage codegen or the interpreted one)? Just like whole-stage codegen, it seems we simply prefer it once we verify it generally has better performance. This is similar to the 3rd party extensions/libraries situation, I think.
I see, makes sense. I was referring to nodes in Spark itself, but yeah, an extension could implement only doExecuteColumnar.
I'm still slightly in favor of prefersColumnar, but it's only a minor personal preference. Overall it looks OK.
For now I guess the columnar route is considered superior; otherwise there should be a cost calculation for the plan between row vs columnar.
This is just to cover the case where the downstream doesn't support columnar but the upstream supports both row and columnar, and its performance of producing output is: columnar output > row output > columnar output + columnar-to-row conversion. In that case the upstream wants to directly produce whatever the downstream wants, without a conversion.
If the upstream can produce columnar output fast enough to cover the overhead of a columnar-to-row conversion (columnar output > columnar output + columnar-to-row conversion > row output), then it could just tactically say it only supports columnar output and Spark will add the conversion.
@@ -0,0 +1,12 @@
================================================================================================
nit: ideally we should generate the result using the GitHub workflow
Updated with the result of GitHub Action.
  private def buildBuffers(): RDD[CachedBatch] = {
-   val cb = if (cachedPlan.supportsColumnar) {
+   val cb = if (cachedPlan.supportsColumnar &&
+       serializer.supportsColumnarInput(cachedPlan.output)) {
Hmm, why is this necessary? Shouldn't cachedPlan.supportsColumnar already cover this? For instance in InMemoryTableScanExec.
This is actually a bug. cachedPlan.supportsColumnar only indicates that the cached plan can output the columnar format, but whether this cached RDD builder can take such input depends on its serializer.
There is one test which failed due to the proposed change. I remember it happens for an InMemoryRelation under another InMemoryRelation.
Previously we always added an additional ColumnarToRow transition between two InMemoryRelations, so we didn't hit this.
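A commented sketch of the guarded condition from the diff above (the two branch helpers are hypothetical placeholders, not the real bodies of buildBuffers):

```scala
// Use the columnar caching path only when BOTH sides can handle columnar data:
//  - cachedPlan.supportsColumnar: the child plan can emit ColumnarBatch output
//  - serializer.supportsColumnarInput: this cache serializer can consume that output
// Without the second check, a columnar-capable child over a serializer that only
// accepts rows (e.g. an InMemoryRelation cached on top of another InMemoryRelation)
// would break once the always-inserted ColumnarToRow transition is gone.
val cb = if (cachedPlan.supportsColumnar &&
    serializer.supportsColumnarInput(cachedPlan.output)) {
  buildColumnarBuffers()   // hypothetical stand-in for the columnar branch
} else {
  buildRowBasedBuffers()   // hypothetical stand-in for the row-based branch
}
```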
cc. @revans2 @tgravescs @andygrove
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146065 has finished for PR 34642 at commit
revans2
left a comment
The change looks good to me. More comments on supportsRowBased and supportsColumnar might be good to make it clear how to use them, but it is fairly clear to me.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146079 has finished for PR 34642 at commit
I think this change should be pretty clear. If there are no more comments or objections, I will merge this in the next few days. Thanks.
HeartSaVioR
left a comment
The code change looks good, as long as it is approved by @revans2, who made the recent change here and also depends on it (I guess).
Would you mind explaining the addition of supportsRowBased in the PR description? It would help to track the change afterwards.
@HeartSaVioR Updated the description. Thanks.
HeartSaVioR
left a comment
+1 thanks for the patience!
dongjoon-hyun
left a comment
+1, LGTM. Thank you, @viirya and all.
Thank you all! Merging to master.
Sorry for the late review. How do we "ask" for producing row output? I don't see any related change to the in-memory table scan in this PR.
Sorry for the confusion. I should say we can let
### What changes were proposed in this pull request?
In PR #34642, we added a `supportsRowBased` in `SparkPlan` in order to avoid a redundant `ColumnarToRow` transition in `InMemoryTableScan`. But this optimization also applies to Union if its children all support row-based output. So, this PR adds the `supportsRowBased` implementation for `UnionExec`.

### Why are the changes needed?
Follow-up PR.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests passed.

Closes #35061 from linhongliu-db/SPARK-37369-followup.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
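The follow-up described above is presumably along these lines (a sketch, not the exact diff from #35061):

```scala
// Sketch: UnionExec can report row-based support only when every child
// can itself produce row-based output.
override def supportsRowBased: Boolean = children.forall(_.supportsRowBased)
```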
What changes were proposed in this pull request?
This patch proposes to let InMemoryTableScanExec produce row output directly if its parent query plan only accepts rows instead of columnar output. In particular, this change adds a new method in SparkPlan called supportsRowBased, alongside the existing supportsColumnar.
Why are the changes needed?
We currently have supportsColumnar indicating whether a physical node can produce columnar output. The current columnar transition rule seems to assume that a node can only produce columnar output, not row-based output, if supportsColumnar returns true. But a node can actually produce both formats, i.e. columnar and row-based. For such a node, if we require row-based output, the columnar transition rule adds an additional columnar-to-row conversion after it due to this wrong assumption.
So this change introduces supportsRowBased, which indicates whether a node can produce row-based output. The rule can check this method when deciding whether a columnar-to-row transition is necessary.
For example, InMemoryTableScanExec can produce columnar output, so if its parent plan isn't columnar, the rule adds a ColumnarToRow between them.
But InMemoryTableScanExec is capable of row-based output too. After this change, for such a case, we can simply ask InMemoryTableScanExec to produce row output instead of doing a redundant conversion.
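As an illustration (a hypothetical snippet, not taken from the PR or its tests), one way to observe the transition is to cache a DataFrame and check whether the executed plan of a row-based parent still contains a ColumnarToRowExec:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ColumnarToRowExec

val spark = SparkSession.builder().master("local[*]").appName("SPARK-37369-demo").getOrCreate()
import spark.implicits._

val df = (1 to 100).toDF("v")
df.cache()
df.count() // materialize the cached InMemoryRelation

// The filter above the in-memory scan is row-based. Before this change the executed
// plan contained a ColumnarToRowExec above InMemoryTableScanExec; after it, the scan
// can emit rows directly and the extra transition disappears.
val plan = df.filter($"v" > 50).queryExecution.executedPlan
println(plan)
println(plan.collect { case c: ColumnarToRowExec => c }.nonEmpty)
```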
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.