
[SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery #41088

Closed
wants to merge 2 commits into from

Conversation

ulysses-you
Contributor

What changes were proposed in this pull request?

A scalar subquery can be pushed down as a data filter at runtime, since we always execute subqueries first. Ideally, we can rewrite ScalarSubquery to Literal before pushing down the filter. The main reason we did not support this before is that ReuseSubquery was ineffective, see #22518. That is no longer an issue.

For example:

```sql
SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)
```
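The rewrite described above can be sketched with a toy expression tree. This is a Python model for illustration only — the class names mimic Catalyst's Attribute/Literal/ScalarSubquery but are not Spark's actual API: once the scalar subquery has executed, its result replaces the subquery node, leaving an ordinary pushable predicate.

```python
from dataclasses import dataclass
from typing import Any

# Toy expression nodes standing in for Catalyst expressions (illustrative only).
@dataclass(frozen=True)
class Attr:
    name: str

@dataclass(frozen=True)
class Literal:
    value: Any

@dataclass(frozen=True)
class ScalarSubquery:
    result: Any  # the materialized subquery value, known at runtime

@dataclass(frozen=True)
class GreaterThan:
    left: Any
    right: Any

def rewrite_subqueries(expr):
    """Replace already-executed scalar subqueries with their literal results."""
    if isinstance(expr, ScalarSubquery):
        return Literal(expr.result)
    if isinstance(expr, GreaterThan):
        return GreaterThan(rewrite_subqueries(expr.left),
                           rewrite_subqueries(expr.right))
    return expr

# c1 > (SELECT min(c2) FROM t2), where the subquery evaluated to 5 at runtime
pushable = rewrite_subqueries(GreaterThan(Attr("c1"), ScalarSubquery(5)))
```

After the rewrite the predicate contains only an attribute and a literal, so it can go through the ordinary filter-pushdown path.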

Why are the changes needed?

Improve performance when data filters contain a scalar subquery.

Does this PR introduce any user-facing change?

no

How was this patch tested?

add test

@github-actions github-actions bot added the SQL label May 8, 2023
@ulysses-you
Contributor Author

cc @cloud-fan @viirya thank you

@@ -188,7 +188,13 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
// Partition keys are not available in the statistics of the files.
// `dataColumns` might have partition columns, we need to filter them out.
val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionSet.contains)
val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
// Scalar subquery can be pushed down as data filter at runtime, since we always
Contributor

how about partition filters?

Contributor Author

partition filters have been supported since #23802

@@ -543,7 +561,7 @@ case class FileSourceScanExec(
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
filters = dynamicallyPushedDownFilters,
Contributor

One implicit change is, now we will trigger subquery execution when getting the rdd of the file scan node. This worries me a little bit. Can we trigger subquery execution in the AQE framework? e.g. before executing a query stage, transform the scan node and update its dataFilters to replace evaluated scalar subqueries with literals.

Contributor Author

We do not change when subquery execution is triggered. If we call execute on a SparkPlan, it always runs prepare first and calls updateResult to execute subqueries. So there is no behavioral change with this PR; it only takes the result of the subquery and pushes it down.

Member

Could we split the dynamicallyPushedDownFilters to pushedDownFilters and pushedDownRuntimeFilters and combine them here? Something like this: https://github.com/apache/spark/pull/36128/files#diff-089285f1484c1598cb2839b86b6a9e65b98ab5b30462aedc210fe4bbf44cae78R374-R455
This is because pushedDownFilters are also used in metadata, while pushedDownRuntimeFilters can only be used here.

Contributor Author

I did not change pushedDownFilters, so it should not be an issue for metadata. But it's a pity that we cannot make metadata aware of scalar subquery filters.

Member

It is not an issue. I just wanted to make it clearer that dynamicallyPushedDownFilters also includes pushedDownFilters.

Contributor Author

If we first filter out scalar subqueries and then add them back, the ordering of the pushed filters changes, e.g.,
c1 > (select min(x) from t) and c2 > 1 -> c2 > 1 and c1 > (select min(x) from t)
I'm not sure whether that affects performance on the Parquet side, so I simply re-translate the whole data filter.
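The reordering concern can be seen in a toy Python sketch (strings stand in for filter expressions; this is illustration, not Spark code): splitting the list and appending the subquery filters back changes the relative order, while re-translating the whole list preserves it.

```python
# Strings stand in for filter expressions (illustrative only).
filters = ["c1 > (scalar-subquery)", "c2 > 1"]

def has_subquery(f):
    return "scalar-subquery" in f

# Split-and-recombine: subquery filters end up appended after the plain ones,
# so the order the reader sees changes relative to the original query.
recombined = ([f for f in filters if not has_subquery(f)] +
              [f for f in filters if has_subquery(f)])

# Re-translating the whole filter list keeps the original relative order.
retranslated = list(filters)
```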

pushedDownFilters
}
}

override lazy val metadata: Map[String, String] = {
Member

This metadata also contains pushedDownFilters, do we need to update it? Or, are we able to update it?

Contributor Author

I think we are not able to update it. metadata is used by explain, which is invoked before execution, while the pushed filters with scalar subqueries are only known at runtime.

// execute subquery first.
// It has no meaning to push down bloom filter, so skip it.
val normalizedFiltersWithScalarSubqueries = normalizedFilters
.filterNot(e => e.containsPattern(PLAN_EXPRESSION) && !e.containsPattern(SCALAR_SUBQUERY))
Member

Is it possible the scalar subquery is correlated?

Contributor Author

it seems impossible; we have rewritten correlated scalar subqueries at the Optimizer side. Otherwise it would break tons of things, e.g., partition filters with scalar subqueries.

val normalizedFiltersWithScalarSubqueries = normalizedFilters
.filterNot(e => e.containsPattern(PLAN_EXPRESSION) && !e.containsPattern(SCALAR_SUBQUERY))
.filterNot(_.isInstanceOf[BloomFilterMightContain])
val dataFilters = normalizedFiltersWithScalarSubqueries.flatMap { f =>
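The filterNot condition above can be modeled with a small Python sketch (boolean flags are a toy stand-in for the containsPattern checks, not Spark's API): it drops predicates that contain a subquery plan expression without any scalar subquery (e.g. a DPP-style subquery), and keeps plain predicates and scalar-subquery predicates.

```python
from dataclasses import dataclass

@dataclass
class Pred:
    name: str
    has_plan_expr: bool        # contains any subquery plan expression
    has_scalar_subquery: bool  # contains a scalar subquery specifically

preds = [
    Pred("c2 > 1", False, False),                   # plain predicate
    Pred("c1 > (scalar subquery)", True, True),     # scalar subquery predicate
    Pred("dpp-style subquery filter", True, False), # other plan expression
]

# Mirrors: filterNot(e => e.containsPattern(PLAN_EXPRESSION)
#                         && !e.containsPattern(SCALAR_SUBQUERY))
kept = [p for p in preds if not (p.has_plan_expr and not p.has_scalar_subquery)]
```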
Member

Hmm, dataFilters will go through translateFilter. Does translateFilter support translate scalar subquery?

Contributor Author

no, translateFilter does not support scalar subqueries. We first transform the scalar subquery to a literal, then go through translateFilter at runtime. The translateFilter-related code in FileSourceStrategy is only used to print a log message, which does not matter.

Member

Oh, yea, I remember this now. 😄

@ulysses-you
Contributor Author

@cloud-fan @viirya @wangyum do you have any other thoughts?

Member

@viirya viirya left a comment

Looks okay. Maybe @cloud-fan can take another look though.

@ulysses-you
Contributor Author

I'm going to merge this PR to master (4.0.0) if there are no more comments in a few days, cc @viirya @cloud-fan @wangyum

@ulysses-you
Contributor Author

thank you for the review, merged to master!

@ulysses-you ulysses-you deleted the SPARK-43402 branch July 31, 2023 02:16
val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
// subquery expressions are filtered out because they can't be used to prune buckets
Contributor

@cloud-fan cloud-fan Aug 2, 2023

seems it's fine to not filter out scalar subqueries, as the code of genBucketSet can already skip them.

// Replace scalar subqueries with literals so that `DataSourceStrategy.translateFilter`
// can translate them. The subqueries must have been materialized since SparkPlan
// always executes subqueries first.
val normalized = dataFilters.map(_.transform {
Contributor

should selectedPartitions also use the normalized data filters?

Contributor

I feel a bit of inconsistency here. The partition filter can also contain a scalar subquery; when is it normalized?

Contributor Author

It seems selectedPartitions does not accept subquery expressions; it filters out the DPP filters first.

@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret =
relation.location.listFiles(
partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)

Then dynamicallySelectedPartitions normalizes the partition filters.

@transient protected lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
if (dynamicPartitionFilters.nonEmpty) {
val startTime = System.nanoTime()
// call the file index for the files matching all filters except dynamic partition filters
val predicate = dynamicPartitionFilters.reduce(And)
val partitionColumns = relation.partitionSchema
val boundPredicate = Predicate.create(predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
}, Nil)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
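The quoted boundPredicate logic can be approximated with a small Python sketch (hypothetical column names and values; Spark's actual code binds AttributeReferences to BoundReferences over InternalRows): the predicate's column name is bound to an index into each partition's values row, then the already-listed partitions are filtered.

```python
# Toy partition-pruning model (illustrative only, not Spark's classes).
partition_columns = ["year", "month"]
selected_partitions = [(2019, 1), (2021, 6), (2023, 2)]  # one values-row per partition

def bound_predicate(values, column="year", threshold=2020):
    # Bind the attribute by name to an ordinal position, like BoundReference.
    index = partition_columns.index(column)
    return values[index] > threshold

# Evaluate the bound predicate against each partition's values to prune.
pruned = [p for p in selected_partitions if bound_predicate(p)]
```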

Contributor

For scalar subquery, I think we should turn it into literal so that it can be used to prune partitions?

Contributor Author

The outputPartitioning and outputOrdering use selectedPartitions when it is a bucket scan, which happens before execution. Besides, selectedPartitions is a public field; I'm afraid it would break something if we assume it is only called at runtime...

// execute subquery first.
// It has no meaning to push down bloom filter, so skip it.
val normalizedFiltersWithScalarSubqueries = normalizedFilters
.filterNot(e => e.containsPattern(PLAN_EXPRESSION) && !e.containsPattern(SCALAR_SUBQUERY))
Contributor

thinking about this again. I think all non-correlated subqueries can be allowed? They will be evaluated before the actual plan execution.

Contributor

Actually, subquery expressions must be non-correlated when we reach here (they must have been rewritten into joins before). Then we need to deal with two kinds of subqueries:

  1. foldable ones like scalar subquery. We can turn them into literals at the beginning and do filtering.
  2. non-foldable ones like the DPP subquery. We will evaluate them when we have an initial partition list and need to filter it further.

Contributor Author

yes, and list subqueries should also be rewritten to joins. So we only need to match scalar subqueries here for data filters. DPP is a partition filter.

Contributor

I think we need to make it clear about the 3 different kinds of predicates

  1. simple predicates that can be used to prune partitions, or anything that needs to be accessed during planning
  2. foldable subquery expressions that can be turned into literal before further use (is data source filter pushdown the only use?)
  3. subquery expressions that can be used to do additional pruning after planning but before execution.

Contributor Author

it seems 2 and 3 are the same, but 2 is a data filter and 3 is a partition filter?

Contributor Author

Technically it can be. But there is an issue if we want to use data filters to help prune partition directories, see #41088 (comment). Alternatively, we could add one more field and list the partition directories once more with the normalized data filters.

Contributor

I mean, something like part_col = (select ... )

Contributor Author

yes, it has worked since #23802. So both partitionFilters and dataFilters can contain subquery expressions.

Contributor

ok, so we only need to convert scalar subqueries to literals because filter pushdown needs v1 filters, which don't support subqueries?

Contributor Author

yes, v1 pushdown filters seem to only support literals.

yaooqinn pushed a commit that referenced this pull request Aug 4, 2023
…a/partition filters

### What changes were proposed in this pull request?

This is a followup of #41088 to make the code cleaner. There are two kinds of data/partition filters: static filters that can be executed during the planning phase, and dynamic filters that can only be executed after the planning phase. This PR makes sure these two kinds of filters are properly used and adds code comments to explain it.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #42318 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
@epa095

epa095 commented Nov 6, 2023

@ulysses-you
Contributor Author

@epa095 yes, updated that jira status

HyukjinKwon added a commit that referenced this pull request Feb 26, 2024
…y produces in NPE in Parquet filter

### What changes were proposed in this pull request?

This issue has been introduced in #41088  where we convert scalar subqueries to literals and then convert the literals to org.apache.spark.sql.sources.Filters. These filters are then pushed down to parquet.

If the filter compares against a null literal, the Parquet filter conversion code throws an NPE.

dateToDays does not expect null
https://github.com/apache/spark/blob/9b53b803998001e4b706666e37e5f86f900a7430/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L153

but Gt passes it directly
https://github.com/apache/spark/blob/9b53b803998001e4b706666e37e5f86f900a7430/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L403C29-L403C41

```
Caused by: scala.MatchError: null
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters.org$apache$spark$sql$execution$datasources$parquet$ParquetFilters$$dateToDays(ParquetFilters.scala:168)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anonfun$5.$anonfun$applyOrElse$73(ParquetFilters.scala:419)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters.$anonfun$createFilterHelper$9(ParquetFilters.scala:720)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters.createFilterHelper(ParquetFilters.scala:720)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters.createFilter(ParquetFilters.scala:621)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilterPredicateFactory.$anonfun$createFilter$2(ParquetFilters.scala:885)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilterPredicateFactory.createFilter(ParquetFilters.scala:885)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:410)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:265)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:601)
```
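The failure mode can be reproduced in miniature with Python (Spark's actual code is Scala and throws scala.MatchError; here None plays the role of null, TypeError the role of the match failure, and the function names are illustrative):

```python
import datetime

EPOCH = datetime.date(1970, 1, 1)

def date_to_days(d):
    # Mirrors the helper's assumption: a non-null date is required.
    # Passing None (the analogue of a null literal) fails here, just as the
    # unguarded pattern match fails with scala.MatchError above.
    return (d - EPOCH).days

def safe_date_to_days(d):
    # The fix: check for null before converting.
    return None if d is None else (d - EPOCH).days

try:
    date_to_days(None)
    failed = False
except TypeError:
    failed = True
```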

### Why are the changes needed?
NPE when subquery returns a NULL literal
NPE when comparing against a NULL literal with NullPropagation rule turned off.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45202 from cosmind-db/cosmind-db/filter-null.

Lead-authored-by: cosmind-db <cosmin.dumitru@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
jpcorreia99 pushed a commit to jpcorreia99/spark that referenced this pull request Feb 26, 2024
…y produces in NPE in Parquet filter

ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…with scalar subquery


Closes apache#41088 from ulysses-you/SPARK-43402.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Xiduo You <ulyssesyou@apache.org>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…a/partition filters

ericm-db pushed a commit to ericm-db/spark that referenced this pull request Mar 5, 2024
…y produces in NPE in Parquet filter
