Skip to content

Commit

Permalink
[SPARK-38768][SQL] Remove Limit from plan if complete push down limit…
Browse files Browse the repository at this point in the history
… to data source (apache#474)

* [SPARK-38768][SQL] Remove `Limit` from plan if complete push down limit to data source

### What changes were proposed in this pull request?
Currently, Spark supports push down limit to data source.
If limit could pushed down and Data source only have one partition, DS V2 still do limit again.
This PR want remove `Limit` from plan if complete push down limit to data source.

### Why are the changes needed?
Improve performance.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Tests updated.

Closes apache#36043 from beliefer/SPARK-38768.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-38391][SPARK-38768][SQL][FOLLOWUP] Add comments for `pushLimit` and `pushTopN` of `PushDownUtils`

### What changes were proposed in this pull request?
`pushLimit` and `pushTopN` of `PushDownUtils` returns tuple of boolean. It will be good to explain what the boolean value represents.

### Why are the changes needed?
Make DS V2 API more friendly to developers.

### Does this PR introduce _any_ user-facing change?
'No'.
Just update comments.

### How was this patch tested?
N/A

Closes apache#36092 from beliefer/SPARK-38391_SPARK-38768_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-37960][SQL][FOLLOWUP] Make the testing CASE WHEN query more reasonable

### What changes were proposed in this pull request?
Some testing CASE WHEN queries are not carefully written and do not make sense. In the future, the optimizer may get smarter and get rid of the CASE WHEN completely, and then we loose test coverage.

This PR updates some CASE WHEN queries to make them more reasonable.

### Why are the changes needed?
future-proof test coverage.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
N/A

Closes apache#36125 from beliefer/SPARK-37960_followup3.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* update spark version

Co-authored-by: Jiaan Geng <beliefer@163.com>
  • Loading branch information
chenzhx and beliefer committed Jun 2, 2022
1 parent 3477b8c commit 0553594
Show file tree
Hide file tree
Showing 40 changed files with 105 additions and 62 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-token-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/spark-ganglia-lgpl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion graphx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion hadoop-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib-local/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.apache.org/</url>
Expand Down
2 changes: 1 addition & 1 deletion repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/mesos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public interface SupportsPushDownLimit extends ScanBuilder {
* Pushes down LIMIT to the data source.
*/
boolean pushLimit(int limit);

/**
* Whether the LIMIT is partially pushed or not. If it returns true, then Spark will do LIMIT
* again. This method will only be called when {@link #pushLimit} returns true.
*/
default boolean isPartiallyPushed() { return true; }
}
2 changes: 1 addition & 1 deletion sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.2.0-kylin-4.x-r68</version>
<version>3.2.0-kylin-4.x-r69</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,26 @@ object PushDownUtils extends PredicateHelper {
}

/**
* Pushes down LIMIT to the data source Scan
* Pushes down LIMIT to the data source Scan.
*
* @return the tuple of Boolean. The first Boolean value represents whether to push down, and
* the second Boolean value represents whether to push down partially, which means
* Spark will keep the Limit and do it again.
*/
def pushLimit(scanBuilder: ScanBuilder, limit: Int): Boolean = {
def pushLimit(scanBuilder: ScanBuilder, limit: Int): (Boolean, Boolean) = {
scanBuilder match {
case s: SupportsPushDownLimit =>
s.pushLimit(limit)
case _ => false
case s: SupportsPushDownLimit if s.pushLimit(limit) =>
(true, s.isPartiallyPushed)
case _ => (false, false)
}
}

/**
* Pushes down top N to the data source Scan
* Pushes down top N to the data source Scan.
*
* @return the tuple of Boolean. The first Boolean value represents whether to push down, and
* the second Boolean value represents whether to push down partially, which means
* Spark will keep the Sort and Limit and do it again.
*/
def pushTopN(
scanBuilder: ScanBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
}
}

private def pushDownLimit(plan: LogicalPlan, limit: Int): LogicalPlan = plan match {
private def pushDownLimit(plan: LogicalPlan, limit: Int): (LogicalPlan, Boolean) = plan match {
case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
val limitPushed = PushDownUtils.pushLimit(sHolder.builder, limit)
if (limitPushed) {
val (isPushed, isPartiallyPushed) = PushDownUtils.pushLimit(sHolder.builder, limit)
if (isPushed) {
sHolder.pushedLimit = Some(limit)
}
operation
(operation, isPushed && !isPartiallyPushed)
case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
if filter.isEmpty && CollapseProject.canCollapseExpressions(
order, project, alwaysInline = true) =>
Expand All @@ -380,27 +380,32 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
sHolder.pushedLimit = Some(limit)
sHolder.sortOrders = orders
if (isPartiallyPushed) {
s
(s, false)
} else {
operation
(operation, true)
}
} else {
s
(s, false)
}
} else {
s
(s, false)
}
case p: Project =>
val newChild = pushDownLimit(p.child, limit)
p.withNewChildren(Seq(newChild))
case other => other
val (newChild, isPartiallyPushed) = pushDownLimit(p.child, limit)
(p.withNewChildren(Seq(newChild)), isPartiallyPushed)
case other => (other, false)
}

def applyLimit(plan: LogicalPlan): LogicalPlan = plan.transform {
case globalLimit @ Limit(IntegerLiteral(limitValue), child) =>
val newChild = pushDownLimit(child, limitValue)
val newLocalLimit = globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))
globalLimit.withNewChildren(Seq(newLocalLimit))
val (newChild, canRemoveLimit) = pushDownLimit(child, limitValue)
if (canRemoveLimit) {
newChild
} else {
val newLocalLimit =
globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))
globalLimit.withNewChildren(Seq(newLocalLimit))
}
}

private def getWrappedScan(
Expand Down
Loading

0 comments on commit 0553594

Please sign in to comment.