From 7d363294b7af212836e7a444ad82c716f3560278 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 22 Feb 2022 21:04:43 +0800 Subject: [PATCH] [SPARK-38271] PoissonSampler may output more rows than MaxRows ### What changes were proposed in this pull request? When `withReplacement=true`, `Sample.maxRows` now returns `None` instead of `child.maxRows`. ### Why are the changes needed? When sampling with replacement, the underlying implementation of `SampleExec` (PoissonSampler) cannot guarantee that its number of output rows is <= `Sample.maxRows`: ``` scala> val df = spark.range(0, 1000) df: org.apache.spark.sql.Dataset[Long] = [id: bigint] scala> df.count res0: Long = 1000 scala> df.sample(true, 0.999999, 10).count res1: Long = 1004 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suites, plus a new test added to `CombiningLimitsSuite`. Closes #35593 from zhengruifeng/fix_sample_maxRows. Authored-by: Ruifeng Zheng Signed-off-by: Wenchen Fan (cherry picked from commit b68327968a7a5f7ac1afa9cc270204c9eaddcb75) Signed-off-by: Wenchen Fan --- .../plans/logical/basicLogicalOperators.scala | 6 +++++- .../catalyst/optimizer/CombiningLimitsSuite.scala | 13 +++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 7f33f28bbaa13..6748db57d9b7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1344,7 +1344,11 @@ case class Sample( s"Sampling fraction ($fraction) must be on interval [0, 1] without replacement") } - override def maxRows: Option[Long] = child.maxRows + override def maxRows: Option[Long] = { + // when withReplacement is true, PoissonSampler is applied in SampleExec, + // which may output more rows than child.maxRows. 
+ if (withReplacement) None else child.maxRows + } override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: LogicalPlan): Sample = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 46e9dea730eb7..d3cbaa8c41e2d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -159,6 +159,19 @@ class CombiningLimitsSuite extends PlanTest { ) } + test("SPARK-38271: PoissonSampler may output more rows than child.maxRows") { + val query = testRelation.select().sample(0, 0.2, true, 1) + assert(query.maxRows.isEmpty) + val optimized = Optimize.execute(query.analyze) + assert(optimized.maxRows.isEmpty) + // can not eliminate Limit since Sample.maxRows is None + checkPlanAndMaxRow( + query.limit(10), + query.limit(10), + 10 + ) + } + test("SPARK-33497: Eliminate Limit if Deduplicate max rows not larger than Limit") { checkPlanAndMaxRow( testRelation.deduplicate("a".attr).limit(10),