[SPARK-22515] [SQL] Estimation relation size based on numRows * rowSize #19743

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -367,10 +368,11 @@ case class CatalogStatistics(
    * on column names.
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
-    if (cboEnabled) {
-      val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
-      Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
-        attributeStats = AttributeMap(attrStats))
+    if (cboEnabled && rowCount.isDefined) {
+      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      // Estimate size as number of rows * row size.
+      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
+      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
       // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
[Review comment · Contributor] now we need to update the comment: when CBO is disabled or the table doesn't have statistics

       // propagate other statistics from catalog to the plan.
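
The heart of the change is the arithmetic above: when CBO is enabled and the catalog has a row count, sizeInBytes becomes rowCount times an estimated per-row width instead of the raw file size. Below is a minimal, self-contained sketch of that arithmetic; the 8-byte per-row overhead and the clamp to a minimum of 1 mirror what EstimationUtils.getOutputSize does, but the object and method names are illustrative, not Spark's API.

// A simplified sketch of the numRows * rowSize estimation (illustrative names;
// the real logic lives in EstimationUtils.getOutputSize).
object SizeEstimationSketch {
  // avgColumnLens: average width in bytes of each output column, taken from
  // column statistics when available, else the data type's default size.
  def estimateOutputSize(avgColumnLens: Seq[Long], rowCount: BigInt): BigInt = {
    val sizePerRow = 8L + avgColumnLens.sum
    // Never return 0: plan-level estimates multiply child sizes together,
    // and a zero would wipe out the estimate for an entire subtree.
    if (rowCount > 0) rowCount * sizePerRow else BigInt(1)
  }

  def main(args: Array[String]): Unit = {
    // 1000 rows of a single LongType column: (8 + 8) * 1000 = 16000 bytes.
    println(estimateOutputSize(Seq(8L), BigInt(1000)))
    // An empty relation still reports size 1, never 0.
    println(estimateOutputSize(Seq(8L), BigInt(0)))
  }
}
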
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation

 import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils

     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      assert(relation.stats.sizeInBytes == 0)
+      assert(relation.stats.sizeInBytes == 1)
       assert(relation.stats.rowCount == Some(0))
       assert(relation.stats.attributeStats.size == 1)
       val (attribute, colStat) = relation.stats.attributeStats.head
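
The flipped assertion (0 becomes 1) is a direct consequence of the floor in the estimator: a table with zero rows now reports sizeInBytes = 1 rather than 0. A short illustration of why that floor matters, using made-up sizes:

// Illustrative values only: if an empty relation reported size 0, a naive
// binary-node estimate that multiplies child sizes would collapse to 0.
val emptySide = BigInt(1)       // empty table, clamped to 1 instead of 0
val otherSide = BigInt(16000)   // some non-empty relation
val naiveJoinEstimate = emptySide * otherSide  // 16000, still usable
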
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._


 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
-  test("Hive serde tables should fallback to HDFS for size estimation") {
+
+  test("size estimation for relations based on row size * number of rows") {
[Review comment · Contributor] nit: is based on
+    val dsTbl = "rel_est_ds_table"
+    val hiveTbl = "rel_est_hive_table"
+    withTable(dsTbl, hiveTbl) {
+      spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
+      sql(s"CREATE TABLE $hiveTbl STORED AS parquet AS SELECT * FROM $dsTbl")
[Review comment · Contributor] nit: spark.range(1000L).write.format("hive").saveAsTable(hiveTbl)

+      Seq(dsTbl, hiveTbl).foreach { tbl =>
+        sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
+        val catalogStats = getCatalogStatistics(tbl)
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
+          assert(relationStats.rowCount.isEmpty)
+        }
+        spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          // Due to compression in parquet files, in this test, file size is smaller than
+          // in-memory size.
+          assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
+          assert(catalogStats.rowCount == relationStats.rowCount)
+        }
+      }
+    }
+  }

test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
       withTable("csv_table") {
         withTempDir { tempDir =>
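
Stepping back from the diffs, this is roughly how the change surfaces to a user. A hedged sketch: the table name t is hypothetical, and spark.sql.cbo.enabled is the key behind SQLConf.CBO_ENABLED.

// After ANALYZE TABLE, the optimizer's size estimate under CBO is derived from
// rowCount * per-row width rather than the compressed file size on disk.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
spark.conf.set("spark.sql.cbo.enabled", "true")
val stats = spark.table("t").queryExecution.optimizedPlan.stats
println(s"sizeInBytes = ${stats.sizeInBytes}, rowCount = ${stats.rowCount}")
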