Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34084][SQL] Fix auto updating of table stats in ALTER TABLE .. ADD PARTITION #31149

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -486,17 +486,17 @@ case class AlterTableAddPartitionCommand(
}

sparkSession.catalog.refreshTable(table.identifier.quotedString)
if (table.stats.nonEmpty) {
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,
parts.map(_.storage.locationUri)).sum
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))
}
} else {
catalog.alterTableStats(table.identifier, None)
if (table.stats.nonEmpty && sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
// Update the table stats only if the newly added partitions are not empty
val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,
parts.map(_.storage.locationUri)).sum
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))
}
} else {
// Recalculate the table size, including all partitions
CommandUtils.updateTableStats(sparkSession, table)
}
Seq.empty[Row]
}
Expand Down
Expand Up @@ -98,10 +98,11 @@ trait DDLCommandTestUtils extends SQLTestUtils {
sql(s"DESCRIBE TABLE EXTENDED $tableName")
.select("data_type")
.where("col_name = 'Statistics'")
.first()
.getString(0)
if (stats.isEmpty) {
throw new IllegalArgumentException(s"The table $tableName does not have stats")
}
val tableSizeInStats = ".*(\\d) bytes.*".r
val size = stats match {
val size = stats.first().getString(0) match {
case tableSizeInStats(s) => s.toInt
case _ => throw new IllegalArgumentException("Not found table size in stats")
}
Expand Down
Expand Up @@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf

/**
* This base suite contains unified tests for the `ALTER TABLE .. ADD PARTITION` command that
Expand Down Expand Up @@ -72,6 +73,23 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
}
}

test("SPARK-34084: auto update table stats") {
  // Both phases toggle the same setting, so resolve its key once.
  val autoSizeUpdateKey = SQLConf.AUTO_SIZE_UPDATE_ENABLED.key
  withNamespaceAndTable("ns", "tbl") { t =>
    // Phase 1: with auto size update disabled, creating and populating the
    // table is expected to leave it without stats, so reading the size fails.
    withSQLConf(autoSizeUpdateKey -> "false") {
      sql(s"CREATE TABLE $t (col0 int, part int) $defaultUsing PARTITIONED BY (part)")
      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
      val missingStatsMessage =
        intercept[IllegalArgumentException](getTableSize(t)).getMessage
      assert(missingStatsMessage.contains(s"The table $t does not have stats"))
    }
    // Phase 2: with auto size update enabled, adding a partition is expected
    // to produce table stats that report a positive size.
    withSQLConf(autoSizeUpdateKey -> "true") {
      sql(s"ALTER TABLE $t ADD PARTITION (part=1)")
      assert(getTableSize(t) > 0)
    }
  }
}
}

/**
Expand Down