Commit 0406f52

Move stuff out of ColumnStat object.

juliuszsompolski committed Feb 25, 2018
1 parent b81252b commit 0406f52
Showing 3 changed files with 174 additions and 177 deletions.
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types._


/**
@@ -407,7 +407,7 @@ case class CatalogColumnStat(
* The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
* and the value is the string representation of that value.
* min/max values are stored as Strings. They can be deserialized using
-* [[ColumnStat.fromExternalString]].
+* [[CatalogColumnStat.fromExternalString]].
*
* As part of the protocol, the returned map always contains a key called "version".
* Any of the fields that are null (None) won't appear in the map.
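For illustration, the serialized map for a hypothetical IntegerType column named col1 might look like the sketch below (key names follow the colName.fieldName convention described above; the "version" value and field values are illustrative, not taken from this commit):

val serialized: Map[String, String] = Map(
  "version" -> "1",                 // always present, per the protocol above
  "col1.distinctCount" -> "100",
  "col1.min" -> "1",                // min/max stored as strings; parsed back with
  "col1.max" -> "100",              // CatalogColumnStat.fromExternalString
  "col1.nullCount" -> "0",
  "col1.avgLen" -> "4",
  "col1.maxLen" -> "4")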
@@ -437,8 +437,8 @@ case class CatalogColumnStat(
dataType: DataType): ColumnStat =
ColumnStat(
distinctCount = distinctCount,
-min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
-max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
@@ -457,6 +457,48 @@ object CatalogColumnStat extends Logging {
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"

/**
* Converts the external string representation of a value to the corresponding Catalyst internal value for the given data type.
*/
def fromExternalString(s: String, name: String, dataType: DataType): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
case LongType => s.toLong
case FloatType => s.toFloat
case DoubleType => s.toDouble
case _: DecimalType => Decimal(s)
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")
}
}
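As a quick illustration of this contract, numeric strings parse to their primitive internal values, and date strings parse through java.sql.Date into the internal Int day count (a minimal sketch, assuming the catalyst classes above are on the classpath):

import org.apache.spark.sql.types.{DateType, IntegerType}

val i = CatalogColumnStat.fromExternalString("100", "col1", IntegerType)
// i == 100 (Int)
val d = CatalogColumnStat.fromExternalString("2018-02-25", "col1", DateType)
// d is the internal Int: number of days since the epoch 1970-01-01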

/**
* Converts the given value from its Catalyst internal representation to the string
* representation of the external data type.
*/
def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not keep min/max for binary/string types, so they never reach here.
case _ =>
throw new AnalysisException("Column statistics serialization is not supported for " +
s"column $colName of data type: $dataType.")
}
externalValue.toString
}
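The two helpers are intended to be inverses for supported types, so a value should survive a serialize/deserialize round trip (a sketch under the same classpath assumption; the exact string is timezone dependent):

import org.apache.spark.sql.types.TimestampType

val internal: Long = 1519516800000000L // internal TimestampType value: microseconds since epoch
val s = CatalogColumnStat.toExternalString(internal, "ts", TimestampType)
// s is a java.sql.Timestamp string such as "2018-02-25 00:00:00.0"
assert(CatalogColumnStat.fromExternalString(s, "ts", TimestampType) == internal)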


/**
* Creates a [[CatalogColumnStat]] object from the given map.
* This is used to deserialize column stats from some external storage.
@@ -80,11 +80,10 @@ case class Statistics(
/**
* Statistics collected for a column.
*
-* 1. Supported data types are defined in `ColumnStat.supportsType`.
-* 2. The JVM data type stored in min/max is the internal data type for the corresponding
+* 1. The JVM data type stored in min/max is the internal data type for the corresponding
* Catalyst data type. For example, the internal type of DateType is Int, and the internal
* type of TimestampType is Long.
-* 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+* 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
@@ -116,176 +115,14 @@ case class ColumnStat(
def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
CatalogColumnStat(
distinctCount = distinctCount,
-min = min.map(ColumnStat.toExternalString(_, colName, dataType)),
-max = max.map(ColumnStat.toExternalString(_, colName, dataType)),
+min = min.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+max = max.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
histogram = histogram)
}
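Concretely, converting an in-memory ColumnStat into its external catalog form now routes min/max through CatalogColumnStat.toExternalString (a sketch, assuming the remaining ColumnStat fields, such as histogram, default to None as in the full class definition):

import org.apache.spark.sql.types.IntegerType

val stat = ColumnStat(
  distinctCount = Some(BigInt(100)),
  min = Some(1), max = Some(100),   // internal Int values
  nullCount = Some(BigInt(0)),
  avgLen = Some(4L), maxLen = Some(4L))

val catalogStat = stat.toCatalogColumnStat("col1", IntegerType)
// catalogStat.min == Some("1"), catalogStat.max == Some("100") -- strings, ready for the metastore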


object ColumnStat extends Logging {

/** Returns true iff we support gathering column statistics on a column of the given type. */
def supportsType(dataType: DataType): Boolean = dataType match {
case _: IntegralType => true
case _: DecimalType => true
case DoubleType | FloatType => true
case BooleanType => true
case DateType => true
case TimestampType => true
case BinaryType | StringType => true
case _ => false
}

/** Returns true iff we support gathering a histogram on a column of the given type. */
def supportsHistogram(dataType: DataType): Boolean = dataType match {
case _: IntegralType => true
case _: DecimalType => true
case DoubleType | FloatType => true
case DateType => true
case TimestampType => true
case _ => false
}
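For example, string columns support basic statistics (ndv, null count, lengths) but not histograms, since no min/max ordering is kept for them (a small sketch):

import org.apache.spark.sql.types.{CalendarIntervalType, StringType}

ColumnStat.supportsType(StringType)           // true
ColumnStat.supportsHistogram(StringType)      // false
ColumnStat.supportsType(CalendarIntervalType) // false: statistics cannot be gathered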

/**
* Converts the external string representation of a value to the corresponding Catalyst internal value for the given data type.
*/
def fromExternalString(s: String, name: String, dataType: DataType): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
case LongType => s.toLong
case FloatType => s.toFloat
case DoubleType => s.toDouble
case _: DecimalType => Decimal(s)
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")
}
}

/**
* Converts the given value from its Catalyst internal representation to the string
* representation of the external data type.
*/
def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not keep min/max for binary/string types, so they never reach here.
case _ =>
throw new AnalysisException("Column statistics serialization is not supported for " +
s"column $colName of data type: $dataType.")
}
externalValue.toString
}

/**
* Constructs an expression to compute column statistics for a given column.
*
* The expression should create a single struct column with the following schema:
* distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
* distinctCountsForIntervals: Array[Long]
*
* Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
* as a result should stay in sync with it.
*/
def statExprs(
col: Attribute,
conf: SQLConf,
colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
})
val one = Literal(1, LongType)

// the approximate ndv (number of distinct values) should never be larger than the number of rows
val numNonNulls = if (col.nullable) Count(col) else Count(one)
val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
val numNulls = Subtract(Count(one), numNonNulls)
val defaultSize = Literal(col.dataType.defaultSize, LongType)
val nullArray = Literal(null, ArrayType(LongType))

def fixedLenTypeStruct: CreateNamedStruct = {
val genHistogram =
ColumnStat.supportsHistogram(col.dataType) && colPercentiles.contains(col)
val intervalNdvsExpr = if (genHistogram) {
ApproxCountDistinctForIntervals(col,
Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
} else {
nullArray
}
// For fixed width types, avg size should be the same as max size.
struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
defaultSize, defaultSize, intervalNdvsExpr)
}

col.dataType match {
case _: IntegralType => fixedLenTypeStruct
case _: DecimalType => fixedLenTypeStruct
case DoubleType | FloatType => fixedLenTypeStruct
case BooleanType => fixedLenTypeStruct
case DateType => fixedLenTypeStruct
case TimestampType => fixedLenTypeStruct
case BinaryType | StringType =>
// For string and binary type, we don't compute min, max or histogram
val nullLit = Literal(null, col.dataType)
struct(
ndv, nullLit, nullLit, numNulls,
// Set avg/max size to default size if all the values are null or there is no value.
Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
nullArray)
case _ =>
throw new AnalysisException("Analyzing column statistics is not supported for column " +
s"${col.name} of data type: ${col.dataType}.")
}
}
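A minimal sketch of building the expression for a single Int column (this assumes catalyst internals such as AttributeReference, AttributeMap, and SQLConf are on the classpath; with no precomputed percentiles, the histogram slot evaluates to a null array):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

val col = AttributeReference("col1", IntegerType)()
val noPercentiles = AttributeMap(Seq.empty[(Attribute, ArrayData)])
val struct = ColumnStat.statExprs(col, new SQLConf, noPercentiles)
// struct evaluates to the 7 fields named in the doc comment above:
// (distinctCount, min, max, nullCount, avgLen, maxLen, distinctCountsForIntervals)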

/** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
def rowToColumnStat(
row: InternalRow,
attr: Attribute,
rowCount: Long,
percentiles: Option[ArrayData]): ColumnStat = {
// The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
val cs = ColumnStat(
distinctCount = Option(BigInt(row.getLong(0))),
// for string/binary min/max, get should return null
min = Option(row.get(1, attr.dataType)),
max = Option(row.get(2, attr.dataType)),
nullCount = Option(BigInt(row.getLong(3))),
avgLen = Option(row.getLong(4)),
maxLen = Option(row.getLong(5))
)
if (row.isNullAt(6) || cs.nullCount.isEmpty) {
cs
} else {
val ndvs = row.getArray(6).toLongArray()
assert(percentiles.get.numElements() == ndvs.length + 1)
val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
// Construct equi-height histogram
val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
HistogramBin(endpoints(i), endpoints(i + 1), ndv)
}
val nonNullRows = rowCount - cs.nullCount.get
val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
cs.copy(histogram = Some(histogram))
}
}
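The equi-height construction is easiest to see with numbers. Suppose rowCount = 1000, nullCount = 0, and ANALYZE computed 5 percentile endpoints (so 4 bins); each bin then has a height of 250 rows. A standalone sketch of just the bin arithmetic, with made-up per-bin ndv values (no Spark required):

val ndvs = Array(10L, 8L, 12L, 9L)                       // per-bin distinct counts (7th field)
val endpoints = Array(1.0, 250.0, 500.0, 750.0, 1000.0)  // ndvs.length + 1 percentile endpoints
val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
  (endpoints(i), endpoints(i + 1), ndv)                  // (lo, hi, ndv) per HistogramBin
}
val binHeight = (1000L - 0L).toDouble / ndvs.length      // 250.0 rows per bin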

}

/**
* This class is an implementation of equi-height histogram.
* Equi-height histogram represents the distribution of a column's values by a sequence of bins.
