Skip to content

Commit

Permalink
[SPARK-40245][SQL] Fix FileScan equality check when partition or data…
Browse files Browse the repository at this point in the history
… filter columns are not read

### What changes were proposed in this pull request?
Unfortunately the fix in #31848 was not correct in all cases. When the partition or data filter contains a column that is not in `readSchema()` the filter nornalization in `FileScan.equals()` doesn't work.

### Why are the changes needed?
To fix `FileScan.equals()` to fix reuse issues.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #37693 from peter-toth/SPARK-40245-fix-filescan-equals.

Authored-by: Peter Toth <ptoth@cloudera.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
peter-toth authored and cloud-fan committed Aug 29, 2022
1 parent 375b4e7 commit 0e8d779
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ trait FileScan extends Scan
protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
val output = readSchema().toAttributes
val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
QueryPlan.normalizeExpressions(_,
output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
QueryPlan.normalizeExpressions(_, fileIndex.partitionSchema.toAttributes
.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
val normalizedDataFilters = ExpressionSet(dataFilters.map(
QueryPlan.normalizeExpressions(_,
output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
QueryPlan.normalizeExpressions(_, dataSchema.toAttributes
.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
(normalizedPartitionFilters, normalizedDataFilters)
}

Expand Down
30 changes: 30 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4028,6 +4028,36 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}

test("SPARK-40245: Fix FileScan canonicalization when partition or data filter columns are not " +
"read") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
withTempPath { path =>
spark.range(5)
.withColumn("p", $"id" % 2)
.write
.mode("overwrite")
.partitionBy("p")
.parquet(path.toString)
withTempView("t") {
spark.read.parquet(path.toString).createOrReplaceTempView("t")
val df = sql(
"""
|SELECT t1.id, t2.id, t3.id
|FROM t AS t1
|JOIN t AS t2 ON t2.id = t1.id
|JOIN t AS t3 ON t3.id = t2.id
|WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1
|""".stripMargin)
df.collect()
val reusedExchanges = collect(df.queryExecution.executedPlan) {
case r: ReusedExchangeExec => r
}
assert(reusedExchanges.size == 1)
}
}
}
}

test("SPARK-35331: Fix resolving original expression in RepartitionByExpression after aliased") {
Seq("CLUSTER", "DISTRIBUTE").foreach { keyword =>
Seq("a", "substr(a, 0, 3)").foreach { expr =>
Expand Down

0 comments on commit 0e8d779

Please sign in to comment.