[SPARK-38768][SQL] Remove Limit from plan if complete push down limit to data source (#474)

* [SPARK-38768][SQL] Remove `Limit` from plan if complete push down limit to data source

Currently, Spark supports pushing a limit down to the data source.
If the limit can be pushed down and the data source has only one partition, DS V2 still applies the limit again.
This PR removes `Limit` from the plan if the limit is completely pushed down to the data source.

Improve performance.

'No'.
New feature.

Tests updated.

Closes apache#36043 from beliefer/SPARK-38768.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-38391][SPARK-38768][SQL][FOLLOWUP] Add comments for `pushLimit` and `pushTopN` of `PushDownUtils`

`pushLimit` and `pushTopN` of `PushDownUtils` return a tuple of Booleans. It would be good to explain what each Boolean value represents.

Make DS V2 API more friendly to developers.

'No'.
Just update comments.

N/A

Closes apache#36092 from beliefer/SPARK-38391_SPARK-38768_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-37960][SQL][FOLLOWUP] Make the testing CASE WHEN query more reasonable

Some testing CASE WHEN queries are not carefully written and do not make sense. In the future, the optimizer may get smarter and get rid of the CASE WHEN completely, and then we lose test coverage.

This PR updates some CASE WHEN queries to make them more reasonable.

Future-proof test coverage.

'No'.

N/A

Closes apache#36125 from beliefer/SPARK-37960_followup3.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* update spark version

Co-authored-by: Jiaan Geng <beliefer@163.com>
2 people authored and RolatZhang committed Aug 28, 2023
1 parent ddadcf4 commit 30a1edd
Showing 40 changed files with 90 additions and 55 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/sketch/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/tags/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/avro/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kafka-0-10-sql/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kafka-0-10-token-provider/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kafka-0-10/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kinesis-asl-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion external/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion graphx/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion hadoop-cloud/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion launcher/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion mllib-local/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion mllib/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion pom.xml
@@ -26,7 +26,7 @@
   </parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.12</artifactId>
-  <version>3.2.0-kylin-4.x-r66</version>
+  <version>3.2.0-kylin-4.x-r69</version>
   <packaging>pom</packaging>
   <name>Spark Project Parent POM</name>
   <url>https://spark.apache.org/</url>
2 changes: 1 addition & 1 deletion repl/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion resource-managers/mesos/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion resource-managers/yarn/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

2 changes: 1 addition & 1 deletion sql/catalyst/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

@@ -33,4 +33,10 @@ public interface SupportsPushDownLimit extends ScanBuilder {
    * Pushes down LIMIT to the data source.
    */
   boolean pushLimit(int limit);
+
+  /**
+   * Whether the LIMIT is partially pushed or not. If it returns true, then Spark will do LIMIT
+   * again. This method will only be called when {@link #pushLimit} returns true.
+   */
+  default boolean isPartiallyPushed() { return true; }
 }
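
As a rough illustration of how a connector might use the new hook, here is a minimal, self-contained sketch. The `SupportsPushDownLimit` interface below is a simplified stand-in that copies only the two methods visible in the hunk above (the real interface extends `ScanBuilder`). `SinglePartitionScanBuilder`, its `numPartitions` field, and the `pushedLimit()` accessor are hypothetical names introduced for this example, not part of Spark.

```java
// Simplified stand-in for the interface in the diff; the real one extends ScanBuilder.
interface SupportsPushDownLimit {
  boolean pushLimit(int limit);

  // Default mirrors the diff: assume the limit is only partially pushed,
  // so Spark keeps its own LIMIT on top of the scan.
  default boolean isPartiallyPushed() { return true; }
}

// Hypothetical connector-side builder, for illustration only.
class SinglePartitionScanBuilder implements SupportsPushDownLimit {
  private final int numPartitions;
  private int pushedLimit = -1; // -1 means no limit has been pushed yet

  SinglePartitionScanBuilder(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public boolean pushLimit(int limit) {
    this.pushedLimit = limit;
    return true; // this sketch always accepts the limit
  }

  @Override
  public boolean isPartiallyPushed() {
    // With a single partition, the source-side limit is already a global limit,
    // so Spark would not need to apply LIMIT again.
    return numPartitions > 1;
  }

  int pushedLimit() { return pushedLimit; }
}
```

Because the default returns `true`, a connector that does not override `isPartiallyPushed` keeps the previous behavior, where Spark applies the limit again.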
2 changes: 1 addition & 1 deletion sql/core/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.12</artifactId>
-    <version>3.2.0-kylin-4.x-r66</version>
+    <version>3.2.0-kylin-4.x-r69</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

@@ -122,11 +122,11 @@ object PushDownUtils extends PredicateHelper {
    * the second Boolean value represents whether to push down partially, which means
    * Spark will keep the Limit and do it again.
    */
-  def pushLimit(scanBuilder: ScanBuilder, limit: Int): Boolean = {
+  def pushLimit(scanBuilder: ScanBuilder, limit: Int): (Boolean, Boolean) = {
     scanBuilder match {
-      case s: SupportsPushDownLimit =>
-        s.pushLimit(limit)
-      case _ => false
+      case s: SupportsPushDownLimit if s.pushLimit(limit) =>
+        (true, s.isPartiallyPushed)
+      case _ => (false, false)
     }
   }

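
The meaning of the two Booleans returned by `pushLimit` can be sketched in Java as follows. This is not the Spark implementation: `ScanBuilder` and `SupportsPushDownLimit` are reduced stand-ins, and `PushResult` and `PushDownSketch` are hypothetical names. `PushResult` stands in for the Scala `(Boolean, Boolean)` tuple, since Java has no built-in tuple type.

```java
// Reduced stand-ins for the Spark interfaces named in the diff.
interface ScanBuilder {}

interface SupportsPushDownLimit extends ScanBuilder {
  boolean pushLimit(int limit);
  default boolean isPartiallyPushed() { return true; }
}

// Hypothetical holder playing the role of the (Boolean, Boolean) tuple.
final class PushResult {
  final boolean isPushed;          // first value: was the limit pushed at all?
  final boolean isPartiallyPushed; // second value: must Spark keep its own Limit?

  PushResult(boolean isPushed, boolean isPartiallyPushed) {
    this.isPushed = isPushed;
    this.isPartiallyPushed = isPartiallyPushed;
  }
}

final class PushDownSketch {
  // Mirrors the new control flow: isPartiallyPushed is consulted only
  // after pushLimit has returned true.
  static PushResult pushLimit(ScanBuilder builder, int limit) {
    if (builder instanceof SupportsPushDownLimit) {
      SupportsPushDownLimit s = (SupportsPushDownLimit) builder;
      if (s.pushLimit(limit)) {
        return new PushResult(true, s.isPartiallyPushed());
      }
    }
    // Either the builder does not support limits or it rejected this one.
    return new PushResult(false, false);
  }
}
```

Under these assumptions only three combinations can occur: `(false, false)` when nothing was pushed, `(true, true)` when Spark must still apply the limit, and `(true, false)` when the source fully handles it.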
@@ -369,13 +369,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
       }
   }

-  private def pushDownLimit(plan: LogicalPlan, limit: Int): LogicalPlan = plan match {
+  private def pushDownLimit(plan: LogicalPlan, limit: Int): (LogicalPlan, Boolean) = plan match {
     case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
-      val limitPushed = PushDownUtils.pushLimit(sHolder.builder, limit)
-      if (limitPushed) {
+      val (isPushed, isPartiallyPushed) = PushDownUtils.pushLimit(sHolder.builder, limit)
+      if (isPushed) {
         sHolder.pushedLimit = Some(limit)
       }
-      operation
+      (operation, isPushed && !isPartiallyPushed)
     case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
         if filter.isEmpty && CollapseProject.canCollapseExpressions(
           order, project, alwaysInline = true) =>
@@ -389,27 +389,32 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
           sHolder.pushedLimit = Some(limit)
           sHolder.sortOrders = orders
           if (isPartiallyPushed) {
-            s
+            (s, false)
           } else {
-            operation
+            (operation, true)
           }
         } else {
-          s
+          (s, false)
         }
       } else {
-        s
+        (s, false)
       }
     case p: Project =>
-      val newChild = pushDownLimit(p.child, limit)
-      p.withNewChildren(Seq(newChild))
-    case other => other
+      val (newChild, isPartiallyPushed) = pushDownLimit(p.child, limit)
+      (p.withNewChildren(Seq(newChild)), isPartiallyPushed)
+    case other => (other, false)
   }

   def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform {
     case globalLimit @ Limit(IntegerLiteral(limitValue), child) =>
-      val newChild = pushDownLimit(child, limitValue)
-      val newLocalLimit = globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))
-      globalLimit.withNewChildren(Seq(newLocalLimit))
+      val (newChild, canRemoveLimit) = pushDownLimit(child, limitValue)
+      if (canRemoveLimit) {
+        newChild
+      } else {
+        val newLocalLimit =
+          globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))
+        globalLimit.withNewChildren(Seq(newLocalLimit))
+      }
   }

   private def getWrappedScan(
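
The effect of the rewritten `pushDownLimits` rule can be approximated with a toy plan model. The sketch below is a deliberate simplification, not Spark code: `Plan`, `Scan`, `Limit`, and `LimitRuleSketch` are hypothetical classes, a single `Limit` node stands in for Spark's `GlobalLimit`/`LocalLimit` pair, and the `Sort` and `Project` cases from the diff are omitted.

```java
// Toy logical plan, for illustration only.
interface Plan {}

// Hypothetical leaf node: a scan that may accept a pushed limit.
final class Scan implements Plan {
  final boolean supportsLimit;
  final int numPartitions;
  int pushedLimit = -1; // -1 means no limit has been pushed

  Scan(boolean supportsLimit, int numPartitions) {
    this.supportsLimit = supportsLimit;
    this.numPartitions = numPartitions;
  }
}

// Single node standing in for Spark's GlobalLimit over LocalLimit.
final class Limit implements Plan {
  final int n;
  final Plan child;

  Limit(int n, Plan child) {
    this.n = n;
    this.child = child;
  }
}

final class LimitRuleSketch {
  // Returns the child alone when the limit was completely pushed down,
  // otherwise returns the original Limit so it is applied again.
  static Plan pushDownLimits(Plan plan) {
    if (!(plan instanceof Limit)) return plan;
    Limit limit = (Limit) plan;
    if (!(limit.child instanceof Scan)) return plan;
    Scan scan = (Scan) limit.child;
    if (!scan.supportsLimit) return plan;

    scan.pushedLimit = limit.n;
    // Assumption of this sketch: only a multi-partition scan pushes partially.
    boolean isPartiallyPushed = scan.numPartitions > 1;
    boolean canRemoveLimit = !isPartiallyPushed;
    return canRemoveLimit ? scan : limit;
  }
}
```

In this model a limit over a single-partition scan collapses to the scan itself, which corresponds to the `canRemoveLimit` branch in the hunk above, while a multi-partition scan keeps the `Limit` node on top.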