[SPARK-30428][SQL] File source V2: support partition pruning #27112
Conversation
I will add more test cases. Marking this one as WIP for now.
Test build #116200 has finished for PR 27112 at commit
}
      options: CaseInsensitiveStringMap,
      partitionFilters: Seq[Expression] = Seq.empty) extends FileScan {
  override def isSplitable(path: Path): Boolean = true
I found the indentation in AvroScan is wrong. Fixed it as well.
Test build #116254 has finished for PR 27112 at commit
Test build #116260 has finished for PR 27112 at commit
Test build #116264 has finished for PR 27112 at commit
Test build #116271 has finished for PR 27112 at commit
    if (partitionKeyFilters.nonEmpty) {
      val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
      val prunedFsRelation =
-       fsRelation.copy(location = prunedFileIndex)(sparkSession)
+       fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession)
I suggest also passing the dataFilters.
This is useful for FileIndex implementations that use the dataFilters to do the file listing.
For example, we use this to provide data skipping for all file-based datasources.
I suggest something like this: guykhazma@de3415b
@guykhazma Thanks for the suggestion.
However, the PartitioningAwareFileIndex doesn't use the data filters for listing files. Could you provide an example where the data filters would be useful here?
Also, the data filters are supposed to be pushed down in FileScanBuilder (e.g. ORC/Parquet).
@gengliangwang this is useful for enabling data skipping on all file formats, including formats that don't support pushdown (e.g. CSV, JSON), by replacing the FileIndex implementation with one that also uses the dataFilters to filter the file listing.
This is the old v1 code path; let's not touch it in this PR.
And for the v2 code path, the data filters are already pushed down in the rule V2ScanRelationPushDown.
  override protected def sparkConf: SparkConf =
    super
      .sparkConf
      .set(SQLConf.USE_V1_SOURCE_LIST, "")

  test("Avro source v2: support partition pruning") {
Not related to this PR, but we should think about how to share test cases between the Avro suite and FileBasedDataSourceSuite.
thanks, merging to master!
@@ -71,7 +103,7 @@ abstract class FileScan(
  }

  protected def partitions: Seq[FilePartition] = {
-   val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
+   val selectedPartitions = fileIndex.listFiles(partitionFilters, Seq.empty)
@gengliangwang @cloud-fan continuing the discussion from above (the comment was on the wrong line).
The V2ScanRelationPushDown rule will push down the dataFilters only to datasources that support pushdown by implementing the SupportsPushDownFilters trait.
Datasources such as csv and json do not implement the SupportsPushDownFilters trait. In order to support data skipping uniformly for all file-based data sources, we override the listFiles method in a FileIndex implementation, which consults external metadata and prunes the list of files.
The suggestion is to make the necessary changes to have the dataFilters passed to listFiles as well.
Otherwise, one would have to create a new datasource implementation in order to support each file-based datasource that doesn't have a built-in pushdown mechanism.
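To illustrate the idea being proposed, here is a minimal, Spark-free sketch of a FileIndex-like class whose listFiles consults per-file min/max metadata to prune the file listing with data filters. FileMeta, DataFilter, GreaterThan, and MetadataFileIndex are hypothetical names for illustration only, not Spark's actual API.

```scala
// Toy model: each file carries min/max statistics for one integer column.
case class FileMeta(path: String, min: Int, max: Int)

// A data filter: a file may match only if a qualifying value could fall
// inside its [min, max] range.
sealed trait DataFilter { def mayMatch(f: FileMeta): Boolean }
case class GreaterThan(v: Int) extends DataFilter {
  def mayMatch(f: FileMeta): Boolean = f.max > v
}

class MetadataFileIndex(files: Seq[FileMeta]) {
  // Analogous in spirit to FileIndex.listFiles(partitionFilters, dataFilters):
  // files that cannot satisfy every filter are skipped before any read.
  def listFiles(dataFilters: Seq[DataFilter]): Seq[FileMeta] =
    files.filter(f => dataFilters.forall(_.mayMatch(f)))
}

val index = new MetadataFileIndex(Seq(
  FileMeta("part-0.csv", min = 0, max = 9),
  FileMeta("part-1.csv", min = 10, max = 19),
  FileMeta("part-2.csv", min = 20, max = 29)))

// Only files whose max exceeds 15 are listed; part-0.csv is skipped entirely.
println(index.listFiles(Seq(GreaterThan(15))).map(_.path))
```

Because the pruning happens at listing time, it works uniformly for any file format, including CSV and JSON, without the format implementing SupportsPushDownFilters.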
This makes sense to me. @gengliangwang what do you think?
At least you can disable the v2 file source to bring back this feature.
Yes, it makes sense if there is a fileIndex that can use the dataFilters.
@guykhazma could you create a PR for this?
@gengliangwang @cloud-fan sure, thanks.
I have opened this PR
After rebasing on the changes, my PR #26973 started failing at spark/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala Line 764 in c0e9f9f
        op
      }

      case op @ PhysicalOperation(projects, filters,
The CSV datasource in #26973 doesn't fall into this case, but parquet/orc does, so withPartitionFilters is not invoked for CSV. What's wrong with CSV when filter pushdown is enabled?
### What changes were proposed in this pull request?
Follow up on [SPARK-30428](#27112) which added support for partition pruning in File source V2. This PR implements the necessary changes in order to pass the `dataFilters` to the `listFiles`. This enables having `FileIndex` implementations which use the `dataFilters` for further pruning the file listing (see the discussion [here](#27112 (comment))).
### Why are the changes needed?
Datasources such as `csv` and `json` do not implement the `SupportsPushDownFilters` trait. In order to support data skipping uniformly for all file based data sources, one can override the `listFiles` method in a `FileIndex` implementation, which consults external metadata and prunes the list of files.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Modifying the unit tests for v2 file sources to verify the `dataFilters` are passed.
Closes #27157 from guykhazma/PushdataFiltersInFileListing.
Authored-by: Guy Khazma <guykhag@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
This bug was introduced by SPARK-30428 at Apache Spark 3.0.0. This PR fixes `FileScan.equals()`.
### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in #27112 and #27157) caused the canonicalized form of some `BatchScanExec` nodes not to match, which prevents some reuse possibilities.
### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan`, and so of `BatchScanExec`, could have happened, causing correctness issues.
### How was this patch tested?
Added new UTs.
Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
File source V2: support partition pruning.
Note: subquery predicates are not pushed down for partition pruning even after this PR, due to a limitation of the current data source V2 API and framework. The rule PlanSubqueries requires the subquery expression to be in the children or class parameters of SparkPlan, while this condition is not satisfied for BatchScanExec.
Why are the changes needed?
It's important for read performance.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests for all the V2 file sources
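The mechanism this PR enables can be sketched without Spark: partition values are encoded in Hive-style directory names ("col=value"), and partition filters are applied to directories before any files are listed. PartitionDir and prunePartitions below are illustrative names for this sketch, not Spark's actual API.

```scala
// Toy model: a partition directory carries its spec (column -> value)
// and the files it contains.
case class PartitionDir(spec: Map[String, String], files: Seq[String])

def prunePartitions(
    dirs: Seq[PartitionDir],
    partitionFilters: Seq[Map[String, String] => Boolean]): Seq[String] =
  // A directory survives only if every partition filter accepts its spec;
  // files in pruned directories are never listed, let alone read.
  dirs.filter(d => partitionFilters.forall(f => f(d.spec))).flatMap(_.files)

val dirs = Seq(
  PartitionDir(Map("p" -> "1"), Seq("p=1/part-0.parquet")),
  PartitionDir(Map("p" -> "2"), Seq("p=2/part-0.parquet")))

// A query like `WHERE p = 2` touches only the p=2 directory.
println(prunePartitions(dirs, Seq(spec => spec("p") == "2")))
```

This is why partition pruning matters for read performance: with a selective partition predicate, entire directories are dropped from the scan before any I/O happens.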