Skip to content

Commit

Permalink
[SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This bug was introduced by SPARK-30428 at Apache Spark 3.0.0.
This PR fixes `FileScan.equals()`.

### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in apache#27112 and apache#27157) caused that canonicalized form of some `BatchScanExec` nodes don't match and this prevents some reuse possibilities.

### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happed causing correctness issues.

### How was this patch tested?
Added new UTs.

Closes apache#31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 93a5d34)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
peter-toth authored and fishcus committed Jan 12, 2022
1 parent 519a6fa commit 5a22ad8
Show file tree
Hide file tree
Showing 4 changed files with 446 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.avro

import org.apache.spark.sql.FileScanSuiteBase
import org.apache.spark.sql.v2.avro.AvroScan

class AvroScanSuite extends FileScanSuiteBase {
val scanBuilders = Seq[(String, ScanBuilder, Seq[String])](
("AvroScan",
(s, fi, ds, rds, rps, f, o, pf, df) => AvroScan(s, fi, ds, rds, rps, o, f, pf, df),
Seq.empty))

run(scanBuilders)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
Expand Down Expand Up @@ -84,11 +85,24 @@ trait FileScan extends Scan

protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
val output = readSchema().toAttributes
val partitionFilterAttributes = AttributeSet(partitionFilters).map(a => a.name -> a).toMap
val dataFiltersAttributes = AttributeSet(dataFilters).map(a => a.name -> a).toMap
val normalizedPartitionFilters = ExpressionSet(partitionFilters.map(
QueryPlan.normalizeExpressions(_,
output.map(a => partitionFilterAttributes.getOrElse(a.name, a)))))
val normalizedDataFilters = ExpressionSet(dataFilters.map(
QueryPlan.normalizeExpressions(_,
output.map(a => dataFiltersAttributes.getOrElse(a.name, a)))))
(normalizedPartitionFilters, normalizedDataFilters)
}

override def equals(obj: Any): Boolean = obj match {
case f: FileScan =>
fileIndex == f.fileIndex && readSchema == f.readSchema
ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) &&
ExpressionSet(dataFilters) == ExpressionSet(f.dataFilters)
fileIndex == f.fileIndex && readSchema == f.readSchema &&
normalizedPartitionFilters == f.normalizedPartitionFilters &&
normalizedDataFilters == f.normalizedDataFilters

case _ => false
}
Expand Down
Loading

0 comments on commit 5a22ad8

Please sign in to comment.