Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21322][SQL] support histogram in filter cardinality estimation #19783

Closed
wants to merge 10 commits into from

Conversation

ron8hu
Copy link
Contributor

@ron8hu ron8hu commented Nov 19, 2017

What changes were proposed in this pull request?

Histogram is effective in dealing with skewed distribution. After we generate histogram information for column statistics, we need to adjust filter estimation based on histogram data structure.

How was this patch tested?

We revised all the unit test cases by including histogram data structure.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@ron8hu ron8hu changed the title support histogram in filter cardinality estimation [SPARK-21322][SQL] support histogram in filter cardinality estimation Nov 19, 2017
@SparkQA
Copy link

SparkQA commented Nov 19, 2017

Test build #84003 has finished for PR 19783 at commit dd5b975.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ron8hu
Copy link
Contributor Author

ron8hu commented Nov 20, 2017

@@ -158,8 +196,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
expectedRowCount = 8)
Seq(attrInt -> colStatInt.copy(distinctCount = 7)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add new test cases for filter estimation based on histogram, instead of modifying existing test results?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@SparkQA
Copy link

SparkQA commented Nov 26, 2017

Test build #84190 has finished for PR 19783 at commit 8e5d04e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 28, 2017

Test build #84236 has finished for PR 19783 at commit 052d111.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @return the number of the first bin/bucket into which a column values falls.
*/

def findFirstBucketForValue(value: Double, histogram: Histogram): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we unify all names to bin/bins in code and comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had bucket(s) and bin(s) used interchangeably. To avoid confusion, I will unify them to use only bin/bins.

@SparkQA
Copy link

SparkQA commented Nov 30, 2017

Test build #84317 has finished for PR 19783 at commit 241089c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Some(1.0 / BigDecimal(ndv))
} else {
// We compute filter selectivity using Histogram information
attr.dataType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use if (colStat.histogram.isEmpty) to seperate the logic of basic stats (Some(1.0 / BigDecimal(ndv))) and histogram computation.

@@ -332,8 +332,45 @@ case class FilterEstimation(plan: Filter) extends Logging {
colStatsMap.update(attr, newStats)
}

Some(1.0 / BigDecimal(ndv))
} else {
// We compute filter selectivity using Histogram information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this comment where the histogram computation really starts

// returns 1/ndv if there is no histogram
if (colStat.histogram.isEmpty) return Some(1.0 / BigDecimal(ndv))

// We traverse histogram bins to locate the literal value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not accurate, here we want to get the bins occupied by the literal value, because if the value is skewed, it can occupy multiple bins.

// We traverse histogram bins to locate the literal value
val hgmBins = colStat.histogram.get.bins
val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
// find the interval where this datum locates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove this comment, it's explained above

// find the interval where this datum locates
var lowerId, higherId = -1
for (i <- hgmBins.indices) {
// if datum > upperBound, just move to next bin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove the comment, it does not match the logic at next line (there's no "move" logic)

(1.0 / hgmBins.length) / math.max(lowerBinNdv, 1) +
(1.0 / hgmBins.length) / math.max(higherBinNdv, 1)
}
Some(percent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about simplifying the above logic as:

val occupiedBins = if (lowerId == higherId) {
  1.0 / lowerBinNdv
} else {
  (higherId - lowerId - 1) + 1.0 / lowerBinNdv + 1.0 / higherBinNdv
}
Some(occupiedBins / hgmBins.length)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

binId += 1
}
if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1)) {
if (value == histogram.bins(i + 1).lo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge two ifs: if ((value == histogram.bins(i).hi) && (value == histogram.bins(i + 1).lo) && (i < histogram.bins.length - 1))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used two statements instead of one statement is because, when i points to the last bin, this condition "value == histogram.bins(i + 1).lo" may be out of bound. By separating the conditions into two statements, we can be sure that the out-of-bound error will not happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "out of bound", do you mean it exceeds 100 length limit? You can just switch new line after &&

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I meant the upper bound for the array of bins in a histogram. The default length of the histogram bin array is 254. When i is equal to 253 (the last bin), then i+1 is 254 leading to out-of-bound error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just move this condition after the length check:

if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1) && (value == histogram.bins(i + 1).lo))

* @param histogram a numeric equi-height histogram
* @return the number of the first bin into which a column values falls.
*/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you remove the empty line between method comment and its definition?
same for other methods here.

histogram: Histogram): Double = {
// find bins where current min and max locate
val minBinId = findFirstBinForValue(lowerEnd, histogram)
val maxBinId = findLastBinForValue(higherEnd, histogram)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about lowerBinId, higherBinId?

1.0
} else if (binId == 0 && curBin.hi != curBin.lo) {
if (higherValue == lowerValue) {
// in the case curBin.binNdv == 0, current bin is occupied by one value, which
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

binNdv will never be zero

} else if (lowerId == higherId) {
getOccupation(lowerId, higherEnd, lowerEnd, histogram) * histogram.bins(lowerId).ndv
} else {
// compute how much lowerEnd/higherEnd occupy its bin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: occupies its bin

@SparkQA
Copy link

SparkQA commented Dec 1, 2017

Test build #84365 has finished for PR 19783 at commit 6e6c49b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 2, 2017

Test build #84385 has finished for PR 19783 at commit 9d2a463.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var binId = 0
bins.foreach { bin =>
if (value > bin.hi) binId += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks more like a while loop pattern, can we use while loop here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Actually while loop is better because it can exit early when the condition no longer qualifies.

* @param bins an array of bins for a given numeric equi-height histogram
* @return the number of the last bin into which a column values falls.
*/
def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method so different from findFirstBinForValue? It looks like we just need to reverse the iteration order, i.e. from bins.length to 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We can simplify the logic by iterating from bins.length-1 to 0.

1.0 / curBin.ndv.toDouble
} else {
// Use proration since the range falls inside this bin.
(higherValue - lowerValue) / (curBin.hi - curBin.lo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only branch we need to specialize for binId=0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to specialize it?

* @param higherEnd a given upper bound value of a specified column value range
* @param lowerEnd a given lower bound value of a specified column value range
* @param histogram a numeric equi-height histogram
* @return the selectivity percentage for column values in [lowerEnd, higherEnd].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't match the java doc: Returns the number of bins...

@SparkQA
Copy link

SparkQA commented Dec 6, 2017

Test build #84517 has finished for PR 19783 at commit d068888.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the number of the first bin into which a column values falls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the id of the first bin into which the given value falls.

def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = 0
while ((i < bins.length) && (value > bins(i).hi)) {
i +=1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s space after +=

@@ -114,4 +114,171 @@ object EstimationUtils {
}
}

/**
* Returns the number of the first bin into which a column values falls for a specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

/**
* Returns the number of the last bin into which a column values falls for a specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the number of the last bin into which a column values falls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
var i = bins.length - 1
while ((i >= 0) && (value < bins(i).lo)) {
i -=1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a space after -=

binId: Int,
higherValue: Double,
lowerValue: Double,
histogram: Histogram): Double = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method signature looks weird, shouldn't it be

private def getOccupation(
  higherValue: Double,
  lowerValue: Double,
  bin: HistogramBin)

val curBin = histogram.bins(binId)
if (curBin.hi == curBin.lo) {
// the entire bin is covered in the range
1.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, shouldn't we check lowerValue <= curBin.lo <= higherValue here?

1.0
} else if (higherValue == lowerValue) {
// set percentage to 1/NDV
1.0 / curBin.ndv.toDouble
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we check the lowerValue/higherValues fits in the bin value range?

// of next bin is equal to the hi value of the previous bin. We bump up
// ndv value only if the hi values of two consecutive bins are different.
var middleNdv: Long = 0
for (i <- histogram.bins.indices) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again this is a typical while loop pattern.

var middleNdv: Long = 0
for (i <- histogram.bins.indices) {
val bin = histogram.bins(i)
if (bin.hi != bin.lo && i >= lowerId + 1 && i <= higherId - 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var i = lowerId + 1
while (i < higherId) {
  ...
  i += 1
}

// The total ndv is minPartNdv + maxPartNdv + Ndvs between them.
// In order to avoid counting same distinct value twice, we check if the upperBound value
// of next bin is equal to the hi value of the previous bin. We bump up
// ndv value only if the hi values of two consecutive bins are different.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't match the code, the actual logic is: only if the lo and hi values of the bin are different

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change the comment so that it matches with the code. Actually my original comment means the same thing as your comment. This is because the hi value of a bin is equal to the lo value of the next bin.

// Here we traverse histogram bins to locate the range of bins the literal values falls
// into. For skewed distribution, a literal value can occupy multiple bins.
val hgmBins = colStat.histogram.get.bins
val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @wzhfy , you would refactor this part to always use Double for CBO computing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'll refactor this part.

((datum == hgmBins(i).hi) && (datum < hgmBins(i + 1).hi))) {
higherId = i
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

var lowerId = -1
var highIdFound = false
var i = 0
while (i < hgmBins.length || highIdFound) {
  if (datum <= hgmBins(i).hi && lowerId < 0) lowerId = i
  if (datum >= hgmBins(i).lo) highIdFound = true
}
val highId = i

@cloud-fan
Copy link
Contributor

LGTM overall


// compute how many bins the column's current valid range [min, max] occupies.
// Note that a column's [min, max] range may vary after we apply some filter conditions.
val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I prefer to have this method unit-tested, because it's the core part of filter estimation. We can do this in follow-up anyway.

@SparkQA
Copy link

SparkQA commented Dec 11, 2017

Test build #84700 has finished for PR 19783 at commit be1e7ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ron8hu
Copy link
Contributor Author

ron8hu commented Dec 11, 2017

retest this please.

@SparkQA
Copy link

SparkQA commented Dec 11, 2017

Test build #84725 has finished for PR 19783 at commit be1e7ba.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Dec 12, 2017

Test build #84732 has finished for PR 19783 at commit be1e7ba.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ron8hu
Copy link
Contributor Author

ron8hu commented Dec 12, 2017

For the past 2 test builds #84725 and #84732, I checked the test result on the web. Actually there were no failures. See https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84725/testReport/. It appears that there is a bug in the jenkins test system.

@ron8hu
Copy link
Contributor Author

ron8hu commented Dec 12, 2017

retest this please.

@SparkQA
Copy link

SparkQA commented Dec 12, 2017

Test build #84751 has finished for PR 19783 at commit be1e7ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except some code style issue, we can address them later

@@ -114,4 +114,99 @@ object EstimationUtils {
}
}

/**
* Returns the number of the first bin into which a column value falls for a specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: number -> index?

*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the first bin into which a column value falls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this is redundant, shall we remove it?

}

/**
* Returns the number of the last bin into which a column value falls for a specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

*
* @param value a literal value of a column
* @param bins an array of bins for a given numeric equi-height histogram
* @return the id of the last bin into which a column value falls.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

* @param higherValue a given upper bound value of a specified column value range
* @param lowerValue a given lower bound value of a specified column value range
* @param bin a single histogram bin
* @return the percentage of a single bin holding values in [lowerValue, higherValue].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant

* double value is because we may return a portion of a bin. For example, a predicate
* "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values.
*
* @param higherId id of the high end bin holding the high end value of a column range
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: higherIndex

} else {
// compute how much lowerEnd/higherEnd occupies its bin
val lowerCurBin = histogram.bins(lowerId)
val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we assert that lowerBin.lo <= lowerEnd

// returns 1/ndv if there is no histogram
Some(1.0 / BigDecimal(ndv))
} else {
// We compute filter selectivity using Histogram information.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you create a new method?

// range may change due to another condition applied earlier.
val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble
val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble
val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: minBinIndex

val lowerBinNdv = hgmBins(lowerBinId).ndv
val higherBinNdv = hgmBins(higherBinId).ndv
// assume uniform distribution in each bin
val occupiedBins = if (lowerBinId == higherBinId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just EstimationUtils.getOccupationBins(higherBinId, lowerBinId, datum, datum, histogram)?

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@asfgit asfgit closed this in ecc179e Dec 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants