
Commit

update tests
gengliangwang committed Jan 7, 2020
1 parent 65200b6 commit 13e9535
Showing 2 changed files with 76 additions and 5 deletions.
AvroSuite.scala
@@ -36,10 +36,13 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.sql.v2.avro.AvroScan
import org.apache.spark.util.Utils

abstract class AvroSuite extends QueryTest with SharedSparkSession {
@@ -1502,8 +1505,65 @@ class AvroV1Suite extends AvroSuite {
}

class AvroV2Suite extends AvroSuite {
import testImplicits._

override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")

test("Avro source v2: support partition pruning") {
withTempPath { dir =>
Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1))
.toDF("value", "p1", "p2")
.write
.format("avro")
.partitionBy("p1", "p2")
.option("header", true)
.save(dir.getCanonicalPath)
val df = spark
.read
.format("avro")
.option("header", true)
.load(dir.getCanonicalPath)
.where("p1 = 1 and p2 = 2 and value != \"a\"")
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: AvroScan) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
assert(fileScan.get.planInputPartitions().forall { partition =>
partition.asInstanceOf[FilePartition].files.forall { file =>
file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
}
})
checkAnswer(df, Row("b", 1, 2))
}
}

private def getBatchScanExec(plan: SparkPlan): BatchScanExec = {
plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec]
}

test("Avro source v2: same result with different orders of data filters and partition filters") {
withTempPath { path =>
val tmpDir = path.getCanonicalPath
spark
.range(10)
.selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
.write
.partitionBy("a", "b")
.format("avro")
.save(tmpDir)
val df = spark.read.format("avro").load(tmpDir)
// partition filters: a > 1 AND b < 9
// data filters: c > 1 AND d < 9
val plan1 = df.where("a > 1 AND b < 9 AND c > 1 AND d < 9").queryExecution.sparkPlan
val plan2 = df.where("b < 9 AND a > 1 AND d < 9 AND c > 1").queryExecution.sparkPlan
assert(plan1.sameResult(plan2))
val scan1 = getBatchScanExec(plan1)
val scan2 = getBatchScanExec(plan2)
assert(scan1.sameResult(scan2))
}
}
}
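
A note on the sparkConf override in AvroV2Suite: clearing SQLConf.USE_V1_SOURCE_LIST routes all built-in file sources through the DataSource V2 code path, which is why the executed plan contains a BatchScanExec wrapping an AvroScan rather than the V1 FileSourceScanExec. Below is a minimal sketch of the same toggle in a standalone session; the master, app name, and the assumption that the spark-avro module is on the classpath are illustrative and not part of this commit.

import org.apache.spark.sql.SparkSession

// Assumption: spark-avro is on the classpath (e.g. started with
// --packages org.apache.spark:spark-avro_2.12:<version>).
val spark = SparkSession.builder()
  .master("local[*]")                                // illustrative
  .appName("avro-v2-scan-check")                     // hypothetical app name
  .config("spark.sql.sources.useV1SourceList", "")   // empty list => built-in sources use the V2 BatchScanExec path
  .getOrCreate()

// With this setting, reading Avro data and filtering on partition columns should plan a
// BatchScanExec whose AvroScan carries the pushed-down partition filters, which is what
// the partition pruning test above asserts.
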
FileBasedDataSourceSuite.scala
@@ -731,19 +731,30 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
allFileBasedDataSources.foreach { format =>
withTempPath { dir =>
Seq(("a", 1), ("b", 2)).toDF("v", "p").write.format(format)
.partitionBy("p").save(dir.getCanonicalPath)
val df = spark.read.format(format).load(dir.getCanonicalPath).where("p = 1")
Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1))
.toDF("value", "p1", "p2")
.write
.format(format)
.partitionBy("p1", "p2")
.option("header", true)
.save(dir.getCanonicalPath)
val df = spark
.read
.format(format)
.option("header", true)
.load(dir.getCanonicalPath)
.where("p1 = 1 and p2 = 2 and value != \"a\"")
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
assert(fileScan.get.planInputPartitions().forall { partition =>
partition.asInstanceOf[FilePartition].files.forall { file =>
file.filePath.contains("p=1")
file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
}
})
checkAnswer(df, Row("b", 1, 2))
}
}
}
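
For context on the filePath.contains("p1=1") / contains("p2=2") assertions used in both suites: partitionBy writes Hive-style directories named <column>=<value>, so after pruning, every planned file should live under the matching partition directories. Below is a minimal sketch of that layout, assuming a spark-shell session; the scratch path and the choice of Parquet are illustrative and not part of this commit.

import java.io.File
import spark.implicits._   // available in spark-shell; provides toDF on local Seqs

val dir = "/tmp/partition_layout_demo"   // hypothetical scratch path
Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)).toDF("value", "p1", "p2")
  .write.mode("overwrite").format("parquet").partitionBy("p1", "p2").save(dir)

// Recursively collect the data files that were written.
def listDataFiles(f: File): Seq[File] =
  if (f.isDirectory) f.listFiles().toSeq.flatMap(listDataFiles) else Seq(f)

// Prints paths like /tmp/partition_layout_demo/p1=1/p2=2/part-....parquet and
// /tmp/partition_layout_demo/p1=2/p2=1/part-....parquet, which is what the
// file.filePath.contains(...) checks in the tests key on.
listDataFiles(new File(dir)).map(_.getPath).filter(_.contains("part-")).foreach(println)
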
