
Commit 7652c35
check filters in test cases
gengliangwang committed Jan 7, 2020
1 parent 31c8c14 commit 7652c35
Showing 3 changed files with 25 additions and 1 deletion.
@@ -36,6 +36,8 @@ import org.apache.commons.io.FileUtils
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -1527,6 +1529,16 @@ class AvroV2Suite extends AvroSuite {
         .option("header", true)
         .load(dir.getCanonicalPath)
         .where("p1 = 1 and p2 = 2 and value != \"a\"")
+
+      val filterCondition = df.queryExecution.optimizedPlan.collectFirst {
+        case f: Filter => f.condition
+      }
+      assert(filterCondition.isDefined)
+      // The partition filters should be pushed down and should not need to be re-evaluated.
+      assert(filterCondition.get.collectFirst {
+        case a: AttributeReference if a.name == "p1" || a.name == "p2" => a
+      }.isEmpty)
+
       val fileScan = df.queryExecution.executedPlan collectFirst {
         case BatchScanExec(_, f: AvroScan) => f
       }
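For context, the assertion added above runs against a DataFrame read from a directory partitioned by p1 and p2, so that "p1 = 1 and p2 = 2" is a pure partition filter and "value != 'a'" is a data filter. A minimal sketch of that kind of setup (not the test's actual fixture; it assumes a local SparkSession and the spark-avro module on the classpath):

import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

// p1 and p2 become partition directories; value stays inside the data files.
val dir = Files.createTempDirectory("avro-partition-pruning").toFile
Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1))
  .toDF("value", "p1", "p2")
  .write
  .partitionBy("p1", "p2")
  .format("avro")
  .mode("overwrite")
  .save(dir.getCanonicalPath)

val df = spark.read
  .format("avro")
  .load(dir.getCanonicalPath)
  .where("p1 = 1 and p2 = 2 and value != 'a'")

With such data, the added check asserts that the Filter remaining in the optimized plan no longer mentions p1 or p2, since those predicates were pushed into the scan.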
@@ -97,7 +97,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
       if (partitionKeyFilters.nonEmpty) {
         val prunedV2Relation =
           v2Relation.copy(scan = scan.withPartitionFilters(partitionKeyFilters.toSeq))
-        // The pushed down partition filters don't need to be evaluated again.
+        // The pushed down partition filters don't need to be reevaluated.
         val afterScanFilters =
           ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty)
         rebuildPhysicalOperation(projects, afterScanFilters.toSeq, prunedV2Relation)
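To make the subtraction on the line above concrete: ExpressionSet compares expressions by their canonicalized form, so removing the pushed-down partition filters drops semantically equal predicates from the set of filters evaluated after the scan. A small illustration using the catalyst expression DSL (p1 and value are hypothetical stand-ins for a partition column and a data column; this is a sketch, not code from the rule):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Literal}

// Stand-ins for a partition column and a data column.
val p1 = 'p1.int
val value = 'value.string

val filters = Seq(p1 === Literal(1), value =!= Literal("a"))
val partitionKeyFilters = ExpressionSet(Seq(p1 === Literal(1)))

// The subtraction drops the partition predicate, leaving only the data filter
// (value != "a") to be evaluated after the scan.
val afterScanFilters = ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty)

The new test assertions check the same property from the other direction: after optimization, the Filter that remains above the scan no longer references p1 or p2.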
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, FileScan}
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
@@ -744,6 +746,16 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
         .option("header", true)
         .load(dir.getCanonicalPath)
         .where("p1 = 1 and p2 = 2 and value != \"a\"")
+
+      val filterCondition = df.queryExecution.optimizedPlan.collectFirst {
+        case f: Filter => f.condition
+      }
+      assert(filterCondition.isDefined)
+      // The partition filters should be pushed down and should not need to be re-evaluated.
+      assert(filterCondition.get.collectFirst {
+        case a: AttributeReference if a.name == "p1" || a.name == "p2" => a
+      }.isEmpty)
+
       val fileScan = df.queryExecution.executedPlan collectFirst {
         case BatchScanExec(_, f: FileScan) => f
       }
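The assertion block added here is identical to the one in the Avro suite. A hypothetical helper (not part of this commit) that both suites could share makes the check's intent explicit in one place:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter

// After optimization a Filter for the data predicate should remain, but it
// must not reference the partition columns, because those predicates were
// pushed into the file scan and pruned from the post-scan filters.
def assertPartitionFiltersPruned(df: DataFrame, partitionCols: Set[String]): Unit = {
  val filterCondition = df.queryExecution.optimizedPlan.collectFirst {
    case f: Filter => f.condition
  }
  assert(filterCondition.isDefined, "expected a Filter above the scan")
  val leftover = filterCondition.get.collect {
    case a: AttributeReference if partitionCols.contains(a.name) => a
  }
  assert(leftover.isEmpty, s"partition columns still filtered after the scan: $leftover")
}

Usage in either test would then reduce to a one-liner such as assertPartitionFiltersPruned(df, Set("p1", "p2")).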
