[SQL][SPARK-39528] Use V2 Filter in SupportsRuntimeFiltering #36918
@@ -55,16 +57,27 @@ case class BatchScanExec(

  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
      case DynamicPruningExpression(e) =>
        if (scan.isInstanceOf[SupportsRuntimeFiltering]) {
What about `match` here?
Changed. Thanks
cc @cloud-fan Could you please take a look when you have a moment? Thanks!
        scan match {
          case _: SupportsRuntimeFiltering =>
            DataSourceStrategy.translateRuntimeFilter(e)
          case _: SupportsRuntimeV2Filtering =>
Shall we make `SupportsRuntimeV2Filtering` have higher priority over `SupportsRuntimeFiltering`? Also, we need to document the behavior if a source implements both of them.
It doesn't seem to me that a data source would implement both `SupportsRuntimeV2Filtering` and `SupportsRuntimeFiltering`?
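The priority question in this thread can be illustrated with a small sketch. The traits and classes below are stand-ins defined only for this example (they are not Spark's connector interfaces), and `translate` is a hypothetical helper. It shows that `match` cases are tried top to bottom, so if a scan did mix in both traits, whichever case is listed first would win.

```scala
// Stand-in traits for illustration only; these are NOT Spark's connector interfaces.
trait LegacyRuntimeFiltering
trait V2RuntimeFiltering

// Hypothetical scans: one per trait, one implementing both, one implementing neither.
class LegacyScan extends LegacyRuntimeFiltering
class V2Scan extends V2RuntimeFiltering
class BothScan extends LegacyRuntimeFiltering with V2RuntimeFiltering
class PlainScan

object FilterDispatch {
  // Cases are tried top to bottom, so the V2 case takes priority
  // for a scan that mixes in both traits.
  def translate(scan: AnyRef): Option[String] = scan match {
    case _: V2RuntimeFiltering     => Some("v2-predicate")
    case _: LegacyRuntimeFiltering => Some("v1-filter")
    case _                         => None
  }
}
```

With this ordering, `FilterDispatch.translate(new BothScan)` takes the V2 branch; swapping the first two cases would send the same scan down the legacy branch instead.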
        }
        val literals = values.map { value =>
          val literal = Literal(value)
          LiteralValue(literal.value, literal.dataType)
We don't need to infer the data type by creating a catalyst `Literal`. The type must be `in.child.dataType`.
Fixed. Thanks
    if (partitioning.length == 1 && partitioning.head.references().length == 1) {
      val ref = partitioning.head.references().head
      filters.foreach {
        case p : Predicate if p.name().equals("IN") =>
Feels like an `unapply` method to extract what you want would be preferable.
`Predicate` is a Java class. I don't think `unapply` can be used.
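For readers unfamiliar with the extractor idea raised in this thread: in Scala an extractor is an object with an `unapply` method, and it can be defined separately from the class it inspects. The sketch below is only an illustration under assumptions: `Pred` is a stand-in class (not Spark's `Predicate`), and `InPred` and `ExtractorDemo` are hypothetical names. Whether such an extractor would suit this PR is a separate question.

```scala
// Stand-in for a predicate class with a name and children; NOT Spark's Predicate.
class Pred(val name: String, val children: Seq[Any])

// Hypothetical extractor: matches only predicates named "IN" and exposes
// the first child as the reference and the remaining children as values.
object InPred {
  def unapply(p: Pred): Option[(Any, Seq[Any])] =
    if (p.name == "IN" && p.children.nonEmpty) Some((p.children.head, p.children.tail))
    else None
}

object ExtractorDemo {
  // Pattern match through the extractor instead of a name guard.
  def describe(p: Pred): String = p match {
    case InPred(ref, values) => s"$ref IN (${values.mkString(", ")})"
    case other               => s"unsupported: ${other.name}"
  }
}
```

For example, `ExtractorDemo.describe(new Pred("IN", Seq("part", 1, 2)))` returns `"part IN (1, 2)"`, while a predicate with any other name falls through to the second case.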
The test failure is unrelated.
The GA failure is unrelated. Merging to master, thanks!
Thanks @cloud-fan @zinking
@@ -1805,3 +1805,21 @@ class DynamicPartitionPruningV2SuiteAEOff extends DynamicPartitionPruningV2Suite

class DynamicPartitionPruningV2SuiteAEOn extends DynamicPartitionPruningV2Suite
  with EnableAdaptiveExecutionSuite

abstract class DynamicPartitionPruningV2FilterSuite
    extends DynamicPartitionPruningDataSourceSuiteBase {
Shall we extend `DynamicPartitionPruningV2Suite` here? Then we can save the `override protected def runAnalyzeColumnCommands: Boolean = false`, and catalog configs will be overwritten.
Sounds good. I have a follow-up here
Hi @huaxingao. We are trying to use Spark DataSource V2 and noticed that the Spark V2 built-in data sources (e.g. the Parquet one, looking at Is there a plan to have them support this? It would be really beneficial for the file scans to be able to do this, and given that they already benefit from some pushdowns, we were wondering why runtime filtering is not implemented. Or maybe I am missing something? In that case, it would be great to understand how to have Spark file sources take advantage of DPP. Thanks!
What changes were proposed in this pull request?
Use V2 Filter in run time filtering for V2 Table
Why are the changes needed?
We should use V2 Filter in DS V2.
#32921 (comment)
Does this PR introduce any user-facing change?
Yes, a new interface: `SupportsRuntimeV2Filtering`
How was this patch tested?
new test suite