Skip to content

Commit

Permalink
normalize filters in FileScan.equals()
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Mar 16, 2021
1 parent e1a9722 commit f65ebe3
Showing 1 changed file with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
Expand Down Expand Up @@ -84,11 +85,24 @@ trait FileScan extends Scan

/** Renders the elements of `seq` as a bracketed, comma-separated list such as `[a, b, c]`. */
protected def seqToString(seq: Seq[Any]): String = {
  val joined = seq.mkString(", ")
  "[" + joined + "]"
}

// Partition and data filters rewritten against this scan's output attributes and collected into
// ExpressionSets; `equals` compares these instead of the raw filter sequences.
// NOTE(review): presumably QueryPlan.normalizeExpressions makes filters that differ only in
// attribute expression ids compare equal -- confirm against its documentation.
private lazy val (normalizedPartitionFilters, normalizedDataFilters) = {
  val schemaAttributes = readSchema().toAttributes

  // Normalizes one filter list. For every output column, the same-named attribute referenced by
  // `filters` is preferred over the attribute derived from the read schema.
  def normalize(filters: Seq[Expression]): ExpressionSet = {
    val referencedByName = AttributeSet(filters).map(attr => attr.name -> attr).toMap
    val resolvedOutput =
      schemaAttributes.map(attr => referencedByName.getOrElse(attr.name, attr))
    ExpressionSet(filters.map(QueryPlan.normalizeExpressions(_, resolvedOutput)))
  }

  (normalize(partitionFilters), normalize(dataFilters))
}

/**
 * Two file scans are equal when they have the same file index, the same read schema, and
 * matching normalized partition-filter and data-filter sets. Any non-`FileScan` value is
 * unequal.
 */
override def equals(obj: Any): Boolean = obj match {
  case f: FileScan =>
    // Keep every condition in one && chain. Previously the fileIndex/readSchema check and the
    // raw ExpressionSet comparisons formed a separate statement whose result was discarded, so
    // only the normalized filter comparison decided equality.
    fileIndex == f.fileIndex && readSchema == f.readSchema &&
      normalizedPartitionFilters == f.normalizedPartitionFilters &&
      normalizedDataFilters == f.normalizedDataFilters

  case _ => false
}
Expand Down

0 comments on commit f65ebe3

Please sign in to comment.