
[SPARK-19350] [SQL] Cardinality estimation of Limit and Sample #16696

Closed
wants to merge 3 commits from wzhfy:limitEstimation into master

Conversation
@wzhfy (Contributor) commented Jan 25, 2017

What changes were proposed in this pull request?

Before this PR, LocalLimit/GlobalLimit/Sample propagated the same row count and column stats as their child, which is incorrect.
For GlobalLimit/Sample we can compute the correct rowCount in Statistics whether CBO is enabled or not.
For LocalLimit we don't know the rowCount, because the number of partitions is not known at that point. Column stats should not be propagated in any of these cases, because we don't know the distribution of the columns after a Limit or Sample.
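The proposed estimation rules can be sketched in plain Scala (illustrative only, not the actual Catalyst code; all names here are made up for the sketch):

```scala
// Illustrative sketch of the proposed rowCount estimation (plain Scala, not
// the actual Spark implementation).
object LimitSampleEstimationSketch {
  // GlobalLimit: the limit caps the total output, so use min(child rows, limit)
  // when the child's row count is known, else the limit itself.
  def globalLimitRowCount(childRowCount: Option[BigInt], limit: BigInt): BigInt =
    childRowCount.map(_.min(limit)).getOrElse(limit)

  // Sample: scale the child's row count by the sampling ratio, rounding up.
  def sampleRowCount(childRowCount: Option[BigInt], ratio: Double): Option[BigInt] =
    childRowCount.map { c =>
      (BigDecimal(c) * BigDecimal(ratio)).setScale(0, BigDecimal.RoundingMode.CEILING).toBigInt
    }

  // LocalLimit: the limit applies per partition and the partition count is
  // unknown at planning time, so just keep the child's estimate unchanged.
  def localLimitRowCount(childRowCount: Option[BigInt]): Option[BigInt] =
    childRowCount
}
```

In all three cases the column stats would be dropped rather than propagated, since the post-operator distribution is unknown.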

How was this patch tested?

Added test cases.

@wzhfy force-pushed the wzhfy:limitEstimation branch Jan 25, 2017

@wzhfy (Contributor, Author) commented Jan 25, 2017

cc @cloud-fan @gatorsmile please review

@SparkQA commented Jan 25, 2017

Test build #71963 has finished for PR 16696 at commit 62013f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Jan 25, 2017

Test build #71964 has finished for PR 16696 at commit b88fac5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class LimitNode extends UnaryNode
  • case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends LimitNode
  • case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends LimitNode
@SparkQA commented Jan 25, 2017

Test build #71988 has finished for PR 16696 at commit 05fcd81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
val childStats = child.stats(conf)
// Don't propagate column stats, because we don't know the distribution after a limit operation
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(output, limit, childStats.attributeStats),

@viirya (Contributor) commented Jan 26, 2017

Why don't we use childStats.rowCount? If childStats.rowCount is less than the limit number, I think we should use it instead of the limit.

@gatorsmile (Member) commented Jan 26, 2017

I think @wzhfy is just keeping the existing code logic. Sure, we can improve it.

@viirya (Contributor) commented Jan 27, 2017

We should. Otherwise the rowCount is not correct.

@ron8hu (Contributor) commented Jan 31, 2017

Agreed. We can pick the smaller value between the child node's row count and the limit number.
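The rule the reviewers converge on here amounts to a one-liner (a hypothetical sketch, not the PR's exact code):

```scala
// Hypothetical sketch of the reviewers' suggestion: cap the limit estimate
// at the child's known row count, when one is available.
object CappedLimit {
  def cappedRowCount(childRowCount: Option[BigInt], limit: BigInt): BigInt =
    childRowCount.map(_.min(limit)).getOrElse(limit)
}
```

For example, a child with 4 rows under LIMIT 10 would be estimated at 4 rows rather than 10.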

...main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala Outdated
@@ -29,6 +31,8 @@ object EstimationUtils {
def rowCountsExist(conf: CatalystConf, plans: LogicalPlan*): Boolean =
plans.forall(_.stats(conf).rowCount.isDefined)

def ceil(bigDecimal: BigDecimal): BigInt = bigDecimal.setScale(0, RoundingMode.CEILING).toBigInt()

@gatorsmile (Member) commented Jan 26, 2017

ceil -> ceiling

...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
if (sizeInBytes == 0) {
sizeInBytes = 1
}
child.stats(conf).copy(sizeInBytes = sizeInBytes)
val sampledNumber = childStats.rowCount.map(c => EstimationUtils.ceil(BigDecimal(c) * ratio))

@gatorsmile (Member) commented Jan 26, 2017

sampledNumber -> sampledRowCount

@@ -727,37 +728,18 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
}
override def computeStats(conf: CatalystConf): Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]

@gatorsmile (Member) commented Jan 26, 2017

To make the stats more accurate, yes, we can use the smaller of childStats.rowCount and limit as the outputRowCount for getOutputSize.

...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated
outputList = Seq(ar),
attributeStats = AttributeMap(Seq(ar -> colStat)),
rowCount = 10,
size = Some(10 * (8 + 4)))

@gatorsmile (Member) commented Jan 26, 2017

I still prefer adding a comment above this line:

      // rowCount * (overhead + column size)
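The expected size in that test follows a simple rows-times-row-width rule. A hedged sketch of the arithmetic (the helper name and the 8-byte per-column overhead are only illustrative, mirroring the `10 * (8 + 4)` figure above):

```scala
// Hypothetical sketch of the output-size arithmetic behind the test's
// expected value: each column contributes a fixed per-row overhead plus
// its data size, and the total is multiplied by the row count.
object OutputSizeSketch {
  def estimatedOutputSize(rowCount: BigInt, columnDataSizes: Seq[Int], overhead: Int = 8): BigInt =
    rowCount * columnDataSizes.map(overhead + _).sum
}
```

With 10 rows of a single 4-byte int column this gives 10 * (8 + 4) = 120 bytes, matching the expected size in the test.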
...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated
@@ -48,6 +77,14 @@ class StatsConfSuite extends StatsEstimationTestBase {
// Return the simple statistics
assert(plan.stats(conf.copy(cboEnabled = false)) == expectedDefaultStats)

@gatorsmile (Member) commented Jan 26, 2017

Could you replace the above three lines with checkStats?

...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated
@@ -48,6 +77,14 @@ class StatsConfSuite extends StatsEstimationTestBase {
// Return the simple statistics
assert(plan.stats(conf.copy(cboEnabled = false)) == expectedDefaultStats)
}

/** Check estimated stats which is the same when cbo is turned on/off. */
private def checkStats(plan: LogicalPlan, expected: Statistics): Unit = {

@gatorsmile (Member) commented Jan 26, 2017

You know, this is a utility function. We can make it more general by having two expected stats values.

...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated

// Test if Sample's child doesn't have rowCount in stats
val stats2 = Statistics(sizeInBytes = 120)
val plan2 = DummyLogicalPlan(stats2, stats2)

@gatorsmile (Member) commented Jan 26, 2017

rename plan2 to childPlan

...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated
checkStats(sample, expected = Statistics(sizeInBytes = 60, rowCount = Some(5)))

// Test if Sample's child doesn't have rowCount in stats
val stats2 = Statistics(sizeInBytes = 120)

@gatorsmile (Member) commented Jan 26, 2017

The same here

@ron8hu (Contributor) commented Feb 4, 2017

For the limit estimation test cases, we may add a test with a limit number greater than the child node's row count. This test can show whether we properly select the smaller value between the limit number and the child node's row count.

@gatorsmile (Member) commented Jan 26, 2017

Overall looks good to me. : ) Could you add a few more test cases?

  • One where the child has fewer rows than the limit.
  • Another where the row count is zero but sizeInBytes is not zero.
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
// Don't propagate column stats, because we don't know the distribution after a limit operation
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(output, limit, childStats.attributeStats),
rowCount = Some(limit),

@viirya (Contributor) commented Jan 26, 2017

Actually the rowCount for LocalLimit and GlobalLimit should be different. For LocalLimit, limit is just the row count for one partition, and I think we can't get the number of partitions here. As the actual row count might be much bigger than the limit, maybe we should set it to None.

@cloud-fan (Contributor) commented Feb 7, 2017

This is a good point, maybe we should still separate the stats calculation of GlobalLimit and LocalLimit.
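Separating the two calculations could look roughly like this (plain Scala with a simplified stand-in for Catalyst's Statistics; not the actual API):

```scala
// Simplified stand-in for Catalyst's Statistics (illustrative only).
case class SimpleStats(rowCount: Option[BigInt], sizeInBytes: BigInt)

object SeparateLimitStats {
  // GlobalLimit caps the total output, so cap the row count at the limit
  // and derive the size from the capped count (with a 1-byte floor).
  def globalLimitStats(child: SimpleStats, limit: BigInt, rowWidth: BigInt): SimpleStats = {
    val rows = child.rowCount.map(_.min(limit)).getOrElse(limit)
    SimpleStats(Some(rows), (rows * rowWidth).max(BigInt(1)))
  }

  // LocalLimit applies per partition and the partition count is unknown
  // here, so keep the child's estimate unchanged (column stats would be
  // dropped in the real plan node).
  def localLimitStats(child: SimpleStats): SimpleStats = child
}
```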

...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
@@ -717,7 +717,8 @@ object Limit {
}
}

case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
abstract class LimitNode extends UnaryNode {
def limitExpr: Expression

@cloud-fan (Contributor) commented Feb 7, 2017

Do we assume limitExpr is foldable? There seems to be no type-checking logic for it.

@wzhfy force-pushed the wzhfy:limitEstimation branch Feb 24, 2017

@SparkQA commented Feb 24, 2017

Test build #73372 has finished for PR 16696 at commit 5692939.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@wzhfy (Contributor, Author) commented Feb 24, 2017

retest this please

@SparkQA commented Feb 24, 2017

Test build #73391 has finished for PR 16696 at commit 5692939.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@wzhfy (Contributor, Author) commented Feb 24, 2017

@cloud-fan @gatorsmile I've updated this PR and also added test cases, please review.

@cloud-fan (Contributor) commented Mar 3, 2017

retest this please

import org.apache.spark.sql.types.IntegerType


class StatsConfSuite extends StatsEstimationTestBase {

@cloud-fan (Contributor) commented Mar 3, 2017

why remove this test suite?

@wzhfy (Author, Contributor) commented Mar 3, 2017

I didn't remove it, just renamed it.

@gatorsmile (Member) commented Mar 3, 2017

Can you use git mv? Then, it will keep the change history.

@wzhfy (Author, Contributor) commented Mar 4, 2017

How can I use git mv now? Do I need to revert to the unchanged version, run git mv, and then redo all the changes?

@gatorsmile (Member) commented Mar 6, 2017

yes. :)

@SparkQA commented Mar 3, 2017

Test build #73824 has started for PR 16696 at commit 5692939.

...lyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala Outdated
import org.apache.spark.sql.types.IntegerType


class StatsEstimationSuite extends StatsEstimationTestBase {

@cloud-fan (Contributor) commented Mar 3, 2017

BasicStatsEstimationSuite?

@wzhfy (Author, Contributor) commented Mar 3, 2017

Good name:)

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala Outdated
@@ -116,22 +116,22 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
withTempView("test") {

@cloud-fan (Contributor) commented Mar 3, 2017

Is this test duplicated by the newly added limit test?

@wzhfy (Author, Contributor) commented Mar 3, 2017

yea I think so, let me remove it.

@wzhfy force-pushed the wzhfy:limitEstimation branch Mar 3, 2017

@SparkQA commented Mar 3, 2017

Test build #73845 has finished for PR 16696 at commit 516b114.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class BasicStatsEstimationSuite extends StatsEstimationTestBase
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
(limit: Long) * output.map(a => a.dataType.defaultSize).sum
// The output row count of LocalLimit should be the sum of row count from each partition, but
// since the partition number is not available here, we just use statistics of the child
// except column stats, because we don't know the distribution after a limit operation

@cloud-fan (Contributor) commented Mar 3, 2017

But I think the max/min should still be correct?

@wzhfy (Author, Contributor) commented Mar 4, 2017

How can we make sure max/min values are still there after limit? Otherwise it will be a very loose bound of max/min.

@cloud-fan (Contributor) commented Mar 4, 2017

Hmm, what's the strategy here? Is a loose bound better than nothing?

@wzhfy (Author, Contributor) commented Mar 4, 2017

A loose bound can lead to significant under-estimation. E.g. for a predicate a > 50, after a local limit the actual range may be [40, 60], while max and min in the stats are still [0, 60]; the filter factor will then be 1/6 instead of 1/2.
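Under the usual uniform-distribution assumption, those numbers work out as follows (a hypothetical helper illustrating the arithmetic, not Spark's actual FilterEstimation code):

```scala
// Estimated selectivity of "a > x" under a uniform-distribution assumption:
// the fraction of the [min, max] range that lies above x.
object SelectivitySketch {
  def greaterThanSelectivity(x: Double, min: Double, max: Double): Double =
    (max - x) / (max - min)
}
```

Stale stats [0, 60] give (60 - 50) / (60 - 0) = 1/6, while the true post-limit range [40, 60] gives (60 - 50) / (60 - 40) = 1/2, an under-estimate by a factor of three.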

@wzhfy (Contributor, Author) commented Mar 6, 2017

@cloud-fan Does this look good to you now?

...src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala Outdated


class BasicStatsEstimationSuite extends StatsEstimationTestBase {
val (ar, colStat) = (attr("key"), ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),

@cloud-fan (Contributor) commented Mar 6, 2017

nit:

val attr = ...
val colStat = ...
@cloud-fan (Contributor) commented Mar 6, 2017

LGTM except one minor comment

...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
(limit: Long) * output.map(a => a.dataType.defaultSize).sum
// The output row count of LocalLimit should be the sum of row count from each partition, but
// since the partition number is not available here, we just use statistics of the child
// except column stats, because we don't know the distribution after a limit operation

@gatorsmile (Member) commented Mar 6, 2017

Nit: the whole sentence does not have a period. How about rewriting it like this?

The output row count of LocalLimit should be the sum of row counts from each partition. However, since the number of partitions is not available here, we just use statistics of the child. Because the distribution after a limit operation is unknown, we do not propagate the column stats.

...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
child.stats(conf).copy(sizeInBytes = sizeInBytes)
val childStats = child.stats(conf)
val rowCount: BigInt =
if (childStats.rowCount.isDefined) childStats.rowCount.get.min(limit) else limit

@gatorsmile (Member) commented Mar 6, 2017

Nit: val rowCount: BigInt = childStats.rowCount.map(_.min(limit)).getOrElse(limit)

...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala Outdated
// The output row count of LocalLimit should be the sum of row count from each partition, but
// since the partition number is not available here, we just use statistics of the child
// except column stats, because we don't know the distribution after a limit operation
child.stats(conf).copy(attributeStats = AttributeMap(Nil))

@gatorsmile (Member) commented Mar 6, 2017

Nit: childStats.copy(attributeStats = AttributeMap(Nil))

@gatorsmile (Member) commented Mar 6, 2017

LGTM except minor comments.

@wzhfy force-pushed the wzhfy:limitEstimation branch to 0c42ea2 Mar 7, 2017

@SparkQA commented Mar 7, 2017

Test build #74060 has finished for PR 16696 at commit 0c42ea2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@gatorsmile (Member) commented Mar 7, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in 9909f6d Mar 7, 2017
