From 30a1edd3bbe5c544ad569d356cc08eb4b0c8f0ef Mon Sep 17 00:00:00 2001 From: Zhixiong Chen Date: Thu, 2 Jun 2022 09:42:59 +0800 Subject: [PATCH] [SPARK-38768][SQL] Remove Limit from plan if complete push down limit to data source (#474) * [SPARK-38768][SQL] Remove `Limit` from plan if complete push down limit to data source Currently, Spark supports pushing down the limit to the data source. If the limit can be pushed down and the data source has only one partition, DS V2 still applies the limit again. This PR removes `Limit` from the plan if the limit is completely pushed down to the data source. Improve performance. 'No'. New feature. Tests updated. Closes #36043 from beliefer/SPARK-38768. Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan * [SPARK-38391][SPARK-38768][SQL][FOLLOWUP] Add comments for `pushLimit` and `pushTopN` of `PushDownUtils` `pushLimit` and `pushTopN` of `PushDownUtils` return a tuple of booleans. It would be good to explain what each boolean value represents. Make DS V2 API more friendly to developers. 'No'. Just update comments. N/A Closes #36092 from beliefer/SPARK-38391_SPARK-38768_followup. Authored-by: Jiaan Geng Signed-off-by: Dongjoon Hyun * [SPARK-37960][SQL][FOLLOWUP] Make the testing CASE WHEN query more reasonable Some testing CASE WHEN queries are not carefully written and do not make sense. In the future, the optimizer may get smarter and get rid of the CASE WHEN completely, and then we lose test coverage. This PR updates some CASE WHEN queries to make them more reasonable. Future-proof test coverage. 'No'. N/A Closes #36125 from beliefer/SPARK-37960_followup3. 
Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan * update spark version Co-authored-by: Jiaan Geng --- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/avro/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10-token-provider/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- .../kubernetes/integration-tests/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- .../connector/read/SupportsPushDownLimit.java | 6 ++++ sql/core/pom.xml | 2 +- .../datasources/v2/PushDownUtils.scala | 8 ++--- .../v2/V2ScanRelationPushDown.scala | 33 +++++++++++-------- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 26 ++++++++++++++- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 40 files changed, 90 insertions(+), 55 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index a56341cad8f13..3398506e512c7 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index 8930603640635..8f39d9ca2364d 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 
- 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 4bbfbd78c374b..6cd081cbd6837 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index d65959cf5049d..6951f97ee3753 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 5093c3cf8f3a3..3403a7070a5f4 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index d0899fe6d9b6d..166c6a37bfdfa 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/tags/pom.xml b/common/tags/pom.xml index f3101649aed60..4f7207197104d 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index 21e34bb61ccca..0ec7682eb79b6 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 3186c6a2dcea7..d08d1fbf1f685 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git 
a/examples/pom.xml b/examples/pom.xml index 41417c7f52cb8..4ef1240fe7f71 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/external/avro/pom.xml b/external/avro/pom.xml index 8594e404f7119..97ea7ffdb1f59 100644 --- a/external/avro/pom.xml +++ b/external/avro/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 2bbd741443942..a3191e718e0b0 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index bb0aad994df3f..68ce953f0934d 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index d0017253e4d3a..817c866771d32 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml index d9ae08ecad50f..dad549778803b 100644 --- a/external/kafka-0-10-token-provider/pom.xml +++ b/external/kafka-0-10-token-provider/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 600aed27a4ddb..eb07e49c2a7c2 100644 --- a/external/kafka-0-10/pom.xml +++ 
b/external/kafka-0-10/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml index 5e84422da3a4b..d2c2322e32b7e 100644 --- a/external/kinesis-asl-assembly/pom.xml +++ b/external/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml index 7c1dd9ce0f092..b1e79d6b2592f 100644 --- a/external/kinesis-asl/pom.xml +++ b/external/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml index 1ed1d67a3c68b..d5395793b3acf 100644 --- a/external/spark-ganglia-lgpl/pom.xml +++ b/external/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 041c29ea66542..75db239f3f158 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 628b9dfe32060..9fae510a3997d 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 2b1daa295227c..bbd02a26caebc 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml index 60e09ecb75315..645ccf03ea5dc 100644 --- a/mllib-local/pom.xml +++ b/mllib-local/pom.xml @@ -21,7 +21,7 
@@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index b05463a7d39f3..f1a91d6386324 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/pom.xml b/pom.xml index 38f270f6041ef..66a8cc0f345e1 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 pom Spark Project Parent POM https://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index f13a7b18a6116..1d7207c79df0a 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 29d283baef2d1..947f5aa2e4ed0 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../../pom.xml diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 34f639e638188..942a58c658864 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../../pom.xml diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml index 14c3470fbb401..c54a99d371f5e 100644 --- a/resource-managers/mesos/pom.xml +++ b/resource-managers/mesos/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index 6b822f92c7356..46155271336eb 100644 --- 
a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 5501ef9f37004..8d9666cd240a3 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java index fa6447bc068d5..035154d08450a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java @@ -33,4 +33,10 @@ public interface SupportsPushDownLimit extends ScanBuilder { * Pushes down LIMIT to the data source. */ boolean pushLimit(int limit); + + /** + * Whether the LIMIT is partially pushed or not. If it returns true, then Spark will do LIMIT + * again. This method will only be called when {@link #pushLimit} returns true. 
+ */ + default boolean isPartiallyPushed() { return true; } } diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 80ddae54a35b5..8a3f88644fae6 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala index 6b366fbd68a1d..60371d6bf432a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -122,11 +122,11 @@ object PushDownUtils extends PredicateHelper { * the second Boolean value represents whether to push down partially, which means * Spark will keep the Limit and do it again. */ - def pushLimit(scanBuilder: ScanBuilder, limit: Int): Boolean = { + def pushLimit(scanBuilder: ScanBuilder, limit: Int): (Boolean, Boolean) = { scanBuilder match { - case s: SupportsPushDownLimit => - s.pushLimit(limit) - case _ => false + case s: SupportsPushDownLimit if s.pushLimit(limit) => + (true, s.isPartiallyPushed) + case _ => (false, false) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 60048f83fb183..0c500e3a83075 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -369,13 +369,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } - private def pushDownLimit(plan: LogicalPlan, limit: Int): LogicalPlan = plan match { + private def pushDownLimit(plan: 
LogicalPlan, limit: Int): (LogicalPlan, Boolean) = plan match { case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => - val limitPushed = PushDownUtils.pushLimit(sHolder.builder, limit) - if (limitPushed) { + val (isPushed, isPartiallyPushed) = PushDownUtils.pushLimit(sHolder.builder, limit) + if (isPushed) { sHolder.pushedLimit = Some(limit) } - operation + (operation, isPushed && !isPartiallyPushed) case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) if filter.isEmpty && CollapseProject.canCollapseExpressions( order, project, alwaysInline = true) => @@ -389,27 +389,32 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit sHolder.pushedLimit = Some(limit) sHolder.sortOrders = orders if (isPartiallyPushed) { - s + (s, false) } else { - operation + (operation, true) } } else { - s + (s, false) } } else { - s + (s, false) } case p: Project => - val newChild = pushDownLimit(p.child, limit) - p.withNewChildren(Seq(newChild)) - case other => other + val (newChild, isPartiallyPushed) = pushDownLimit(p.child, limit) + (p.withNewChildren(Seq(newChild)), isPartiallyPushed) + case other => (other, false) } def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform { case globalLimit @ Limit(IntegerLiteral(limitValue), child) => - val newChild = pushDownLimit(child, limitValue) - val newLocalLimit = globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild)) - globalLimit.withNewChildren(Seq(newLocalLimit)) + val (newChild, canRemoveLimit) = pushDownLimit(child, limitValue) + if (canRemoveLimit) { + newChild + } else { + val newLocalLimit = + globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild)) + globalLimit.withNewChildren(Seq(newLocalLimit)) + } } private def getWrappedScan( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 06f4c1907bf5f..4dfe600fdb53d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -23,7 +23,7 @@ import java.util.Properties import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Sort} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.functions.{abs, avg, coalesce, count, count_distinct, exp, lit, log => ln, not, pow, sqrt, sum, udf, when} @@ -142,9 +142,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel assert(scan.schema.names.sameElements(names)) } + private def checkLimitRemoved(df: DataFrame, removed: Boolean = true): Unit = { + val limits = df.queryExecution.optimizedPlan.collect { + case g: GlobalLimit => g + case limit: LocalLimit => limit + } + if (removed) { + assert(limits.isEmpty) + } else { + assert(limits.nonEmpty) + } + } + test("simple scan with LIMIT") { val df1 = spark.read.table("h2.test.employee") .where($"dept" === 1).limit(1) + checkLimitRemoved(df1) checkPushedInfo(df1, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, ") checkAnswer(df1, Seq(Row(1, "amy", 10000.00, 1000.0, true))) @@ -157,12 +170,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .table("h2.test.employee") .filter($"dept" > 1) .limit(1) + checkLimitRemoved(df2, false) checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
PushedLimit: LIMIT 1, ") checkAnswer(df2, Seq(Row(2, "alex", 12000.00, 1200.0, false))) val df3 = sql("SELECT name FROM h2.test.employee WHERE dept > 1 LIMIT 1") checkSchemaNames(df3, Seq("NAME")) + checkLimitRemoved(df3) checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 1, ") checkAnswer(df3, Seq(Row("alex"))) @@ -171,6 +186,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .table("h2.test.employee") .groupBy("DEPT").sum("SALARY") .limit(1) + checkLimitRemoved(df4, false) checkPushedInfo(df4, "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") checkAnswer(df4, Seq(Row(1, 19000.00))) @@ -182,6 +198,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) .filter(name($"shortName")) .limit(1) + checkLimitRemoved(df5, false) // LIMIT is pushed down only if all the filters are pushed down checkPushedInfo(df5, "PushedFilters: [], ") checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy"))) @@ -204,6 +221,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .sort("salary") .limit(1) checkSortRemoved(df1) + checkLimitRemoved(df1) checkPushedInfo(df1, "PushedFilters: [], PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ") checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) @@ -218,6 +236,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .orderBy($"salary") .limit(1) checkSortRemoved(df2) + checkLimitRemoved(df2) checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " + "PushedTopN: ORDER BY [salary ASC NULLS FIRST] LIMIT 1, ") checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) @@ -232,6 +251,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .orderBy($"salary".desc) .limit(1) checkSortRemoved(df3, false) + checkLimitRemoved(df3, false) 
checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " + "PushedTopN: ORDER BY [salary DESC NULLS LAST] LIMIT 1, ") checkAnswer(df3, Seq(Row(2, "alex", 12000.00, 1200.0, false))) @@ -240,6 +260,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql("SELECT name FROM h2.test.employee WHERE dept > 1 ORDER BY salary NULLS LAST LIMIT 1") checkSchemaNames(df4, Seq("NAME")) checkSortRemoved(df4) + checkLimitRemoved(df4) checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " + "PushedTopN: ORDER BY [salary ASC NULLS LAST] LIMIT 1, ") checkAnswer(df4, Seq(Row("david"))) @@ -257,6 +278,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .orderBy("DEPT") .limit(1) checkSortRemoved(df6, false) + checkLimitRemoved(df6, false) checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]," + " PushedFilters: [], PushedGroupByExpressions: [DEPT], ") checkAnswer(df6, Seq(Row(1, 19000.00))) @@ -271,6 +293,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .limit(1) // LIMIT is pushed down only if all the filters are pushed down checkSortRemoved(df7, false) + checkLimitRemoved(df7, false) checkPushedInfo(df7, "PushedFilters: [], ") checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy"))) @@ -279,6 +302,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .sort(sub($"NAME")) .limit(1) checkSortRemoved(df8, false) + checkLimitRemoved(df8, false) checkPushedInfo(df8, "PushedFilters: [], ") checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false))) } diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index ae6282d43aa98..eb298546d9fd0 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index b54ae4877e4cb..aa8b77b384a16 
100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 977abda45f376..659c0e44cba42 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 62565d0b09d99..e54c1f4d453f6 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.2.0-kylin-4.x-r66 + 3.2.0-kylin-4.x-r69 ../pom.xml