[SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check #31848
Conversation
@dongjoon-hyun, you asked for a UT in #31820 (review), shall I create a simple one focusing on
@@ -86,7 +86,7 @@ trait FileScan extends Scan

   override def equals(obj: Any): Boolean = obj match {
     case f: FileScan =>
-      fileIndex == f.fileIndex && readSchema == f.readSchema
+      fileIndex == f.fileIndex && readSchema == f.readSchema &&
+        ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
+        ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
shall we canonicalize the filters here to fix the exchange reuse issue?
But don't we need the `output` of the ScanExec (`BatchScanExec`) node to do that? I need to look into this a bit...
You are right. Then it's hard to canonicalize `Scan` implementations that are outside of Spark...
Thinking about it a bit more, do we really need `output`? We can canonicalize expr IDs to 0 and only look at the column name.
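The idea can be sketched with a small standalone model. Note this is an illustrative toy, assuming nothing about Spark internals: `Attr` and its fields are hypothetical stand-ins for Spark's `AttributeReference`, not the real API.

```scala
// Toy sketch: zero out expression IDs so equality depends only on the
// column name. Attr is a hypothetical stand-in, not Spark's class.
case class Attr(name: String, exprId: Long) {
  // Canonical form: expr ID fixed to 0, keeping only the name.
  def canonicalized: Attr = copy(exprId = 0)
}

object CanonicalizeIdsDemo {
  def main(args: Array[String]): Unit = {
    val a1 = Attr("id", exprId = 1L)  // same column, different expr IDs
    val a2 = Attr("id", exprId = 42L)
    assert(a1 != a2)                             // raw attributes differ
    assert(a1.canonicalized == a2.canonicalized) // canonical forms match
  }
}
```

With a canonical form like this, comparing filters no longer depends on which analysis run minted the attribute IDs.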
Yes, I was thinking about the same. I will try to do this today.
I was thinking about a change like in f65ebe3. That way we don't need to duplicate some parts of the canonicalization logic defined in `QueryPlan.normalizeExpressions` and in `Expression.canonicalized` (`Canonicalize.expressionReorder`, `Canonicalize.ignoreTimeZone`)...
If this looks ok to you then I can add UTs for `.equals()` into a new `FileScanSuite` as @dongjoon-hyun suggested, and move the e2e UT from #31820 to here as we don't need the other change in `BatchScanExec`.
SGTM
Thank you, @peter-toth . I prefer a simple one focusing on
For the following question, yes, it seems that we don't have a proper one yet. If we don't have a proper test suite, shall we make one like
val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
  QueryPlan.normalizeExpressions(_, output.map(a =>
    partitionFilterAttributes.getOrElse(a.name, a)))))
shall we consider case sensitivity here?
or have the attr names inside the data/partitionFilterAttributes been normalized already before?
I think we should; added in 776828e
Thanks @dongjoon-hyun, I will try to update this PR with new tests this week.
…756-fix-filescan-equality-check
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@dongjoon-hyun I added new UTs in 9711da0
      output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
    (normalizedPartitionFilters, normalizedDataFilters)
  }

  override def equals(obj: Any): Boolean = obj match {
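The name-keyed lookup in the snippet above can be illustrated standalone. This is a hedged sketch using a hypothetical `Ref` type; the real Spark code operates on `AttributeReference`s and `ExpressionSet`s, not plain case classes.

```scala
// Standalone sketch of the normalization pattern above: rebuild the
// output attribute list so each attribute is replaced by the same-named
// attribute the filters actually reference. Ref is hypothetical.
case class Ref(name: String, id: Long)

object NormalizeByName {
  def apply(output: Seq[Ref], filterAttrs: Seq[Ref]): Seq[Ref] = {
    // Key the filter attributes by name, mirroring getOrElse(a.name, a).
    val byName = filterAttrs.map(a => a.name -> a).toMap
    output.map(a => byName.getOrElse(a.name, a))
  }
}
```

After this substitution, both scans being compared normalize their filters against the same set of attribute references, so differing IDs no longer break equality.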
Shall I update `hashCode()` as well? Looks like we can easily come up with a better one...
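The general contract being discussed is that `hashCode()` should hash the same components `equals()` compares, so equal scans always hash alike. A minimal standalone sketch of that contract, with hypothetical stand-in fields (not Spark's actual `FileScan`):

```scala
// Hypothetical sketch: hashCode built from the same fields equals()
// uses, so two equal scans are guaranteed to have equal hash codes.
class ToyFileScan(
    val fileIndexId: String,           // stand-in for fileIndex
    val readSchemaDdl: String,         // stand-in for readSchema
    val normalizedFilters: Set[String] // stand-in for normalized filters
) {
  override def equals(obj: Any): Boolean = obj match {
    case f: ToyFileScan =>
      fileIndexId == f.fileIndexId &&
        readSchemaDdl == f.readSchemaDdl &&
        normalizedFilters == f.normalizedFilters
    case _ => false
  }

  // Hash exactly the components compared in equals().
  override def hashCode(): Int =
    Seq(fileIndexId, readSchemaDdl, normalizedFilters).hashCode()
}
```

A hash over all compared fields also spreads scans across hash buckets better than a constant or single-field hash, which is presumably the "better one" meant here.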
Shall we do that separately because it's irrelevant to the correctness issue?
In general, we expect a performance improvement with that, don't we?
Apache Spark doesn't allow backporting performance improvements.
Please file a new JIRA and go for it, @peter-toth ! :)
Thank you for the update, @peter-toth !
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait FileScanSuiteBase extends SharedSparkSession {
This looks nice!
Kubernetes integration test starting
Kubernetes integration test status failure
val readPartitionSchema = StructType(Seq(StructField("partition", IntegerType, false)))
val readPartitionSchemaNotEqual = StructType(Seq(
  StructField("partition", IntegerType, false),
  StructField("other", IntegerType, false)))
In this case, the shorter the better. Could you use the following style?
- val dataSchema = StructType(Seq(
-   StructField("data", IntegerType, false),
-   StructField("partition", IntegerType, false),
-   StructField("other", IntegerType, false)))
- val dataSchemaNotEqual = StructType(Seq(
-   StructField("data", IntegerType, false),
-   StructField("partition", IntegerType, false),
-   StructField("other", IntegerType, false),
-   StructField("new", IntegerType, false)))
- val readDataSchema = StructType(Seq(StructField("data", IntegerType, false)))
- val readDataSchemaNotEqual = StructType(Seq(
-   StructField("data", IntegerType, false),
-   StructField("other", IntegerType, false)))
- val readPartitionSchema = StructType(Seq(StructField("partition", IntegerType, false)))
- val readPartitionSchemaNotEqual = StructType(Seq(
-   StructField("partition", IntegerType, false),
-   StructField("other", IntegerType, false)))
+ val dataSchema = StructType.fromDDL("data INT, partition INT, other INT")
+ val dataSchemaNotEqual = StructType.fromDDL("data INT, partition INT, other INT, new INT")
+ val readDataSchema = StructType.fromDDL("data INT")
+ val readDataSchemaNotEqual = StructType.fromDDL("data INT, other INT")
+ val readPartitionSchema = StructType.fromDDL("partition INT")
+ val readPartitionSchemaNotEqual = StructType.fromDDL("partition INT, other INT")
Thanks @dongjoon-hyun, I updated the PR with this change in: d782723
Well, the PR was not merged back into a single PR for both issues. Since this was merged at @cloud-fan 's request, I'm fine and I'll leave this to him.
Thank you, @peter-toth and @cloud-fan .
@@ -84,11 +85,25 @@ trait FileScan extends Scan

   protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

+  private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
+    val output = readSchema().toAttributes.map(a => a.withName(normalizeName(a.name)))
Thinking about it again, the `FileScan` equality already considers `fileIndex` and `readSchema`, which means two file scans are only equal to each other if they read the same set of files and the same set of columns.
Given that, I think the expr IDs do not matter for the filters, only the column name matters. For normal v2 sources, they use `Filter`, not `Expression`, which does not have expr IDs either.
The data/partition filters are created in `PruneFileSourcePartitions` (see https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala#L51), and the column names inside the filters are already normalized w.r.t. the actual file scan output schema, so we don't need to consider case sensitivity here.
That said, I think the normalization logic here should be very simple: just turn the expr IDs to 0.
I see your point and agree that the name is what matters in these `Filter`-like `Expression`s, but if we go this way then I think:
- we also need to clear other properties of `AttributeReference`s like `qualifier`
- we need to either explicitly sort the `partitionFilters` and `dataFilters` expression lists (probably with `.sortBy(_.hashCode())`) to make sure they match with `f.partitionFilters` and `f.dataFilters`, or use `Set(partitionFilters) == Set(f.partitionFilters)`, because we can't use `ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters)` as we removed all expr IDs
- we need to reorder all descendants of each `partitionFilters` and `dataFilters` expression (with `Canonicalize.expressionReorder()`) to make sure that e.g. `id = 1` matches `1 = id` (and `Canonicalize.ignoreTimeZone()` also needs to be applied)

And just a side note: I think we could do most of the above at https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala#L120-L121 before `withFilters()`, and then `FileScan.equals()` would become very simple.
But I wonder whether all these changes are simpler than the current PR?
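The ordering concern in the second bullet can be shown with a minimal standalone sketch. Filters are modeled as plain strings here purely for illustration; in Spark the same problem is solved by `ExpressionSet`'s canonicalized membership, which is exactly what stops working once expr IDs are removed.

```scala
// Once expr IDs are stripped, comparing filter lists as Seq is
// order-sensitive, while comparing them as Set is not.
object FilterCompareDemo {
  def seqEqual(a: Seq[String], b: Seq[String]): Boolean = a == b
  def setEqual(a: Seq[String], b: Seq[String]): Boolean = a.toSet == b.toSet
}
```

So without either an explicit sort or a set-based comparison, two scans carrying the same filters in a different order would spuriously compare unequal.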
Good point. For simplicity maybe we should just assign fresh expr IDs like what this PR did, but we can remove the case sensitivity handling here.
Ok, removed in 30d2d8b
Test build #136253 has finished for PR 31848 at commit
test(s"SPARK-33482: Test $name equals") {
  val partitioningAwareFileIndex = newPartitioningAwareFileIndex()

  val parquetScan = scanBuilder(
`parquetScan` -> `fileScan`?
Fixed in 0d38ac2
Test build #136340 has finished for PR 31848 at commit
Test build #136349 has finished for PR 31848 at commit
thanks, merging to master/3.1/3.0!
### What changes were proposed in this pull request?
This bug was introduced by SPARK-30428 at Apache Spark 3.0.0. This PR fixes `FileScan.equals()`.

### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in #27112 and #27157) caused the canonicalized form of some `BatchScanExec` nodes not to match, and this prevents some reuse possibilities.

### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happened, causing correctness issues.

### How was this patch tested?
Added new UTs.

Closes #31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 93a5d34)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks @cloud-fan and @dongjoon-hyun for the review.
Thank you, @peter-toth and @cloud-fan .
It seems this breaks
I'll revert it first at branch-3.0. Please make a backporting PR to branch-3.0, @peter-toth .
@dongjoon-hyun, I've opened a 3.0 backport PR here: #31952
… filter columns are not read

### What changes were proposed in this pull request?
Unfortunately the fix in #31848 was not correct in all cases. When the partition or data filter contains a column that is not in `readSchema()`, the filter normalization in `FileScan.equals()` doesn't work.

### Why are the changes needed?
To fix `FileScan.equals()` to fix reuse issues.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #37693 from peter-toth/SPARK-40245-fix-filescan-equals.
Authored-by: Peter Toth <ptoth@cloudera.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This bug was introduced by SPARK-30428 at Apache Spark 3.0.0. This PR fixes `FileScan.equals()`.

Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in [SPARK-30428][SQL] File source V2: support partition pruning #27112 and [SPARK-30475][SQL] File source V2: Push data filters for file listing #27157) caused the canonicalized form of some `BatchScanExec` nodes not to match, and this prevents some reuse possibilities.

Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happened, causing correctness issues.

How was this patch tested?
Added new UTs.