
[SPARK-21322][SQL] support histogram in filter cardinality estimation #19783

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
import scala.math.BigDecimal.RoundingMode

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.{DecimalType, _}


@@ -114,4 +114,99 @@ object EstimationUtils {
}
}

/**
* Returns the number of the first bin into which a column value falls for a specified
Contributor: nit: number -> index?

* numeric equi-height histogram.
*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the first bin into which a column value falls.
Contributor: Seems this is redundant, shall we remove it?

*/
def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = 0
while ((i < bins.length) && (value > bins(i).hi)) {
i += 1
}
i
}

/**
* Returns the number of the last bin into which a column value falls for a specified
Contributor: ditto

* numeric equi-height histogram.
*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the last bin into which a column value falls.
Contributor: ditto

*/
def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = bins.length - 1
while ((i >= 0) && (value < bins(i).lo)) {
i -= 1
}
i
}
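For reference, the two scan loops above can be modeled outside Spark. This Python sketch (the `Bin` class and sample bins are hypothetical stand-ins for Spark's `HistogramBin`) shows why both a first-bin and a last-bin lookup exist: a value sitting exactly on a shared boundary falls into two adjacent bins.

```python
from dataclasses import dataclass

# Hypothetical stand-in for Spark's HistogramBin (lo/hi bounds, distinct count).
@dataclass
class Bin:
    lo: float
    hi: float
    ndv: int

def find_first_bin(value, bins):
    # Scan forward past every bin whose upper bound is below the value.
    i = 0
    while i < len(bins) and value > bins[i].hi:
        i += 1
    return i

def find_last_bin(value, bins):
    # Scan backward past every bin whose lower bound is above the value.
    i = len(bins) - 1
    while i >= 0 and value < bins[i].lo:
        i -= 1
    return i

bins = [Bin(0, 10, 5), Bin(10, 20, 4), Bin(20, 30, 6)]
print(find_first_bin(10, bins))  # 0 -- the boundary value starts in bin 0
print(find_last_bin(10, bins))   # 1 -- and also ends in bin 1
```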

/**
* Returns the percentage of a bin holding column values in the range
* [lowerValue, higherValue].
*
* @param higherValue a given upper bound value of a specified column value range
* @param lowerValue a given lower bound value of a specified column value range
* @param bin a single histogram bin
* @return the percentage of a single bin holding values in [lowerValue, higherValue].
Contributor: redundant

*/
private def getOccupation(
higherValue: Double,
lowerValue: Double,
bin: HistogramBin): Double = {
assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi)
if (bin.hi == bin.lo) {
// the entire bin is covered in the range
1.0
} else if (higherValue == lowerValue) {
// set percentage to 1/NDV
1.0 / bin.ndv.toDouble
} else {
// Use proration since the range falls inside this bin.
math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)
}
}

/**
* Returns the number of bins for column values in [lowerValue, higherValue].
Contributor: "Returns the number of bins..." why is the return type double?
Contributor Author: This is because we may return a percentage of a bin. For example, a predicate column=5 may yield 0.2 bins if the holding bin has 5 distinct values. Hence, we cannot return an integer value.
Contributor: I see. Let's explain this in the javadoc.
Contributor Author: OK. Done.

* The column value distribution is saved in an equi-height histogram. The return value is a
* double because we may return a portion of a bin. For example, a predicate
* "column = 8" may yield 0.2 bins if the holding bin has 5 distinct values.
*
* @param higherId id of the high end bin holding the high end value of a column range
Contributor: nit: higherIndex

* @param lowerId id of the low end bin holding the low end value of a column range
* @param higherEnd a given upper bound value of a specified column value range
* @param lowerEnd a given lower bound value of a specified column value range
* @param histogram a numeric equi-height histogram
* @return the number of bins for column values in [lowerEnd, higherEnd].
*/
def getOccupationBins(
higherId: Int,
lowerId: Int,
higherEnd: Double,
lowerEnd: Double,
histogram: Histogram): Double = {
assert(lowerId <= higherId)

if (lowerId == higherId) {
val curBin = histogram.bins(lowerId)
getOccupation(higherEnd, lowerEnd, curBin)
} else {
// compute how much lowerEnd/higherEnd occupies its bin
val lowerCurBin = histogram.bins(lowerId)
val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin)
Contributor: shall we assert that lowerBin.lo <= lowerEnd


val higherCurBin = histogram.bins(higherId)
val higherPart = getOccupation(higherEnd, higherCurBin.lo, higherCurBin)

// the total length is lowerPart + higherPart + bins between them
lowerPart + higherPart + higherId - lowerId - 1
}
}
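A Python sketch of the proration logic (again with a hypothetical `Bin` stand-in) may make the branches concrete: `getOccupation` returns a fraction of one bin, and `getOccupationBins` sums the two partial end bins plus the full bins between them.

```python
from dataclasses import dataclass

# Hypothetical stand-in for Spark's HistogramBin.
@dataclass
class Bin:
    lo: float
    hi: float
    ndv: int

def get_occupation(higher, lower, b):
    # Fraction of a single bin covered by [lower, higher].
    assert b.lo <= lower <= higher <= b.hi
    if b.hi == b.lo:
        return 1.0                  # single-point bin: fully covered
    if higher == lower:
        return 1.0 / b.ndv          # point lookup: assume 1/ndv of the bin
    return min((higher - lower) / (b.hi - b.lo), 1.0)

def get_occupation_bins(hi_id, lo_id, hi_end, lo_end, bins):
    # Possibly fractional number of bins covered by [lo_end, hi_end].
    assert lo_id <= hi_id
    if lo_id == hi_id:
        return get_occupation(hi_end, lo_end, bins[lo_id])
    lower_part = get_occupation(bins[lo_id].hi, lo_end, bins[lo_id])
    higher_part = get_occupation(hi_end, bins[hi_id].lo, bins[hi_id])
    return lower_part + higher_part + (hi_id - lo_id - 1)

bins = [Bin(0, 10, 5), Bin(10, 20, 4), Bin(20, 30, 6)]
# Range [5, 25] covers half of bin 0, all of bin 1, and half of bin 2:
print(get_occupation_bins(2, 0, 25.0, 5.0, bins))  # 2.0
```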

}
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
import org.apache.spark.sql.types._

@@ -265,7 +265,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
* @param update a boolean flag to specify if we need to update ColumnStat of a given column
* for subsequent conditions
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics exist for a given column or a wrong value.
*/
def evaluateBinary(
op: BinaryComparison,
@@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging {
colStatsMap.update(attr, newStats)
}

Some(1.0 / BigDecimal(ndv))
} else {
if (colStat.histogram.isEmpty) {
// returns 1/ndv if there is no histogram
Some(1.0 / BigDecimal(ndv))
} else {
// We compute filter selectivity using Histogram information.
Contributor: better to move these to a new method.
Contributor Author (ron8hu, Dec 10, 2017): OK. will do.
Contributor: did you create a new method?

val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
val histogram = colStat.histogram.get
val hgmBins = histogram.bins

// find bins where column's current min and max locate. Note that a column's [min, max]
// range may change due to another condition applied earlier.
val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble
val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble
val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins)
Contributor: nit: minBinIndex

val maxBinId = EstimationUtils.findLastBinForValue(max, hgmBins)

// compute how many bins the column's current valid range [min, max] occupies.
// Note that a column's [min, max] range may vary after we apply some filter conditions.
val validRangeBins = EstimationUtils.getOccupationBins(maxBinId, minBinId, max,
min, histogram)

val lowerBinId = EstimationUtils.findFirstBinForValue(datum, hgmBins)
val higherBinId = EstimationUtils.findLastBinForValue(datum, hgmBins)
assert(lowerBinId <= higherBinId)
val lowerBinNdv = hgmBins(lowerBinId).ndv
val higherBinNdv = hgmBins(higherBinId).ndv
// assume uniform distribution in each bin
val occupiedBins = if (lowerBinId == higherBinId) {
Contributor: is this just EstimationUtils.getOccupationBins(higherBinId, lowerBinId, datum, datum, histogram)?

1.0 / lowerBinNdv
} else {
(1.0 / lowerBinNdv) + // lowest bin
(higherBinId - lowerBinId - 1) + // middle bins
(1.0 / higherBinNdv) // highest bin
}
Some(occupiedBins / validRangeBins)
}

} else { // not in interval
Some(0.0)
}
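With hypothetical numbers, the equality branch above reduces to simple arithmetic: a literal landing in a single bin with 5 distinct values occupies 1/5 of that bin, and dividing by the number of bins spanned by the column's current [min, max] yields the selectivity.

```python
# Hypothetical numbers: the literal falls inside a single bin whose ndv is 5,
# and the column's current [min, max] range spans 4 full equi-height bins.
lower_bin_ndv = 5
occupied_bins = 1.0 / lower_bin_ndv   # the lowerBinId == higherBinId branch
valid_range_bins = 4.0                # from getOccupationBins over [min, max]
selectivity = occupied_bins / valid_range_bins
print(selectivity)  # 0.05
```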

@@ -471,37 +507,46 @@ case class FilterEstimation(plan: Filter) extends Logging {
percent = 1.0
} else {
Contributor: else if (colStat.histogram.isEmpty)

// This is the partial overlap case:
// Without advanced statistics like histogram, we assume uniform data distribution.
// We just prorate the adjusted range over the initial range to compute filter selectivity.
assert(max > min)
percent = op match {
case _: LessThan =>
if (numericLiteral == max) {
// If the literal value is right on the boundary, we can minus the part of the
// boundary value (1/ndv).
1.0 - 1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
}
case _: LessThanOrEqual =>
if (numericLiteral == min) {
// The boundary value is the only satisfying value.
1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
}
case _: GreaterThan =>
if (numericLiteral == min) {
1.0 - 1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
}
case _: GreaterThanOrEqual =>
if (numericLiteral == max) {
1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
}

if (colStat.histogram.isEmpty) {
Contributor: move this if to upper level, then we can reduce code diff
Contributor: yea please do it
Contributor Author: We cannot move the if statement to upper level, because for the partial-overlap case we need to update the [min, max] range for a given column. For the no-overlap and complete-overlap cases, we do not need to do so. I think the current code is modular for this reason.

// Without advanced statistics like histogram, we assume uniform data distribution.
// We just prorate the adjusted range over the initial range to compute filter selectivity.
assert(max > min)
percent = op match {
case _: LessThan =>
if (numericLiteral == max) {
// If the literal value is right on the boundary, we can minus the part of the
// boundary value (1/ndv).
1.0 - 1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
}
case _: LessThanOrEqual =>
if (numericLiteral == min) {
// The boundary value is the only satisfying value.
1.0 / ndv
} else {
(numericLiteral - min) / (max - min)
}
case _: GreaterThan =>
if (numericLiteral == min) {
1.0 - 1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
}
case _: GreaterThanOrEqual =>
if (numericLiteral == max) {
1.0 / ndv
} else {
(max - numericLiteral) / (max - min)
}
}
} else {
val numericHistogram = colStat.histogram.get
val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble
val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble
percent = computePercentByEquiHeightHgm(op, numericHistogram, max, min, datum)
}

if (update) {
@@ -513,10 +558,9 @@

op match {
case _: GreaterThan | _: GreaterThanOrEqual =>
// If new ndv is 1, then new max must be equal to new min.
newMin = if (newNdv == 1) newMax else newValue
newMin = newValue
case _: LessThan | _: LessThanOrEqual =>
newMax = if (newNdv == 1) newMin else newValue
newMax = newValue
Contributor: why change these two lines?
Contributor Author: Previously I coded it that way because of a corner test case: test("cbool > false"). At that time, I set newMin to newMax since newNdv = 1. However, this logic does not work well for the skewed-distribution test case test("cintHgm < 3"), where newMin=1 and newMax=3. I think the revised code makes better sense.

}

val newStats =
@@ -529,6 +573,54 @@ case class FilterEstimation(plan: Filter) extends Logging {
Some(percent)
}

/**
* Returns the selectivity percentage for a binary condition in the column's
* current valid range [min, max]
*
* @param op a binary comparison operator
* @param histogram a numeric equi-height histogram
* @param max the upper bound of the current valid range for a given column
* @param min the lower bound of the current valid range for a given column
* @param datumNumber the numeric value of a literal
* @return the selectivity percentage for a condition in the current range.
*/
def computePercentByEquiHeightHgm(
op: BinaryComparison,
histogram: Histogram,
max: Double,
min: Double,
datumNumber: Double): Double = {
// find bins where column's current min and max locate. Note that a column's [min, max]
// range may change due to another condition applied earlier.
val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins)
val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins)

// compute how many bins the column's current valid range [min, max] occupies.
// Note that a column's [min, max] range may vary after we apply some filter conditions.
val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, min, histogram)

val datumInBinId = op match {
case LessThan(_, _) | GreaterThanOrEqual(_, _) =>
EstimationUtils.findFirstBinForValue(datumNumber, histogram.bins)
case LessThanOrEqual(_, _) | GreaterThan(_, _) =>
EstimationUtils.findLastBinForValue(datumNumber, histogram.bins)
}

op match {
// LessThan and LessThanOrEqual share the same logic,
// but their datumInBinId may be different
case LessThan(_, _) | LessThanOrEqual(_, _) =>
EstimationUtils.getOccupationBins(datumInBinId, minBinId, datumNumber, min,
histogram) / minToMaxLength
// GreaterThan and GreaterThanOrEqual share the same logic,
// but their datumInBinId may be different
case GreaterThan(_, _) | GreaterThanOrEqual(_, _) =>
EstimationUtils.getOccupationBins(maxBinId, datumInBinId, max, datumNumber,
histogram) / minToMaxLength
}
}
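Putting the helpers together, the whole computation for a LessThan predicate can be sketched in Python (all names and sample bins are hypothetical; this mirrors the logic above, not Spark's API):

```python
from dataclasses import dataclass

# Hypothetical stand-in for Spark's HistogramBin.
@dataclass
class Bin:
    lo: float
    hi: float
    ndv: int

def find_first_bin(v, bins):
    i = 0
    while i < len(bins) and v > bins[i].hi:
        i += 1
    return i

def find_last_bin(v, bins):
    i = len(bins) - 1
    while i >= 0 and v < bins[i].lo:
        i -= 1
    return i

def occupation(hi, lo, b):
    # Fraction of one bin covered by [lo, hi].
    if b.hi == b.lo:
        return 1.0
    if hi == lo:
        return 1.0 / b.ndv
    return min((hi - lo) / (b.hi - b.lo), 1.0)

def occupation_bins(hi_id, lo_id, hi_end, lo_end, bins):
    # Possibly fractional count of bins covered by [lo_end, hi_end].
    if lo_id == hi_id:
        return occupation(hi_end, lo_end, bins[lo_id])
    return (occupation(bins[lo_id].hi, lo_end, bins[lo_id])
            + occupation(hi_end, bins[hi_id].lo, bins[hi_id])
            + (hi_id - lo_id - 1))

def percent_less_than(datum, col_min, col_max, bins):
    # Selectivity of "col < datum": bins below datum over bins in [min, max].
    min_id = find_first_bin(col_min, bins)
    max_id = find_last_bin(col_max, bins)
    total = occupation_bins(max_id, min_id, col_max, col_min, bins)
    datum_id = find_first_bin(datum, bins)  # LessThan uses the first bin
    return occupation_bins(datum_id, min_id, datum, col_min, bins) / total

bins = [Bin(0, 10, 5), Bin(10, 20, 4), Bin(20, 30, 6), Bin(30, 40, 3)]
print(percent_less_than(20, 0, 40, bins))  # 0.5 (2 of 4 equi-height bins)
```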

/**
* Returns a percentage of rows meeting a binary comparison expression containing two columns.
* In SQL queries, we also see predicate expressions involving two columns
@@ -784,11 +876,16 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
: AttributeMap[ColumnStat] = {
val newColumnStats = originalMap.map { case (attr, oriColStat) =>
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
val newNdv = if (colStat.distinctCount > 1) {
Contributor: no need to add an extra check here; in EstimationUtils.updateNdv we already check this case. We can just revert the change here.
Contributor Author: The old code does not work well for a couple of new skewed-distribution tests. For example, test("cintHgm < 3") would fail because updateNdv() still computes a new ndv, but in reality we have already scaled it down to 1.

// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
} else {
// no need to scale down since it is already down to 1 (for skewed distribution case)
colStat.distinctCount
}
attr -> colStat.copy(distinctCount = newNdv)
}
AttributeMap(newColumnStats.toSeq)
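The thread above concerns how ndv is rescaled after a filter. A plausible Python sketch of proportional ndv scaling (not necessarily Spark's exact updateNdv formula) illustrates why a column already scaled down to one distinct value should be left alone:

```python
import math

def update_ndv(old_num_rows, new_num_rows, old_ndv):
    # Hypothetical proportional scaling with a floor of 1; the real
    # EstimationUtils.updateNdv may differ in detail.
    if new_num_rows >= old_num_rows:
        return old_ndv
    return max(1, math.ceil(old_ndv * new_num_rows / old_num_rows))

# A 10x row reduction scales a 100-value column down to about 10 values:
print(update_ndv(1000, 100, 100))  # 10
# An ndv already forced down to 1 by a skewed-range filter stays at 1,
# which is what the extra distinctCount > 1 check above preserves.
print(update_ndv(1000, 1, 5))  # 1
```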