Skip to content

Commit

Permalink
[SPARK-21083][SQL] Store zero size and row count when analyzing empty…
Browse files Browse the repository at this point in the history
… table

## What changes were proposed in this pull request?

We should be able to store zero size and row count after analyzing empty table.

This pr also enhances the test cases for re-analyzing tables.

## How was this patch tested?

Added a new test case and enhanced some test cases.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #18292 from wzhfy/analyzeNewColumn.
  • Loading branch information
wzhfy authored and cloud-fan committed Jul 8, 2017
1 parent 0b8dd2d commit 9fccc36
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.execution.SQLExecution


/**
Expand All @@ -40,10 +39,10 @@ case class AnalyzeTableCommand(
}
val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)

val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[CatalogStatistics] = None
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
Expand Down
Expand Up @@ -82,6 +82,19 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}

test("analyze empty table") {
val table = "emptyTable"
withTable(table) {
sql(s"CREATE TABLE $table (key STRING, value STRING) USING PARQUET")
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan")
val fetchedStats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats1.get.sizeInBytes == 0)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
val fetchedStats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetchedStats2.get.sizeInBytes == 0)
}
}

test("analyze column command - unsupported types and invalid columns") {
val tableName = "column_stats_test1"
withTable(tableName) {
Expand Down
Expand Up @@ -26,6 +26,7 @@ import scala.util.matching.Regex
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
Expand Down Expand Up @@ -210,27 +211,62 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}

test("test elimination of the influences of the old stats") {
test("keep existing row count in stats with noscan if table is not changed") {
val textTable = "textTable"
withTable(textTable) {
sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
sql(s"CREATE TABLE $textTable (key STRING, value STRING)")
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
val fetchedStats1 =
checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))

sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// when the total size is not changed, the old row count is kept
// when the table is not changed, total size is the same, and the old row count is kept
val fetchedStats2 =
checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1 == fetchedStats2)
}
}

sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// update total size and remove the old and invalid row count
test("keep existing column stats if table is not changed") {
val table = "update_col_stats_table"
withTable(table) {
sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)")
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
val fetchedStats0 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))

// Insert new data and analyze: have the latest column stats.
sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
val fetchedStats1 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
assert(fetchedStats1.colStats == Map(
"c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
avgLen = 4, maxLen = 4)))

// Analyze another column: since the table is not changed, the precious column stats are kept.
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
val fetchedStats2 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
assert(fetchedStats2.colStats == Map(
"c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
avgLen = 4, maxLen = 4),
"c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
avgLen = 1, maxLen = 1)))

// Insert new data and analyze: stale column stats are removed and newly collected column
// stats are added.
sql(s"INSERT INTO TABLE $table SELECT 2, 'b', 20.0")
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1, c3")
val fetchedStats3 =
checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
assert(fetchedStats3.colStats == Map(
"c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
avgLen = 4, maxLen = 4),
"c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
avgLen = 8, maxLen = 8)))
}
}

Expand Down

0 comments on commit 9fccc36

Please sign in to comment.