Skip to content

Commit

Permalink
[SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePart…
Browse files Browse the repository at this point in the history
…itions rule.

## What changes were proposed in this pull request?
After PruneFileSourcePartitions rule, It needs reset table's statistics because PruneFileSourcePartitions can filter some unnecessary partitions. So the statistics need to be changed.

## How was this patch tested?
add unit test.

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes apache#18205 from lianhuiwang/SPARK-20986.
  • Loading branch information
lianhuiwang authored and wangzejie committed Jun 16, 2017
1 parent fb1a870 commit f71145a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
Expand Down Expand Up @@ -59,8 +60,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)

// Change table stats based on the sizeInBytes of pruned files
val withStats = logicalRelation.catalogTable.map(_.copy(
stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
val prunedLogicalRelation = logicalRelation.copy(
relation = prunedFsRelation, catalogTable = withStats)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
val filter = Filter(filterExpression, prunedLogicalRelation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
Expand Down Expand Up @@ -66,4 +67,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
}
}
}

test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
withTable("tbl") {
spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS")
val tableStats = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")).stats
assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost")

val df = sql("SELECT * FROM tbl WHERE p = 1")
val sizes1 = df.queryExecution.analyzed.collect {
case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
}
assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes1(0) == tableStats.get.sizeInBytes)

val relations = df.queryExecution.optimizedPlan.collect {
case relation: LogicalRelation => relation
}
assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
val size2 = relations(0).computeStats(conf).sizeInBytes
assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes)
assert(size2 < tableStats.get.sizeInBytes)
}
}
}

0 comments on commit f71145a

Please sign in to comment.