
[SPARK-20986] [SQL] Reset table's statistics after PruneFileSourcePartitions rule. #18205

Closed
wants to merge 7 commits

Conversation

lianhuiwang
Contributor

What changes were proposed in this pull request?

After the PruneFileSourcePartitions rule runs, the table's statistics need to be reset: PruneFileSourcePartitions can filter out unnecessary partitions, so the statistics should change accordingly.
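
The fix amounts to rebuilding the table's CatalogStatistics from the pruned file size. A self-contained toy model (plain Scala, no Spark; the case classes only mirror the names of Spark's, they are not Spark's actual implementation):

```scala
// Toy model: after partition pruning, the catalog statistics attached to
// the table must be rebuilt from the pruned file size, otherwise the
// optimizer keeps using the stale, full-table size.
case class CatalogStatistics(sizeInBytes: BigInt)
case class CatalogTable(name: String, stats: Option[CatalogStatistics])
case class FileIndex(sizeInBytes: Long) // stand-in for the pruned file index

def resetStatsAfterPruning(table: CatalogTable, prunedIndex: FileIndex): CatalogTable =
  // Replace whatever stats were there with stats based on the pruned files,
  // mirroring logicalRelation.catalogTable.map(_.copy(stats = ...)) in the patch.
  table.copy(stats = Some(CatalogStatistics(BigInt(prunedIndex.sizeInBytes))))

val fullTable = CatalogTable("partTbl", Some(CatalogStatistics(BigInt(10000))))
val pruned    = resetStatsAfterPruning(fullTable, FileIndex(2500L))
// pruned.stats now reflects only the bytes of the surviving partitions.
```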

How was this patch tested?

Added a unit test.

test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl", "partTbl") {
    spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
Member

Hi, @lianhuiwang .
withTable("partTbl") instead of withTempView(..., "partTbl")?

Contributor Author

Yes, thanks.

@SparkQA

SparkQA commented Jun 5, 2017

Test build #77744 has finished for PR 18205 at commit 20a6043.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2017

Test build #77763 has finished for PR 18205 at commit c53a0c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77842 has finished for PR 18205 at commit c53a0c7.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wzhfy
Contributor

wzhfy commented Jun 10, 2017

Can you please explain why the reset is needed?
To my understanding, we don't have any statistics in CatalogStatistics initially, and we can get the right size in computeStats through HadoopFsRelation.sizeInBytes, because in the original code we already replace it with prunedFsRelation.

@wzhfy
Contributor

wzhfy commented Jun 10, 2017

OK, I get your point. But the test case does not clearly show the problem. We can first analyze the table to fill the stats in CatalogStatistics, then show the difference after partition pruning.

val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)

val withStats = logicalRelation.catalogTable.map(_.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
Contributor

Add a comment here indicating we are resetting stats based on the pruned file size?

Contributor Author

Yes, thanks.

val df = sql("SELECT * FROM partTbl where part = 1")
val query = df.queryExecution.analyzed.analyze
val sizes1 = query.collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
Contributor

@wzhfy wzhfy Jun 10, 2017

We'd better not compute stats for an analyzed plan. We can use spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats to query the catalog stats.

Contributor Author

Yes, thanks.

val sizes2 = Optimize.execute(query).collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
}
assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}")
Contributor

assert the new size in catalog stats is larger than the previous one, and equal to computeStats(conf).sizeInBytes?

Contributor Author

I don't think it has changed the stats in the catalog. After the optimizer runs, the size in the catalog stats is larger than computeStats(conf).sizeInBytes because of the partition pruning.

Contributor

LogicalRelation overrides computeStats, and it will use CatalogStatistics if it exists.
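
The precedence described here can be sketched as a toy function (plain Scala, no Spark; this only illustrates the fallback order, it is not Spark's actual code):

```scala
// computeStats prefers the catalog statistics when they exist and only
// falls back to the relation's own size estimate otherwise. With stale
// catalog stats present, the pruned relation's smaller size would be
// ignored -- which is why the PR resets the catalog stats after pruning.
case class CatalogStatistics(sizeInBytes: BigInt)

def computeSizeInBytes(catalogStats: Option[CatalogStatistics],
                       relationSizeInBytes: BigInt): BigInt =
  catalogStats.map(_.sizeInBytes).getOrElse(relationSizeInBytes)

val staleSize = computeSizeInBytes(Some(CatalogStatistics(BigInt(10000))), BigInt(2500))
val freshSize = computeSizeInBytes(None, BigInt(2500))
```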

Contributor

I don't think it has changed the stats in the catalog.

Don't we reset the catalog stats using the pruned size here?

@SparkQA

SparkQA commented Jun 11, 2017

Test build #77892 has finished for PR 18205 at commit 120662e.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

val df = sql("SELECT * FROM partTbl where part = 1")
val query = df.queryExecution.analyzed.analyze
val sizes1 = query.collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
Contributor

@wzhfy wzhfy Jun 11, 2017

Can we get the catalog stats by relation.catalogTable.get.stats.get here and check it? I just think we need to cover this reset code path.

Contributor Author

Yes, thanks.

}

val tableName = "partTbl"
sql(s"analyze table partTbl compute STATISTICS")
Contributor

nit: ANALYZE TABLE partTbl COMPUTE STATISTICS

Contributor Author

Yes, thanks.


withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
  val df = sql("SELECT * FROM partTbl where part = 1")
  val query = df.queryExecution.analyzed.analyze
Contributor

@wzhfy wzhfy Jun 11, 2017

nit: just df.queryExecution.analyzed?

Contributor Author

Because there is a SubqueryAlias plan, I think we need analyze() to eliminate it.

Contributor

But why do we need to eliminate SubqueryAlias here?

test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl") {
    withTable("partTbl") {
      spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
Contributor

For this test, we can use a much smaller size (e.g. 10) to accelerate testing.

Contributor Author

Yes, thanks.

@lianhuiwang
Contributor Author

@wzhfy I have addressed your comments. Thanks.

@SparkQA

SparkQA commented Jun 12, 2017

Test build #77934 has finished for PR 18205 at commit f7c3dfc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes1(0) == tableStats.get.sizeInBytes)
val sizes2 = Optimize.execute(query).collect {
  case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
Contributor

@wzhfy wzhfy Jun 12, 2017

Fixed the wrong place? For size1, could you get the catalog stats? We'd better not computeStats for an analyzed plan. After optimization, for size2 or size3, we can get sizes from both the catalog stats and computeStats, and see if they are equal and larger than size1.

Contributor Author

Yes, thanks. I have updated it.

@SparkQA

SparkQA commented Jun 13, 2017

Test build #77958 has finished for PR 18205 at commit d1513a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wzhfy
Contributor

wzhfy commented Jun 13, 2017

LGTM, ping @cloud-fan


// Change table stats based on the sizeInBytes of pruned files
val withStats = logicalRelation.catalogTable.map(_.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
Contributor

do we ignore all column stats here?

Contributor Author

Yes. Now it replaces the stats of CatalogTable with a new CatalogStatistics(), like DetermineTableStats does.

Contributor

Column stats are collected at table level; here we need partition-specific stats, so we can ignore the column stats.

Contributor

Ah, actually we have to: the column stats are table-level and are invalid for partitions.

test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl") {
    withTable("partTbl") {
      spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl")
Contributor

to simplify the test:

spark.range(10).select('id, 'id % 3 as 'p).write.partitionBy("p").saveAsTable("t")

Contributor Author

Yes, great, thanks.
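
The suggested one-liner writes ten rows into three partitions (p = id % 3). A self-contained sketch of the grouping it produces (plain Scala, no Spark; `partitions` is just an illustrative name):

```scala
// Model of write.partitionBy("p") over spark.range(10) with p = id % 3:
// rows are grouped into one bucket per distinct partition value.
val rows = (0L until 10L).map(id => (id, id % 3))
val partitions: Map[Long, Seq[Long]] =
  rows.groupBy(_._2).map { case (p, rs) => p -> rs.map(_._1) }

// Partition p = 1 holds ids 1, 4, 7 -- the rows that survive the test's
// "WHERE part = 1" predicate after pruning.
```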

}
assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes1(0) == tableStats.get.sizeInBytes)
val relations = Optimize.execute(query).collect {
Contributor

df.queryExecution.optimized

}
assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
val size2 = relations(0).computeStats(conf).sizeInBytes
val size3 = relations(0).catalogTable.get.stats.get.sizeInBytes
Contributor

nit:

assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes)
assert(size2 < tableStats.get.sizeInBytes)

@lianhuiwang
Contributor Author

@cloud-fan I have addressed your comments. Thanks.

@SparkQA

SparkQA commented Jun 13, 2017

Test build #77989 has finished for PR 18205 at commit 16a3f7e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

We have another related PR: #14655

@cloud-fan
Contributor

thanks, merging to master/2.2!

asfgit pushed a commit that referenced this pull request Jun 14, 2017
…itions rule.

## What changes were proposed in this pull request?
After the PruneFileSourcePartitions rule runs, the table's statistics need to be reset: PruneFileSourcePartitions can filter out unnecessary partitions, so the statistics should change accordingly.

## How was this patch tested?
Added a unit test.

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #18205 from lianhuiwang/SPARK-20986.

(cherry picked from commit 8b5b2e2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit asfgit closed this in 8b5b2e2 Jun 14, 2017
@lianhuiwang
Contributor Author

@cloud-fan Thanks.

dataknocker pushed a commit to dataknocker/spark that referenced this pull request Jun 16, 2017