Skip to content


[SPARK-21322][SQL] support histogram in filter cardinality estimation
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Histogram is effective in dealing with skewed distribution. After we generate histogram information for column statistics, we need to adjust filter estimation based on histogram data structure.

## How was this patch tested?

We revised all the unit test cases by including histogram data structure.

Please review before opening a pull request.

Author: Ron Hu <>

Closes #19783 from ron8hu/supportHistogram.
  • Loading branch information
ron8hu authored and cloud-fan committed Dec 12, 2017
1 parent a400265 commit ecc179e
Show file tree
Hide file tree
Showing 3 changed files with 448 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
import scala.math.BigDecimal.RoundingMode

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.{DecimalType, _}

Expand Down Expand Up @@ -114,4 +114,99 @@ object EstimationUtils {

* Returns the number of the first bin into which a column value falls for a specified
* numeric equi-height histogram.
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the first bin into which a column value falls.
def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = 0
while ((i < bins.length) && (value > bins(i).hi)) {
i += 1

* Returns the number of the last bin into which a column value falls for a specified
* numeric equi-height histogram.
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the last bin into which a column value falls.
def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = bins.length - 1
while ((i >= 0) && (value < bins(i).lo)) {
i -= 1

* Returns a percentage of a bin holding values for column value in the range of
* [lowerValue, higherValue]
* @param higherValue a given upper bound value of a specified column value range
* @param lowerValue a given lower bound value of a specified column value range
* @param bin a single histogram bin
* @return the percentage of a single bin holding values in [lowerValue, higherValue].
private def getOccupation(
higherValue: Double,
lowerValue: Double,
bin: HistogramBin): Double = {
assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi)
if (bin.hi == bin.lo) {
// the entire bin is covered in the range
} else if (higherValue == lowerValue) {
// set percentage to 1/NDV
1.0 / bin.ndv.toDouble
} else {
// Use proration since the range falls inside this bin.
math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)

* Returns the number of bins for column values in [lowerValue, higherValue].
* The column value distribution is saved in an equi-height histogram. The return values is a
* double value is because we may return a portion of a bin. For example, a predicate
* "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values.
* @param higherId id of the high end bin holding the high end value of a column range
* @param lowerId id of the low end bin holding the low end value of a column range
* @param higherEnd a given upper bound value of a specified column value range
* @param lowerEnd a given lower bound value of a specified column value range
* @param histogram a numeric equi-height histogram
* @return the number of bins for column values in [lowerEnd, higherEnd].
def getOccupationBins(
higherId: Int,
lowerId: Int,
higherEnd: Double,
lowerEnd: Double,
histogram: Histogram): Double = {
assert(lowerId <= higherId)

if (lowerId == higherId) {
val curBin = histogram.bins(lowerId)
getOccupation(higherEnd, lowerEnd, curBin)
} else {
// compute how much lowerEnd/higherEnd occupies its bin
val lowerCurBin = histogram.bins(lowerId)
val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin)

val higherCurBin = histogram.bins(higherId)
val higherPart = getOccupation(higherEnd, higherCurBin.lo, higherCurBin)

// the total length is lowerPart + higherPart + bins between them
lowerPart + higherPart + higherId - lowerId - 1

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -265,7 +265,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
* @param update a boolean flag to specify if we need to update ColumnStat of a given column
* for subsequent conditions
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics exists for a given column or wrong value.
* It returns None if no statistics exists for a given column or wrong value.
def evaluateBinary(
op: BinaryComparison,
Expand Down Expand Up @@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging {
colStatsMap.update(attr, newStats)

Some(1.0 / BigDecimal(ndv))
} else {
if (colStat.histogram.isEmpty) {
// returns 1/ndv if there is no histogram
Some(1.0 / BigDecimal(ndv))
} else {
// We compute filter selectivity using Histogram information.
val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
val histogram = colStat.histogram.get
val hgmBins = histogram.bins

// find bins where column's current min and max locate. Note that a column's [min, max]
// range may change due to another condition applied earlier.
val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble
val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble
val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins)
val maxBinId = EstimationUtils.findLastBinForValue(max, hgmBins)

// compute how many bins the column's current valid range [min, max] occupies.
// Note that a column's [min, max] range may vary after we apply some filter conditions.
val validRangeBins = EstimationUtils.getOccupationBins(maxBinId, minBinId, max,
min, histogram)

val lowerBinId = EstimationUtils.findFirstBinForValue(datum, hgmBins)
val higherBinId = EstimationUtils.findLastBinForValue(datum, hgmBins)
assert(lowerBinId <= higherBinId)
val lowerBinNdv = hgmBins(lowerBinId).ndv
val higherBinNdv = hgmBins(higherBinId).ndv
// assume uniform distribution in each bin
val occupiedBins = if (lowerBinId == higherBinId) {
1.0 / lowerBinNdv
} else {
(1.0 / lowerBinNdv) + // lowest bin
(higherBinId - lowerBinId - 1) + // middle bins
(1.0 / higherBinNdv) // highest bin
Some(occupiedBins / validRangeBins)

} else { // not in interval

Expand Down Expand Up @@ -471,37 +507,46 @@ case class FilterEstimation(plan: Filter) extends Logging {
percent = 1.0
} else {
// This is the partial overlap case:
// Without advanced statistics like histogram, we assume uniform data distribution.
// We just prorate the adjusted range over the initial range to compute filter selectivity.
assert(max > min)
percent = op match {
case _: LessThan =>
if (numericLiteral == max) {
// If the literal value is right on the boundary, we can minus the part of the
// boundary value (1/ndv).
1.0 - 1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
case _: LessThanOrEqual =>
if (numericLiteral == min) {
// The boundary value is the only satisfying value.
1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
case _: GreaterThan =>
if (numericLiteral == min) {
1.0 - 1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
case _: GreaterThanOrEqual =>
if (numericLiteral == max) {
1.0 / ndv
} else {
(max - numericLiteral) / (max - min)

if (colStat.histogram.isEmpty) {
// Without advanced statistics like histogram, we assume uniform data distribution.
// We just prorate the adjusted range over the initial range to compute filter selectivity.
assert(max > min)
percent = op match {
case _: LessThan =>
if (numericLiteral == max) {
// If the literal value is right on the boundary, we can minus the part of the
// boundary value (1/ndv).
1.0 - 1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
case _: LessThanOrEqual =>
if (numericLiteral == min) {
// The boundary value is the only satisfying value.
1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
case _: GreaterThan =>
if (numericLiteral == min) {
1.0 - 1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
case _: GreaterThanOrEqual =>
if (numericLiteral == max) {
1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
} else {
val numericHistogram = colStat.histogram.get
val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble
val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble
percent = computePercentByEquiHeightHgm(op, numericHistogram, max, min, datum)

if (update) {
Expand All @@ -513,10 +558,9 @@ case class FilterEstimation(plan: Filter) extends Logging {

op match {
case _: GreaterThan | _: GreaterThanOrEqual =>
// If new ndv is 1, then new max must be equal to new min.
newMin = if (newNdv == 1) newMax else newValue
newMin = newValue
case _: LessThan | _: LessThanOrEqual =>
newMax = if (newNdv == 1) newMin else newValue
newMax = newValue

val newStats =
Expand All @@ -529,6 +573,54 @@ case class FilterEstimation(plan: Filter) extends Logging {

* Returns the selectivity percentage for binary condition in the column's
* current valid range [min, max]
* @param op a binary comparison operator
* @param histogram a numeric equi-height histogram
* @param max the upper bound of the current valid range for a given column
* @param min the lower bound of the current valid range for a given column
* @param datumNumber the numeric value of a literal
* @return the selectivity percentage for a condition in the current range.

def computePercentByEquiHeightHgm(
op: BinaryComparison,
histogram: Histogram,
max: Double,
min: Double,
datumNumber: Double): Double = {
// find bins where column's current min and max locate. Note that a column's [min, max]
// range may change due to another condition applied earlier.
val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins)
val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins)

// compute how many bins the column's current valid range [min, max] occupies.
// Note that a column's [min, max] range may vary after we apply some filter conditions.
val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, min, histogram)

val datumInBinId = op match {
case LessThan(_, _) | GreaterThanOrEqual(_, _) =>
EstimationUtils.findFirstBinForValue(datumNumber, histogram.bins)
case LessThanOrEqual(_, _) | GreaterThan(_, _) =>
EstimationUtils.findLastBinForValue(datumNumber, histogram.bins)

op match {
// LessThan and LessThanOrEqual share the same logic,
// but their datumInBinId may be different
case LessThan(_, _) | LessThanOrEqual(_, _) =>
EstimationUtils.getOccupationBins(datumInBinId, minBinId, datumNumber, min,
histogram) / minToMaxLength
// GreaterThan and GreaterThanOrEqual share the same logic,
// but their datumInBinId may be different
case GreaterThan(_, _) | GreaterThanOrEqual(_, _) =>
EstimationUtils.getOccupationBins(maxBinId, datumInBinId, max, datumNumber,
histogram) / minToMaxLength

* Returns a percentage of rows meeting a binary comparison expression containing two columns.
* In SQL queries, we also see predicate expressions involving two columns
Expand Down Expand Up @@ -784,11 +876,16 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
: AttributeMap[ColumnStat] = {
val newColumnStats = { case (attr, oriColStat) =>
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
val newNdv = if (colStat.distinctCount > 1) {
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
} else {
// no need to scale down since it is already down to 1 (for skewed distribution case)
attr -> colStat.copy(distinctCount = newNdv)
Expand Down

0 comments on commit ecc179e

Please sign in to comment.