[SPARK-41017][SQL] Support column pruning with multiple nondeterministic Filters #38511
Conversation
also cc @wangyum @ulysses-you
```diff
@@ -85,15 +72,25 @@ object PhysicalOperation extends AliasHelper with PredicateHelper {
     // projects. We need to meet the following conditions to do so:
     // 1) no Project collected so far or the collected Projects are all deterministic
     // 2) the collected filters and this filter are all deterministic, or this is the
-    //    first collected filter.
+    //    first collected filter. This condition can be relaxed if `canKeepMultipleFilters` is
```
TBH, the comment here is hard to understand..
```scala
      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
    // We can only push down the bottom-most filter to the relation, as `ScanOperation` decided to
    // not merge these filters and we need to keep their evaluation order.
    val filters = allFilters.lastOption.getOrElse(Nil)
```
So for filter pushdown, we will use the last filter. For schema pruning, we will use all the filters. I wonder if we should return both `allFilters` and `pushdownFilters` to make the semantics clear.
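The split the reviewer suggests could look something like this toy sketch (the names `ScanFilters` and `pushdownFilters` and the `Seq[Seq[String]]` shape are assumptions for illustration, not Spark's actual API):

```scala
// Hypothetical sketch of returning both lists, as suggested above.
// `allFilters` keeps every adjacent Filter's predicates as its own
// group; only the bottom-most group is safe to push into the
// relation, since the groups' evaluation order must be preserved.
case class ScanFilters(allFilters: Seq[Seq[String]]) {
  def pushdownFilters: Seq[String] = allFilters.lastOption.getOrElse(Nil)
}

val collected = ScanFilters(Seq(Seq("rand() > 0.5"), Seq("col = 1")))
// collected.pushdownFilters keeps only the bottom-most group
```

Making both views explicit would spare each caller from re-deriving which subset is pushdown-safe.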
```scala
    val (fields, filters, child, _) = collectProjectsAndFilters(plan, alwaysInline)
    Some((fields.getOrElse(child.output), filters, child))
  }

  protected def canKeepMultipleFilters: Boolean
```
Nit: add a simple comment
```scala
    val canIncludeThisFilter = filters.isEmpty || {
      filters.length == 1 && filters.head.forall(_.deterministic) && condition.deterministic
    }
```
Previously, this was `filters.forall(_.deterministic)`. Why is it relaxed here too? I think it is not under the `canKeepMultipleFilters` condition below.
This is the core change of this PR. `PhysicalOperation` returns a single filter condition, which means it combines filters, and we have to make sure all the filters are deterministic. `ScanOperation` returns multiple filter conditions and does not have this restriction.
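As a rough illustration of that distinction (toy plan nodes, not Spark's `LogicalPlan` classes), a `ScanOperation`-style matcher can keep each `Filter`'s predicates as a separate group instead of And-ing them into one condition:

```scala
// Toy plan nodes; Spark's real classes are much richer.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filt(conds: Seq[String], child: Plan) extends Plan

// Collect adjacent Filters top-down without merging them, so the
// match succeeds even when the predicates are non-deterministic.
def collectFilters(p: Plan): (Seq[Seq[String]], Plan) = p match {
  case Filt(conds, child) =>
    val (rest, leaf) = collectFilters(child)
    (conds +: rest, leaf)
  case other => (Nil, other)
}

val (groups, leaf) =
  collectFilters(Filt(Seq("rand() > 0.5"), Filt(Seq("rand() < 0.8"), Scan("t"))))
// groups keeps the two Filters as separate groups, preserving their order
```

Because the groups stay separate, the caller can still prune columns while rebuilding the `Filter`s in their original order.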
```diff
-    val projectedFilters = filters.map(_.transformDown {
-      case projectionOverSchema(expr) => expr
-    })
-    val newFilterCondition = projectedFilters.reduce(And)
-    Filter(newFilterCondition, leafNode)
+    val projectedFilters = filters.map(_.map(_.transformDown {
+      case projectionOverSchema(expr) => expr
+    }))
+    val newFilterConditions = projectedFilters.map(_.reduce(And))
+    newFilterConditions.foldRight[LogicalPlan](leafNode)((cond, plan) => Filter(cond, plan))
```
Hmm, is that the same as before? The old code constructs a new `Filter` with projected predicates (reduced by `And`). But this change reduces all projected predicates from all adjoining `Filter`s, which can be non-deterministic?
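A toy illustration of what the `foldRight` in the new code does (simplified nodes, not Spark's classes): it rebuilds one `Filter` per collected group, so the conditions gathered top-down end up back in their original nesting order rather than being merged into a single `Filter`:

```scala
sealed trait Node
case class Leaf(name: String) extends Node
case class FilterNode(cond: String, child: Node) extends Node

// Conditions as collected while matching: head is the top-most Filter.
val conds = Seq("rand() > 0.5", "rand() < 0.8")

// foldRight wraps from the inside out: the last condition lands next
// to the leaf and the first condition ends up back on top.
val rebuilt = conds.foldRight[Node](Leaf("relation")) {
  (cond, plan) => FilterNode(cond, plan)
}
// rebuilt nests as FilterNode("rand() > 0.5", FilterNode("rand() < 0.8", Leaf("relation")))
```

Keeping one `Filter` node per group (instead of `reduce(And)` over everything) is what preserves the evaluation order between the non-deterministic predicates.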
Pushed a refactor to make the code easier to understand, please take another look, thanks!
Merged to master.
### What changes were proposed in this pull request?

This is a followup of #38511 to fix a mistake: we should respect the original `Filter` operator order when re-constructing the query plan.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #38684 from cloud-fan/column-pruning.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…nondeterministic predicates

### What changes were proposed in this pull request?

This PR fixes a regression caused by #38511. For `FROM t WHERE rand() > 0.5 AND col = 1`, we can still push down `col = 1` because we don't guarantee the predicates' evaluation order within a `Filter`. This PR updates `ScanOperation` to consider this case and bring back the previous pushdown behavior.

### Why are the changes needed?

Fix a perf regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #38746 from cloud-fan/filter.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
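The restored behavior can be sketched with a toy predicate type (a stand-in for Spark's `Expression`, names assumed): within one `Filter`, conjuncts have no guaranteed evaluation order, so the deterministic ones remain pushable even next to a non-deterministic sibling:

```scala
// Toy stand-in for Spark expressions.
case class Pred(sql: String, deterministic: Boolean)

// Split a single Filter's conjuncts: deterministic ones can be
// pushed down to the scan, the rest must stay in the Filter.
def splitForPushdown(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) =
  conjuncts.partition(_.deterministic)

val (pushable, kept) = splitForPushdown(Seq(
  Pred("rand() > 0.5", deterministic = false),
  Pred("col = 1", deterministic = true)))
// `col = 1` is pushable; `rand() > 0.5` stays behind
```

Note this only applies to conjuncts of the same `Filter`; predicates in different adjacent `Filter`s still keep their relative evaluation order.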
…tic Filters

### What changes were proposed in this pull request?

Today, Spark does column pruning in 3 steps:

1. The rule `PushDownPredicates` pushes down `Filter`s as close to the scan node as possible.
2. The rule `ColumnPruning` generates `Project` below many operators, to prune columns before evaluating these operators. One exception is `Filter`: we do not generate `Project` below `Filter` as it conflicts with `PushDownPredicates`.
3. After the above 2 steps, we should have a plan pattern like `Project(..., Filter(..., Relation))`, and we have rules (DS v1 and v2 have different rules) to match this pattern using `PhysicalOperation`, then apply filter pushdown and column pruning.

This works fine in most cases, but we cannot always combine adjacent `Filter`s into one, due to non-deterministic predicates. For example, for `Project(a, Filter(rand() > 0.5, Filter(rand() < 0.8, Relation)))`, `PhysicalOperation` can only match `Filter(rand() < 0.8, Relation)` and we can't do column pruning today.

This PR fixes this problem by adding a variant of `PhysicalOperation`: `ScanOperation`. It keeps all the adjacent `Filter`s, so that it can match more plan patterns and do column pruning better. The caller sides are also updated to restore the `Filter`s w.r.t. their original order in the query plan.

### Why are the changes needed?

Apply column pruning in more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes apache#38511 from cloud-fan/column-pruning.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>