column stat refactoring
juliuszsompolski committed Feb 16, 2018
1 parent 1dc2c1d commit cf36020
Showing 22 changed files with 712 additions and 613 deletions.
@@ -21,7 +21,9 @@ import java.net.URI
import java.util.Date

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}


/**
@@ -361,15 +363,16 @@ object CatalogTable {
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
colStats: Map[String, ColumnStat] = Map.empty) {
colStats: Map[String, CatalogColumnStat] = Map.empty) {

/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
* on column names.
*/
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
if (cboEnabled && rowCount.isDefined) {
val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
val attrStats = AttributeMap(planOutput
.flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,101 @@ case class CatalogStatistics(
}
}
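
A rough sketch (not part of this commit) of the refactored path from catalog stats to plan stats, for a hypothetical one-column table `t(a INT)` whose statistics were gathered by ANALYZE; names and numbers are made up.

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// Hypothetical stats for a single INT column "a" of a 1000-row table.
val attr = AttributeReference("a", IntegerType)()
val catalogStats = CatalogStatistics(
  sizeInBytes = BigInt(4000),
  rowCount = Some(BigInt(1000)),
  colStats = Map("a" -> CatalogColumnStat(
    distinctCount = Some(BigInt(100)),
    min = Some("1"),                 // min/max are kept as strings in the catalog
    max = Some("100"),
    nullCount = Some(BigInt(0)),
    avgLen = Some(4L),
    maxLen = Some(4L))))

// With CBO enabled, the string min/max are parsed back into Catalyst values via
// toPlanStat and matched to the plan's output attributes by name.
val planStats = catalogStats.toPlanStats(Seq(attr), cboEnabled = true)
```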

/**
 * This class of column statistics is used in [[CatalogTable]] to interact with the metastore.
*/
case class CatalogColumnStat(
distinctCount: Option[BigInt] = None,
min: Option[String] = None,
max: Option[String] = None,
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
histogram: Option[Histogram] = None) {

/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
* and the value is the string representation for the value.
* min/max values are stored as Strings. They can be deserialized using
* [[ColumnStat.fromExternalString]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
*/
def toMap(colName: String): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
distinctCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
}
nullCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
}
avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
}
map.toMap
}

/** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
def toPlanStat(
colName: String,
dataType: DataType): ColumnStat =
ColumnStat(
distinctCount = distinctCount,
min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
histogram = histogram)
}
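
Not part of the diff: a minimal sketch of the map `toMap` produces for a hypothetical column `a`, following the `colName.field` key scheme described above.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat

// Illustrative values only.
val stat = CatalogColumnStat(
  distinctCount = Some(BigInt(100)),
  min = Some("1"),
  max = Some("100"),
  nullCount = Some(BigInt(0)),
  avgLen = Some(4L),
  maxLen = Some(4L))

stat.toMap("a")
// Contains entries such as:
//   "a.version" -> "1", "a.distinctCount" -> "100", "a.nullCount" -> "0",
//   "a.avgLen" -> "4", "a.maxLen" -> "4", "a.min" -> "1", "a.max" -> "100"
// (no "a.histogram" entry, since histogram is None)
```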

object CatalogColumnStat extends Logging {

// List of string keys used to serialize CatalogColumnStat
val KEY_VERSION = "version"
private val KEY_DISTINCT_COUNT = "distinctCount"
private val KEY_MIN_VALUE = "min"
private val KEY_MAX_VALUE = "max"
private val KEY_NULL_COUNT = "nullCount"
private val KEY_AVG_LEN = "avgLen"
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"

/**
* Creates a [[CatalogColumnStat]] object from the given map.
* This is used to deserialize column stats from some external storage.
* The serialization side is defined in [[CatalogColumnStat.toMap]].
*/
def fromMap(
table: String,
colName: String,
map: Map[String, String]): Option[CatalogColumnStat] = {

try {
Some(CatalogColumnStat(
distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
None
}
}
}
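
A round-trip sketch under the same assumptions (hypothetical table `t`, column `a`): `fromMap` parses the property map written by `toMap`, and returns `None` on any parse failure.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat

// A hypothetical serialized form, e.g. as it might appear in metastore table properties.
val props = Map(
  "a.version" -> "1",
  "a.distinctCount" -> "100",
  "a.nullCount" -> "0",
  "a.avgLen" -> "4",
  "a.maxLen" -> "4",
  "a.min" -> "1",
  "a.max" -> "100")

val restored = CatalogColumnStat.fromMap("t", "a", props)
// => Some(CatalogColumnStat(distinctCount = Some(100), min = Some("1"), max = Some("100"),
//         nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = None))

// A malformed entry such as "a.distinctCount" -> "not-a-number" trips the NonFatal
// handler above: fromMap logs a warning and returns None instead of failing the query.
```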


case class CatalogTableType private(name: String)
object CatalogTableType {
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
stats.rowCount match {
case Some(rowCount) if rowCount >= 0 =>
if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
val colStats = stats.attributeStats.get(col)
if (colStats.get.nullCount > 0) {
val colStats = stats.attributeStats.get(col).get
if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
false
} else {
val distinctCount = colStats.get.distinctCount
val distinctCount = colStats.distinctCount.get
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
// ndvMaxErr adjusted based on TPCDS 1TB data results
relDiff <= conf.ndvMaxError * 2
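
To make the near-uniqueness test above concrete, a small made-up calculation; `conf.ndvMaxError` corresponds to `spark.sql.statistics.ndv.maxError`, which to my knowledge defaults to 0.05.

```scala
// A dimension-table column with 96 distinct values over 100 rows.
val distinctCount = BigInt(96)
val rowCount = BigInt(100)
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)  // 0.04
// Treated as (nearly) unique when relDiff <= ndvMaxError * 2, e.g. 0.04 <= 0.05 * 2.
```
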
@@ -27,6 +27,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
@@ -95,75 +96,37 @@ case class Statistics(
* @param histogram histogram of the values
*/
case class ColumnStat(
distinctCount: BigInt,
min: Option[Any],
max: Option[Any],
nullCount: BigInt,
avgLen: Long,
maxLen: Long,
distinctCount: Option[BigInt] = None,
min: Option[Any] = None,
max: Option[Any] = None,
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
histogram: Option[Histogram] = None) {

// We currently don't store min/max for binary/string type. This can change in the future and
// then we need to remove this require.
require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
// Are distinctCount and nullCount statistics defined?
val hasCountStats = distinctCount.isDefined && nullCount.isDefined

/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
* representation for the value. min/max values are converted to the external data type. For
* example, for DateType we store java.sql.Date, and for TimestampType we store
* java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
*/
def toMap(colName: String, dataType: DataType): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(ColumnStat.KEY_VERSION, "1")
map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, HistogramSerializer.serialize(h)) }
map.toMap
}
// Are min and max statistics defined?
val hasMinMaxStats = min.isDefined && max.isDefined

/**
* Converts the given value from Catalyst data type to string representation of external
* data type.
*/
private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $colName of data type: $dataType.")
}
externalValue.toString
}
// Are avgLen and maxLen statistics defined?
val hasLenStats = avgLen.isDefined && maxLen.isDefined

def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
CatalogColumnStat(
distinctCount = distinctCount,
min = min.map(ColumnStat.toExternalString(_, colName, dataType)),
max = max.map(ColumnStat.toExternalString(_, colName, dataType)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
histogram = histogram)
}
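
Not part of the diff: a sketch of the opposite direction, converting a plan-side ColumnStat (Catalyst-typed min/max) into a CatalogColumnStat (string min/max) for a hypothetical TimestampType column. Import paths assume the file layout at the time of this commit.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.TimestampType

// Plan-side stats: min/max are Catalyst values (microseconds since epoch, as Long).
val planStat = ColumnStat(
  distinctCount = Some(BigInt(30)),
  min = Some(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2018-02-01 00:00:00"))),
  max = Some(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2018-02-16 00:00:00"))),
  nullCount = Some(BigInt(0)),
  avgLen = Some(8L),
  maxLen = Some(8L))

// toCatalogColumnStat renders min/max as external strings ("2018-02-01 00:00:00.0", ...)
// via ColumnStat.toExternalString, ready to be stored in the metastore.
val catalogStat = planStat.toCatalogColumnStat("ts", TimestampType)
```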


object ColumnStat extends Logging {

// List of string keys used to serialize ColumnStat
val KEY_VERSION = "version"
private val KEY_DISTINCT_COUNT = "distinctCount"
private val KEY_MIN_VALUE = "min"
private val KEY_MAX_VALUE = "max"
private val KEY_NULL_COUNT = "nullCount"
private val KEY_AVG_LEN = "avgLen"
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"

  /** Returns true iff we support gathering column statistics on columns of the given type. */
def supportsType(dataType: DataType): Boolean = dataType match {
case _: IntegralType => true
@@ -187,35 +150,9 @@ object ColumnStat extends Logging {
}

/**
* Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
* from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
* Converts from string representation of data type to the corresponding Catalyst data type.
*/
def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
try {
Some(ColumnStat(
distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
// Note that flatMap(Option.apply) turns Option(null) into None.
min = map.get(KEY_MIN_VALUE)
.map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
max = map.get(KEY_MAX_VALUE)
.map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong,
histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
None
}
}

/**
* Converts from string representation of external data type to the corresponding Catalyst data
* type.
*/
private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
def fromExternalString(s: String, name: String, dataType: DataType): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
@@ -235,6 +172,24 @@ object ColumnStat extends Logging {
}
}

/**
* Converts the given value from Catalyst data type to string representation of external
* data type.
*/
def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $colName of data type: $dataType.")
}
externalValue.toString
}
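
A small round-trip sketch (illustrative, not from the commit) of the two helpers above for a DateType column: the external form is the java.sql.Date string, the Catalyst form is the number of days since the epoch.

```scala
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.DateType

val catalystDays = ColumnStat.fromExternalString("2018-02-16", "d", DateType)  // Int: days since 1970-01-01
val external = ColumnStat.toExternalString(catalystDays, "d", DateType)        // "2018-02-16"
```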

/**
* Constructs an expression to compute column statistics for a given column.
*
@@ -305,15 +260,15 @@ object ColumnStat extends Logging {
percentiles: Option[ArrayData]): ColumnStat = {
// The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
val cs = ColumnStat(
distinctCount = BigInt(row.getLong(0)),
distinctCount = Option(BigInt(row.getLong(0))),
// for string/binary min/max, get should return null
min = Option(row.get(1, attr.dataType)),
max = Option(row.get(2, attr.dataType)),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
nullCount = Option(BigInt(row.getLong(3))),
avgLen = Option(row.getLong(4)),
maxLen = Option(row.getLong(5))
)
if (row.isNullAt(6)) {
if (row.isNullAt(6) || !cs.nullCount.isDefined) {
cs
} else {
val ndvs = row.getArray(6).toLongArray()
Expand All @@ -323,7 +278,7 @@ object ColumnStat extends Logging {
val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
HistogramBin(endpoints(i), endpoints(i + 1), ndv)
}
val nonNullRows = rowCount - cs.nullCount
val nonNullRows = rowCount - cs.nullCount.get
val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
cs.copy(histogram = Some(histogram))
}
Expand Down
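
For the equi-height histogram construction at the end of the hunk above, an illustrative sketch with made-up numbers, using Histogram/HistogramBin as they appear in the code: 1000 rows, 100 nulls, and 4 bins give a bin height of 225 non-null rows.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin}

val rowCount = BigInt(1000)
val nullCount = BigInt(100)
val endpoints = Array(0.0, 25.0, 50.0, 75.0, 100.0)  // percentile endpoints (5 values => 4 bins)
val ndvs = Array(20L, 18L, 22L, 19L)                 // distinct values observed per bin

val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
  HistogramBin(endpoints(i), endpoints(i + 1), ndv)
}
val nonNullRows = rowCount - nullCount
val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)  // height = 225.0
```
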
@@ -32,13 +32,18 @@ object AggregateEstimation {
val childStats = agg.child.stats
// Check if we have column stats for all group-by columns.
val colStatsExist = agg.groupingExpressions.forall { e =>
e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
e.isInstanceOf[Attribute] && (
childStats.attributeStats.get(e.asInstanceOf[Attribute]) match {
case Some(colStats) => colStats.hasCountStats
case None => false
})
}
if (rowCountsExist(agg.child) && colStatsExist) {
// Multiply distinct counts of group-by columns. This is an upper bound, which assumes
// the data contains all combinations of distinct values of group-by columns.
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
(res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
(res, expr) => res *
childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)

outputRows = if (agg.groupingExpressions.isEmpty) {
// If there's no group-by columns, the output is a single row containing values of aggregate
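
The foldLeft above multiplies the per-column distinct counts; a tiny made-up example of the resulting upper bound:

```scala
// Grouping by two columns with 3 and 4 distinct values respectively.
val distinctCounts = Seq(BigInt(3), BigInt(4))
val upperBound = distinctCounts.foldLeft(BigInt(1))(_ * _)  // at most 12 output groups
```
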
