[SPARK-23445] ColumnStat refactoring #20624

Closed · wants to merge 5 commits · Changes from 4 commits
@@ -21,7 +21,9 @@ import java.net.URI
import java.util.Date

import scala.collection.mutable
+import scala.util.control.NonFatal

+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._


/**
@@ -361,15 +363,16 @@ object CatalogTable {
case class CatalogStatistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
-   colStats: Map[String, ColumnStat] = Map.empty) {
+   colStats: Map[String, CatalogColumnStat] = Map.empty) {

  /**
   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
   * on column names.
   */
  def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
    if (cboEnabled && rowCount.isDefined) {
-     val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+     val attrStats = AttributeMap(planOutput
+       .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
      // Estimate size as number of rows * row size.
      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,143 @@ case class CatalogStatistics(
  }
}
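
Illustrative only, not part of the diff: a minimal sketch of the new code path, assuming an `output: Seq[Attribute]` that contains an attribute named "a" of IntegerType.

```scala
val catalogStats = CatalogStatistics(
  sizeInBytes = BigInt(1024),
  rowCount = Some(BigInt(100)),
  colStats = Map("a" -> CatalogColumnStat(distinctCount = Some(10))))

// Column stats are matched to plan attributes by name; each CatalogColumnStat
// is converted with toPlanStat(name, dataType) before entering Statistics.
val planStats = catalogStats.toPlanStats(output, cboEnabled = true)
```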

/**
 * This class holds the statistics for a column and is used in [[CatalogTable]] to interact
 * with the metastore.
 */
case class CatalogColumnStat(
    distinctCount: Option[BigInt] = None,
    min: Option[String] = None,
    max: Option[String] = None,
    nullCount: Option[BigInt] = None,
    avgLen: Option[Long] = None,
    maxLen: Option[Long] = None,
    histogram: Option[Histogram] = None) {

  /**
   * Returns a map from string to string that can be used to serialize the column stats.
   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
   * and the value is the string representation for the value.
   * min/max values are stored as Strings. They can be deserialized using
   * [[CatalogColumnStat.fromExternalString]].
   *
   * As part of the protocol, the returned map always contains a key called "version".
   * Any of the fields that are null (None) won't appear in the map.
   */
  def toMap(colName: String): Map[String, String] = {
    val map = new scala.collection.mutable.HashMap[String, String]
    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
    distinctCount.foreach { v =>
      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
    }
    nullCount.foreach { v =>
      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
    }
    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
    histogram.foreach { h =>
      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
    }
    map.toMap
  }
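
For example (illustrative, using a hypothetical column "c1"): a stat object serializes to column-prefixed keys, with the mandatory "version" entry, and absent (None) fields omitted.

```scala
val stat = CatalogColumnStat(
  distinctCount = Some(10), nullCount = Some(0),
  avgLen = Some(4), maxLen = Some(4), min = Some("1"), max = Some("100"))

stat.toMap("c1")
// contains: "c1.version" -> "1", "c1.distinctCount" -> "10", "c1.nullCount" -> "0",
//           "c1.avgLen" -> "4", "c1.maxLen" -> "4", "c1.min" -> "1", "c1.max" -> "100"
```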

  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
  def toPlanStat(
      colName: String,
      dataType: DataType): ColumnStat =
    ColumnStat(
      distinctCount = distinctCount,
      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
      nullCount = nullCount,
      avgLen = avgLen,
      maxLen = maxLen,
      histogram = histogram)
}

Member:
toPlanStat is the same as CatalogStatistics.toPlanStats. Should we use toColumnStat?

Contributor Author:
I intentionally made it the same.
CatalogStatistics.toPlanStats converts it to Statistics. CatalogColumnStat.toPlanStat converts it to ColumnStat. The name signifies that it is used to convert both of these objects to their counterparts that are used in the query plan.

object CatalogColumnStat extends Logging {

  // List of string keys used to serialize CatalogColumnStat
  val KEY_VERSION = "version"
  private val KEY_DISTINCT_COUNT = "distinctCount"
  private val KEY_MIN_VALUE = "min"
  private val KEY_MAX_VALUE = "max"
  private val KEY_NULL_COUNT = "nullCount"
  private val KEY_AVG_LEN = "avgLen"
  private val KEY_MAX_LEN = "maxLen"
  private val KEY_HISTOGRAM = "histogram"

  /**
   * Converts the string representation of a value to the corresponding Catalyst value,
   * according to the given data type.
   */
  def fromExternalString(s: String, name: String, dataType: DataType): Any = {
    dataType match {
      case BooleanType => s.toBoolean
      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
      case ByteType => s.toByte
      case ShortType => s.toShort
      case IntegerType => s.toInt
      case LongType => s.toLong
      case FloatType => s.toFloat
      case DoubleType => s.toDouble
      case _: DecimalType => Decimal(s)
      // This version of Spark does not use min/max for binary/string types so we ignore it.
      case BinaryType | StringType => null
      case _ =>
        throw new AnalysisException("Column statistics deserialization is not supported for " +
          s"column $name of data type: $dataType.")
    }
  }

  /**
   * Converts the given value from Catalyst data type to string representation of external
   * data type.
   */
  def toExternalString(v: Any, colName: String, dataType: DataType): String = {
    val externalValue = dataType match {
      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
      case BooleanType | _: IntegralType | FloatType | DoubleType => v
      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
      // This version of Spark does not use min/max for binary/string types so we ignore it.
      case _ =>
        throw new AnalysisException("Column statistics deserialization is not supported for " +
          s"column $colName of data type: $dataType.")
    }
    externalValue.toString
  }

Member:
deserialization -> serialization?
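
Illustrative round trip, not part of the patch: values for the supported types survive toExternalString/fromExternalString.

```scala
val ext = CatalogColumnStat.toExternalString(100, "c1", IntegerType)         // "100"
val internal = CatalogColumnStat.fromExternalString(ext, "c1", IntegerType)  // 100
```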


  /**
   * Creates a [[CatalogColumnStat]] object from the given map.
   * This is used to deserialize column stats from some external storage.
   * The serialization side is defined in [[CatalogColumnStat.toMap]].
   */
  def fromMap(
      table: String,
      colName: String,
      map: Map[String, String]): Option[CatalogColumnStat] = {

    try {
      Some(CatalogColumnStat(
        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
        min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
        max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
        nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
        avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
        maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
      ))
    } catch {
      case NonFatal(e) =>
        logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
        None
    }
  }
}

Member:
Do we have a migration issue here? Now the key is changed. Can Spark 2.4 read the catalog properties written by Spark 2.3?

Contributor Author:
The keys and format of stats in the metastore didn't change. After this patch it remains backwards compatible with stats created before.

What changed here is that the map passed in used to contain stats for just one column, stripped of the column-name prefix; now I'm passing a map that has the statistics for all columns, with keys prefixed by column name.

This reduces complexity in statsFromProperties, see https://github.com/apache/spark/pull/20624/files#diff-159191585e10542f013cb3a714f26075R1057
It used to create a filtered map for every column, stripping the prefix together with the column name. Now it just passes the map of all columns' stat properties, and each column picks up what it needs.

I'll add a bit of doc / comments about that.

Member:
Could you add a test case? BTW, forwards compatibility is also needed, since the Hive metastore is shared by different Spark versions.

Contributor:
IIUC the format doesn't change; we just change the way we save/restore stats in the metastore, which looks cleaner.

Contributor Author:
The format doesn't change.
There is an existing test, StatisticsSuite."verify serialized column stats after analyzing columns", which checks that the format of the serialized stats in the metastore doesn't change, by comparing it to a manual map of properties.
I will add a test that verifies it the other way - adding the properties manually as TBLPROPERTIES, and verifying that they are successfully parsed.

Contributor Author:
Added the "verify column stats can be deserialized from tblproperties" test.
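
For illustration (column and table names hypothetical): fromMap receives the full column-prefixed property map and picks out only its own column's keys, returning None if parsing fails.

```scala
val props = Map(
  "c1.version" -> "1", "c1.distinctCount" -> "10", "c1.nullCount" -> "0",
  "c2.version" -> "1", "c2.distinctCount" -> "42")

CatalogColumnStat.fromMap("tbl", "c1", props)
// Some(CatalogColumnStat(distinctCount = Some(10), nullCount = Some(0), ...))
```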


case class CatalogTableType private(name: String)
object CatalogTableType {
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
      stats.rowCount match {
        case Some(rowCount) if rowCount >= 0 =>
          if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
-           val colStats = stats.attributeStats.get(col)
-           if (colStats.get.nullCount > 0) {
+           val colStats = stats.attributeStats.get(col).get
+           if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
              false
            } else {
-             val distinctCount = colStats.get.distinctCount
+             val distinctCount = colStats.distinctCount.get
              val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
              // ndvMaxErr adjusted based on TPCDS 1TB data results
              relDiff <= conf.ndvMaxError * 2

Member:
Do we need to check whether it is defined before calling .get?

Contributor Author:
hasCountStats == distinctCount.isDefined && nullCount.isDefined
So if evaluation reaches the second part of the ||, then hasCountStats == true, which implies nullCount.isDefined.
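
Worked example for the check above (numbers illustrative): with distinctCount = 98 and rowCount = 100, relDiff = |98/100 - 1.0| = 0.02. Assuming the default ndvMaxError of 0.05, the threshold is 0.1, so the check passes and the column is treated as nearly unique for star-schema detection.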