[SPARK-38391][SQL] Datasource v2 supports partial topN push-down
### What changes were proposed in this pull request?
Currently, Spark pushes down top N completely. But for data sources that have multiple partitions (e.g. JDBC), the source can only apply the top N within each partition, so Spark should support partial top N push-down and run the top N again over the partial results.
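To make the need for a final top N concrete, here is a minimal sketch (illustrative values only, not code from this PR): each partition can only return its *local* top N, so Spark must sort and limit the combined partial results once more.

```scala
// Each JDBC partition runs its own "SELECT ... ORDER BY salary LIMIT 2",
// so each partition returns only a local top 2 (ascending).
val partition1 = Seq(9000.0, 10000.0)  // local top 2 of partition 1
val partition2 = Seq(8500.0, 12000.0)  // local top 2 of partition 2

// The concatenation of the partial results is not globally sorted;
// Spark still has to apply the top N over the scan output.
val globalTop2 = (partition1 ++ partition2).sorted.take(2)
// globalTop2 == Seq(8500.0, 9000.0)
```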

### Why are the changes needed?
Makes the behavior of sort push-down correct for data sources with multiple partitions.

### Does this PR introduce _any_ user-facing change?
'No'. This only changes the internal implementation.

### How was this patch tested?
New tests.

Closes #35710 from beliefer/SPARK-38391.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a8629a1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
beliefer authored and cloud-fan committed Mar 28, 2022
1 parent bcd01bb commit ac93918
Showing 5 changed files with 33 additions and 10 deletions.
@@ -35,4 +35,10 @@ public interface SupportsPushDownTopN extends ScanBuilder {
    * Pushes down top N to the data source.
    */
   boolean pushTopN(SortOrder[] orders, int limit);
+
+  /**
+   * Whether the top N is partially pushed or not. If it returns true, then Spark will do top N
+   * again. This method will only be called when {@link #pushTopN} returns true.
+   */
+  default boolean isPartiallyPushed() { return true; }
 }
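For connector authors, a minimal sketch of an implementation; `MyScanBuilder` and its `numPartitions` parameter are hypothetical, not part of this PR:

```scala
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownTopN}

// Hypothetical scan builder that accepts the top N and reports whether the
// result is sorted only per partition (partial) or globally (complete).
class MyScanBuilder(numPartitions: Int) extends SupportsPushDownTopN {
  private var pushedOrders: Array[SortOrder] = Array.empty
  private var pushedLimit: Option[Int] = None

  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
    pushedOrders = orders
    pushedLimit = Some(limit)
    true  // the source can run ORDER BY ... LIMIT in each partition's query
  }

  // With more than one partition each partition is sorted independently,
  // so Spark must run a final top N over the partial results.
  override def isPartiallyPushed(): Boolean = numPartitions > 1

  override def build(): Scan = ???  // construct the Scan from the pushed state
}
```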
@@ -129,11 +129,14 @@ object PushDownUtils extends PredicateHelper {
   /**
    * Pushes down top N to the data source Scan
    */
-  def pushTopN(scanBuilder: ScanBuilder, order: Array[SortOrder], limit: Int): Boolean = {
+  def pushTopN(
+      scanBuilder: ScanBuilder,
+      order: Array[SortOrder],
+      limit: Int): (Boolean, Boolean) = {
     scanBuilder match {
-      case s: SupportsPushDownTopN =>
-        s.pushTopN(order, limit)
-      case _ => false
+      case s: SupportsPushDownTopN if s.pushTopN(order, limit) =>
+        (true, s.isPartiallyPushed)
+      case _ => (false, false)
     }
   }
@@ -381,11 +381,16 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
         val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
         val orders = DataSourceStrategy.translateSortOrders(newOrder)
         if (orders.length == order.length) {
-          val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
-          if (topNPushed) {
+          val (isPushed, isPartiallyPushed) =
+            PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
+          if (isPushed) {
             sHolder.pushedLimit = Some(limit)
             sHolder.sortOrders = orders
-            operation
+            if (isPartiallyPushed) {
+              s
+            } else {
+              operation
+            }
           } else {
             s
           }
@@ -146,6 +146,8 @@ case class JDBCScanBuilder(
     false
   }
 
+  override def isPartiallyPushed(): Boolean = jdbcOptions.numPartitions.map(_ > 1).getOrElse(false)
+
   override def pruneColumns(requiredSchema: StructType): Unit = {
     // JDBC doesn't support nested column pruning.
     // TODO (SPARK-32593): JDBC support nested column and nested column pruning.
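For context, `jdbcOptions.numPartitions` comes from the standard JDBC partitioned-read options; a hedged usage sketch (the URL and table name are hypothetical):

```scala
// With numPartitions > 1, each partition issues its own
// "ORDER BY ... LIMIT n" query, so isPartiallyPushed returns true.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")   // hypothetical URL
  .option("dbtable", "test.employee")    // hypothetical table
  .option("partitionColumn", "dept")
  .option("lowerBound", "0")
  .option("upperBound", "2")
  .option("numPartitions", "2")
  .load()
```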
@@ -199,8 +199,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       "PushedFilters: [], PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
     checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
 
-    val df2 = spark.read.table("h2.test.employee")
-      .where($"dept" === 1).orderBy($"salary").limit(1)
+    val df2 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "1")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .orderBy($"salary")
+      .limit(1)
     checkSortRemoved(df2)
     checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
       "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ")
@@ -215,7 +222,7 @@
       .filter($"dept" > 1)
       .orderBy($"salary".desc)
       .limit(1)
-    checkSortRemoved(df3)
+    checkSortRemoved(df3, false)
     checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
       "PushedTopN: ORDER BY [salary DESC NULLS LAST] LIMIT 1, ")
     checkAnswer(df3, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
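Note on the test changes: `df2` pins `numPartitions` to 1, so `isPartiallyPushed` returns false and `checkSortRemoved(df2)` still expects the Sort node to be removed, while `df3` (whose read options sit above this hunk) evidently reads with more than one partition, so the top N is only partially pushed and `checkSortRemoved(df3, false)` now expects the Sort to remain even though the scan reports `PushedTopN`.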
