From 1650d5aa5e3d41c57cd0a63f1f916c098bf181b5 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 22 Aug 2018 20:59:29 +0900 Subject: [PATCH 1/2] Refactor AnalyzeColumnCommand for reuse in other places --- .../command/AnalyzeColumnCommand.scala | 224 +++--------------- .../sql/execution/command/CommandUtils.scala | 200 +++++++++++++++- 2 files changed, 233 insertions(+), 191 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 04a9442424354..734b3c7670098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -17,17 +17,12 @@ package org.apache.spark.sql.execution.command -import scala.collection.mutable - import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.ArrayData -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types._ @@ -42,7 +37,7 @@ case class AnalyzeColumnCommand( allColumns: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - require((columnNames.isDefined ^ allColumns), "Parameter `columnNames` or `allColumns` are " + + require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " + "mutually exclusive. Only one of them should be specified.") val sessionState = sparkSession.sessionState val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) @@ -57,14 +52,19 @@ case class AnalyzeColumnCommand( // Compute stats for the computed list of columns. val (rowCount, newColStats) = - computeColumnStats(sparkSession, relation, columnsToAnalyze) + CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze) + + val newColCatalogStats = newColStats.map { + case (attr, columnStat) => + attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType) + } // We also update table-level stats in order to keep them consistent with column-level stats. val statistics = CatalogStatistics( sizeInBytes = sizeInBytes, rowCount = Some(rowCount), // Newly computed column stats should override the existing ones. - colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) + colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats) sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) @@ -95,80 +95,35 @@ case class AnalyzeColumnCommand( columnsToAnalyze } - /** - * Compute stats for the given columns. 
- * @return (row count, map from column name to CatalogColumnStats) - */ - private def computeColumnStats( - sparkSession: SparkSession, - relation: LogicalPlan, - columns: Seq[Attribute]): (Long, Map[String, CatalogColumnStat]) = { - val conf = sparkSession.sessionState.conf - - // Collect statistics per column. - // If no histogram is required, we run a job to compute basic column stats such as - // min, max, ndv, etc. Otherwise, besides basic column stats, histogram will also be - // generated. Currently we only support equi-height histogram. - // To generate an equi-height histogram, we need two jobs: - // 1. compute percentiles p(0), p(1/n) ... p((n-1)/n), p(1). - // 2. use the percentiles as value intervals of bins, e.g. [p(0), p(1/n)], - // [p(1/n), p(2/n)], ..., [p((n-1)/n), p(1)], and then count ndv in each bin. - // Basic column stats will be computed together in the second job. - val attributePercentiles = computePercentiles(columns, sparkSession, relation) - - // The first element in the result will be the overall row count, the following elements - // will be structs containing all column stats. - // The layout of each struct follows the layout of the ColumnStats. - val expressions = Count(Literal(1)).toAggregateExpression() +: - columns.map(statExprs(_, conf, attributePercentiles)) - - val namedExpressions = expressions.map(e => Alias(e, e.toString)()) - val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation)) - .executedPlan.executeTake(1).head + private def analyzeColumnInCatalog(sparkSession: SparkSession): Unit = { + val sessionState = sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") + } + val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta) + val relation = sparkSession.table(tableIdent).logicalPlan + val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns) - val rowCount = statsRow.getLong(0) - val columnStats = columns.zipWithIndex.map { case (attr, i) => - // according to `statExprs`, the stats struct always have 7 fields. - (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount, - attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType)) - }.toMap - (rowCount, columnStats) - } + // Compute stats for the computed list of columns. + val (rowCount, newColStats) = + CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze) - /** Computes percentiles for each attribute. 
*/ - private def computePercentiles( - attributesToAnalyze: Seq[Attribute], - sparkSession: SparkSession, - relation: LogicalPlan): AttributeMap[ArrayData] = { - val attrsToGenHistogram = if (conf.histogramEnabled) { - attributesToAnalyze.filter(a => supportsHistogram(a.dataType)) - } else { - Nil + val newColCatalogStats = newColStats.map { + case (attr, columnStat) => + attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType) } - val attributePercentiles = mutable.HashMap[Attribute, ArrayData]() - if (attrsToGenHistogram.nonEmpty) { - val percentiles = (0 to conf.histogramNumBins) - .map(i => i.toDouble / conf.histogramNumBins).toArray - val namedExprs = attrsToGenHistogram.map { attr => - val aggFunc = - new ApproximatePercentile(attr, Literal(percentiles), Literal(conf.percentileAccuracy)) - val expr = aggFunc.toAggregateExpression() - Alias(expr, expr.toString)() - } + // We also update table-level stats in order to keep them consistent with column-level stats. + val statistics = CatalogStatistics( + sizeInBytes = sizeInBytes, + rowCount = Some(rowCount), + // Newly computed column stats should override the existing ones. + colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats) - val percentilesRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExprs, relation)) - .executedPlan.executeTake(1).head - attrsToGenHistogram.zipWithIndex.foreach { case (attr, i) => - val percentiles = percentilesRow.getArray(i) - // When there is no non-null value, `percentiles` is null. In such case, there is no - // need to generate histogram. - if (percentiles != null) { - attributePercentiles += attr -> percentiles - } - } - } - AttributeMap(attributePercentiles.toSeq) + sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) } /** Returns true iff the we support gathering column statistics on column of the given type. */ @@ -182,109 +137,4 @@ case class AnalyzeColumnCommand( case BinaryType | StringType => true case _ => false } - - /** Returns true iff the we support gathering histogram on column of the given type. */ - private def supportsHistogram(dataType: DataType): Boolean = dataType match { - case _: IntegralType => true - case _: DecimalType => true - case DoubleType | FloatType => true - case DateType => true - case TimestampType => true - case _ => false - } - - /** - * Constructs an expression to compute column statistics for a given column. - * - * The expression should create a single struct column with the following schema: - * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long, - * distinctCountsForIntervals: Array[Long] - * - * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and - * as a result should stay in sync with it. 
- */ - private def statExprs( - col: Attribute, - conf: SQLConf, - colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = { - def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => - expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } - }) - val one = Literal(1L, LongType) - - // the approximate ndv (num distinct value) should never be larger than the number of rows - val numNonNulls = if (col.nullable) Count(col) else Count(one) - val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls)) - val numNulls = Subtract(Count(one), numNonNulls) - val defaultSize = Literal(col.dataType.defaultSize.toLong, LongType) - val nullArray = Literal(null, ArrayType(LongType)) - - def fixedLenTypeStruct: CreateNamedStruct = { - val genHistogram = - supportsHistogram(col.dataType) && colPercentiles.contains(col) - val intervalNdvsExpr = if (genHistogram) { - ApproxCountDistinctForIntervals(col, - Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError) - } else { - nullArray - } - // For fixed width types, avg size should be the same as max size. - struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls, - defaultSize, defaultSize, intervalNdvsExpr) - } - - col.dataType match { - case _: IntegralType => fixedLenTypeStruct - case _: DecimalType => fixedLenTypeStruct - case DoubleType | FloatType => fixedLenTypeStruct - case BooleanType => fixedLenTypeStruct - case DateType => fixedLenTypeStruct - case TimestampType => fixedLenTypeStruct - case BinaryType | StringType => - // For string and binary type, we don't compute min, max or histogram - val nullLit = Literal(null, col.dataType) - struct( - ndv, nullLit, nullLit, numNulls, - // Set avg/max size to default size if all the values are null or there is no value. - Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)), - Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)), - nullArray) - case _ => - throw new AnalysisException("Analyzing column statistics is not supported for column " + - s"${col.name} of data type: ${col.dataType}.") - } - } - - /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */ - private def rowToColumnStat( - row: InternalRow, - attr: Attribute, - rowCount: Long, - percentiles: Option[ArrayData]): ColumnStat = { - // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins. 
- val cs = ColumnStat( - distinctCount = Option(BigInt(row.getLong(0))), - // for string/binary min/max, get should return null - min = Option(row.get(1, attr.dataType)), - max = Option(row.get(2, attr.dataType)), - nullCount = Option(BigInt(row.getLong(3))), - avgLen = Option(row.getLong(4)), - maxLen = Option(row.getLong(5)) - ) - if (row.isNullAt(6) || cs.nullCount.isEmpty) { - cs - } else { - val ndvs = row.getArray(6).toLongArray() - assert(percentiles.get.numElements() == ndvs.length + 1) - val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble) - // Construct equi-height histogram - val bins = ndvs.zipWithIndex.map { case (ndv, i) => - HistogramBin(endpoints(i), endpoints(i + 1), ndv) - } - val nonNullRows = rowCount - cs.nullCount.get - val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins) - cs.copy(histogram = Some(histogram)) - } - } - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index df71bc9effb3e..07945847aa613 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -19,16 +19,23 @@ package org.apache.spark.sql.execution.command import java.net.URI +import scala.collection.mutable import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex} -import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.types._ object CommandUtils extends Logging { @@ -153,4 +160,189 @@ object CommandUtils extends Logging { } newStats } + + /** + * Compute stats for the given columns. + * @return (row count, map from column name to CatalogColumnStats) + */ + private[sql] def computeColumnStats( + sparkSession: SparkSession, + relation: LogicalPlan, + columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = { + val conf = sparkSession.sessionState.conf + + // Collect statistics per column. + // If no histogram is required, we run a job to compute basic column stats such as + // min, max, ndv, etc. Otherwise, besides basic column stats, histogram will also be + // generated. Currently we only support equi-height histogram. + // To generate an equi-height histogram, we need two jobs: + // 1. compute percentiles p(0), p(1/n) ... p((n-1)/n), p(1). + // 2. use the percentiles as value intervals of bins, e.g. [p(0), p(1/n)], + // [p(1/n), p(2/n)], ..., [p((n-1)/n), p(1)], and then count ndv in each bin. + // Basic column stats will be computed together in the second job. 
+ val attributePercentiles = computePercentiles(columns, sparkSession, relation) + + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the ColumnStats. + val expressions = Count(Literal(1)).toAggregateExpression() +: + columns.map(statExprs(_, conf, attributePercentiles)) + + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation)) + .executedPlan.executeTake(1).head + + val rowCount = statsRow.getLong(0) + val columnStats = columns.zipWithIndex.map { case (attr, i) => + // according to `statExprs`, the stats struct always have 7 fields. + (attr, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount, + attributePercentiles.get(attr))) + }.toMap + (rowCount, columnStats) + } + + /** Computes percentiles for each attribute. */ + private def computePercentiles( + attributesToAnalyze: Seq[Attribute], + sparkSession: SparkSession, + relation: LogicalPlan): AttributeMap[ArrayData] = { + val conf = sparkSession.sessionState.conf + val attrsToGenHistogram = if (conf.histogramEnabled) { + attributesToAnalyze.filter(a => supportsHistogram(a.dataType)) + } else { + Nil + } + val attributePercentiles = mutable.HashMap[Attribute, ArrayData]() + if (attrsToGenHistogram.nonEmpty) { + val percentiles = (0 to conf.histogramNumBins) + .map(i => i.toDouble / conf.histogramNumBins).toArray + + val namedExprs = attrsToGenHistogram.map { attr => + val aggFunc = + new ApproximatePercentile(attr, Literal(percentiles), Literal(conf.percentileAccuracy)) + val expr = aggFunc.toAggregateExpression() + Alias(expr, expr.toString)() + } + + val percentilesRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExprs, relation)) + .executedPlan.executeTake(1).head + attrsToGenHistogram.zipWithIndex.foreach { case (attr, i) => + val percentiles = percentilesRow.getArray(i) + // When there is no non-null value, `percentiles` is null. In such case, there is no + // need to generate histogram. + if (percentiles != null) { + attributePercentiles += attr -> percentiles + } + } + } + AttributeMap(attributePercentiles.toSeq) + } + + + /** Returns true iff the we support gathering histogram on column of the given type. */ + private def supportsHistogram(dataType: DataType): Boolean = dataType match { + case _: IntegralType => true + case _: DecimalType => true + case DoubleType | FloatType => true + case DateType => true + case TimestampType => true + case _ => false + } + + /** + * Constructs an expression to compute column statistics for a given column. + * + * The expression should create a single struct column with the following schema: + * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long, + * distinctCountsForIntervals: Array[Long] + * + * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and + * as a result should stay in sync with it. 
+ */ + private def statExprs( + col: Attribute, + conf: SQLConf, + colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = { + def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => + expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } + }) + val one = Literal(1.toLong, LongType) + + // the approximate ndv (num distinct value) should never be larger than the number of rows + val numNonNulls = if (col.nullable) Count(col) else Count(one) + val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls)) + val numNulls = Subtract(Count(one), numNonNulls) + val defaultSize = Literal(col.dataType.defaultSize.toLong, LongType) + val nullArray = Literal(null, ArrayType(LongType)) + + def fixedLenTypeStruct: CreateNamedStruct = { + val genHistogram = + supportsHistogram(col.dataType) && colPercentiles.contains(col) + val intervalNdvsExpr = if (genHistogram) { + ApproxCountDistinctForIntervals(col, + Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError) + } else { + nullArray + } + // For fixed width types, avg size should be the same as max size. + struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls, + defaultSize, defaultSize, intervalNdvsExpr) + } + + col.dataType match { + case _: IntegralType => fixedLenTypeStruct + case _: DecimalType => fixedLenTypeStruct + case DoubleType | FloatType => fixedLenTypeStruct + case BooleanType => fixedLenTypeStruct + case DateType => fixedLenTypeStruct + case TimestampType => fixedLenTypeStruct + case BinaryType | StringType => + // For string and binary type, we don't compute min, max or histogram + val nullLit = Literal(null, col.dataType) + struct( + ndv, nullLit, nullLit, numNulls, + // Set avg/max size to default size if all the values are null or there is no value. + Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)), + Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)), + nullArray) + case _ => + throw new AnalysisException("Analyzing column statistics is not supported for column " + + s"${col.name} of data type: ${col.dataType}.") + } + } + + /** + * Convert a struct for column stats (defined in `statExprs`) into + * [[org.apache.spark.sql.catalyst.plans.logical.ColumnStat]]. + */ + private def rowToColumnStat( + row: InternalRow, + attr: Attribute, + rowCount: Long, + percentiles: Option[ArrayData]): ColumnStat = { + // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins. 
+ val cs = ColumnStat( + distinctCount = Option(BigInt(row.getLong(0))), + // for string/binary min/max, get should return null + min = Option(row.get(1, attr.dataType)), + max = Option(row.get(2, attr.dataType)), + nullCount = Option(BigInt(row.getLong(3))), + avgLen = Option(row.getLong(4)), + maxLen = Option(row.getLong(5)) + ) + if (row.isNullAt(6) || cs.nullCount.isEmpty) { + cs + } else { + val ndvs = row.getArray(6).toLongArray() + assert(percentiles.get.numElements() == ndvs.length + 1) + val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble) + // Construct equi-height histogram + val bins = ndvs.zipWithIndex.map { case (ndv, i) => + HistogramBin(endpoints(i), endpoints(i + 1), ndv) + } + val nonNullRows = rowCount - cs.nullCount.get + val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins) + cs.copy(histogram = Some(histogram)) + } + } } From a238c03844cf44a598c36703e633a981198d06a2 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 9 Mar 2019 07:24:16 +0900 Subject: [PATCH 2/2] Fix --- .../command/AnalyzeColumnCommand.scala | 39 +------------------ .../sql/execution/command/CommandUtils.scala | 8 ++-- 2 files changed, 5 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 734b3c7670098..5d91f33b82563 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -54,17 +53,12 @@ case class AnalyzeColumnCommand( val (rowCount, newColStats) = CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze) - val newColCatalogStats = newColStats.map { - case (attr, columnStat) => - attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType) - } - // We also update table-level stats in order to keep them consistent with column-level stats. val statistics = CatalogStatistics( sizeInBytes = sizeInBytes, rowCount = Some(rowCount), // Newly computed column stats should override the existing ones. 
- colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats) + colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) @@ -95,37 +89,6 @@ case class AnalyzeColumnCommand( columnsToAnalyze } - private def analyzeColumnInCatalog(sparkSession: SparkSession): Unit = { - val sessionState = sparkSession.sessionState - val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) - val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) - if (tableMeta.tableType == CatalogTableType.VIEW) { - throw new AnalysisException("ANALYZE TABLE is not supported on views.") - } - val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta) - val relation = sparkSession.table(tableIdent).logicalPlan - val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns) - - // Compute stats for the computed list of columns. - val (rowCount, newColStats) = - CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze) - - val newColCatalogStats = newColStats.map { - case (attr, columnStat) => - attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType) - } - - // We also update table-level stats in order to keep them consistent with column-level stats. - val statistics = CatalogStatistics( - sizeInBytes = sizeInBytes, - rowCount = Some(rowCount), - // Newly computed column stats should override the existing ones. - colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats) - - sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) - } - /** Returns true iff the we support gathering column statistics on column of the given type. */ private def supportsType(dataType: DataType): Boolean = dataType match { case _: IntegralType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 07945847aa613..0ea928b0e89ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -168,7 +168,7 @@ object CommandUtils extends Logging { private[sql] def computeColumnStats( sparkSession: SparkSession, relation: LogicalPlan, - columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = { + columns: Seq[Attribute]): (Long, Map[String, CatalogColumnStat]) = { val conf = sparkSession.sessionState.conf // Collect statistics per column. @@ -195,8 +195,8 @@ object CommandUtils extends Logging { val rowCount = statsRow.getLong(0) val columnStats = columns.zipWithIndex.map { case (attr, i) => // according to `statExprs`, the stats struct always have 7 fields. 
- (attr, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount, - attributePercentiles.get(attr))) + (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount, + attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType)) }.toMap (rowCount, columnStats) }
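
Note (not part of the patches above): since the point of patch 1/2 is to make the column-stats machinery reusable from places other than AnalyzeColumnCommand, here is a rough sketch of what a caller of the shared helper could look like. The object name, method name, and the choice to analyze every supported output column are invented for illustration; only CommandUtils.calculateTotalSize, the post-patch CommandUtils.computeColumnStats signature (returning Map[String, CatalogColumnStat]), and the supported-type list are taken from the patches. Because computeColumnStats is private[sql], such a caller has to live under the org.apache.spark.sql package tree.

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.types._

object ColumnStatsReuseSketch {

  /**
   * Computes column-level stats for every supported column of `tableIdent` and folds them
   * into fresh table-level statistics, mirroring what AnalyzeColumnCommand.run does after
   * this refactoring.
   */
  def statsForAllColumns(spark: SparkSession, tableIdent: TableIdentifier): CatalogStatistics = {
    val tableMeta = spark.sessionState.catalog.getTableMetadata(tableIdent)
    val sizeInBytes = CommandUtils.calculateTotalSize(spark, tableMeta)
    val relation = spark.table(tableIdent).logicalPlan

    // Restrict to the data types that AnalyzeColumnCommand.supportsType accepts.
    val columns = relation.output.filter { attr =>
      attr.dataType match {
        case _: IntegralType | _: DecimalType | DoubleType | FloatType | BooleanType |
             DateType | TimestampType | BinaryType | StringType => true
        case _ => false
      }
    }

    // The shared helper returns the row count plus per-column CatalogColumnStat keyed by name.
    val (rowCount, colStats) = CommandUtils.computeColumnStats(spark, relation, columns)

    CatalogStatistics(
      sizeInBytes = sizeInBytes,
      rowCount = Some(rowCount),
      // Newly computed column stats override any existing ones, as in the command itself.
      colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ colStats)
  }
}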
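
The equi-height histogram path is the least obvious part of the moved code, so a small, framework-free sketch of the reconstruction step performed by rowToColumnStat may also help: the first job produces the percentile endpoints p(0), p(1/n), ..., p(1), the second job produces the approximate distinct count per bin, and the two are zipped into bins whose height is the non-null row count divided by the number of bins. Bin, EquiHeightHistogram, and all the numbers below are simplified stand-ins invented for illustration; the real code uses catalyst's HistogramBin and Histogram classes.

// Simplified stand-ins for catalyst's HistogramBin/Histogram, for this sketch only.
final case class Bin(lo: Double, hi: Double, ndv: Long)
final case class EquiHeightHistogram(height: Double, bins: Seq[Bin])

object HistogramSketch {

  /**
   * endpoints: the n+1 percentiles p(0), p(1/n), ..., p(1) produced by the first job.
   * ndvs:      the approximate distinct count inside each of the n bins from the second job.
   * nonNullRows = rowCount - nullCount; each bin then stands for nonNullRows / n rows,
   * which is what "equi-height" means: bins vary in width, not in row count.
   */
  def build(endpoints: Array[Double], ndvs: Array[Long], nonNullRows: Long): EquiHeightHistogram = {
    require(endpoints.length == ndvs.length + 1,
      "one more percentile endpoint than the number of bins is required")
    val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
      Bin(endpoints(i), endpoints(i + 1), ndv)
    }
    EquiHeightHistogram(nonNullRows.toDouble / ndvs.length, bins.toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Made-up numbers: 4 bins over a skewed column with values in [0, 100] and 1000 non-null rows.
    val endpoints = Array(0.0, 10.0, 35.0, 70.0, 100.0) // p(0), p(1/4), p(2/4), p(3/4), p(1)
    val ndvs = Array(9L, 24L, 31L, 28L)                  // per-bin approximate distinct counts
    println(build(endpoints, ndvs, nonNullRows = 1000L))
    // Each bin covers ~250 rows; a narrow bin such as [0, 10] simply means values are dense there.
  }
}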