Skip to content

Commit

Permalink
address the issues in test
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Dec 17, 2017
1 parent 4b2fcb6 commit b2d829b
Showing 1 changed file with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -488,32 +488,40 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
withSQLConf("spark.sql.cbo.enabled" -> "true") {
val workDir = s"${Utils.createTempDir()}/table1"
val data = Seq(100, 200, 300, 400).toDF("count")
data.write.parquet(workDir)
val dfFromFile = spark.read.parquet(workDir).cache()
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is file size before the underlying RDD is materialized
assert(inMemoryRelation.computeStats().sizeInBytes === 740)

// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
assert(inMemoryRelation.computeStats().sizeInBytes === 16)

// test of catalog table
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
collect { case plan: InMemoryRelation => plan }.head

// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
// is calculated
assert(inMemoryRelation2.computeStats().sizeInBytes === 740)

// InMemoryRelation's stats should be updated after calculating stats of the table
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
withTempDir { workDir =>
withTable("table1") {
val workDirPath = workDir.getAbsolutePath + "/table1"
val data = Seq(100, 200, 300, 400).toDF("count")
data.write.parquet(workDirPath)
val dfFromFile = spark.read.parquet(workDirPath).cache()
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is file size before the underlying RDD is materialized
assert(inMemoryRelation.computeStats().sizeInBytes === 740)

// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
assert(inMemoryRelation.computeStats().sizeInBytes === 16)

// test of catalog table
val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
collect { case plan: InMemoryRelation => plan }.head

// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
// is calculated
assert(inMemoryRelation2.computeStats().sizeInBytes === 740)

// InMemoryRelation's stats should be updated after calculating stats of the table
// clear cache to simulate a fresh environment
dfFromTable.unpersist(blocking = true)
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
collect { case plan: InMemoryRelation => plan }.head
assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
}
}
}
}
}

0 comments on commit b2d829b

Please sign in to comment.