Skip to content

Commit 9d706e5

Browse files
committed
[KYUUBI #2830] Imporve Z-Order with Spark3.3
### _Why are the changes needed?_ We can inject rebalance before Z-Order to avoid data skew. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2830 from ulysses-you/improve-zorder. Closes #2830 789aba4 [ulysses-you] cleanup e169a20 [ulysses-you] resolver 9134496 [ulysses-you] style 048fe29 [ulysses-you] docs e06f1ef [ulysses-you] imporve zorder Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent a06a2ca commit 9d706e5

File tree

4 files changed

+346
-5
lines changed

4 files changed

+346
-5
lines changed

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
1919

2020
import org.apache.spark.sql.SparkSessionExtensions
2121

22-
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
22+
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}
2323

2424
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
2525
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
3838
// should be applied before
3939
// RepartitionBeforeWriting and RebalanceBeforeWriting
4040
// because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
41-
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
42-
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
41+
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
42+
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
4343
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
4444

4545
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.sql.zorder
19+
20+
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.catalyst.catalog.CatalogTable
22+
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NullsLast, SortOrder}
23+
import org.apache.spark.sql.catalyst.plans.logical._
24+
import org.apache.spark.sql.catalyst.rules.Rule
25+
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
26+
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
27+
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
28+
29+
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
30+
31+
trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
32+
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
33+
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
34+
35+
def isZorderEnabled(props: Map[String, String]): Boolean = {
36+
props.contains(KYUUBI_ZORDER_ENABLED) &&
37+
"true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
38+
props.contains(KYUUBI_ZORDER_COLS)
39+
}
40+
41+
def getZorderColumns(props: Map[String, String]): Seq[String] = {
42+
val cols = props.get(KYUUBI_ZORDER_COLS)
43+
assert(cols.isDefined)
44+
cols.get.split(",").map(_.trim)
45+
}
46+
47+
def canInsertZorder(query: LogicalPlan): Boolean = query match {
48+
case Project(_, child) => canInsertZorder(child)
49+
// TODO: actually, we can force zorder even if existed some shuffle
50+
case _: Sort => false
51+
case _: RepartitionByExpression => false
52+
case _: Repartition => false
53+
case _ => true
54+
}
55+
56+
def insertZorder(
57+
catalogTable: CatalogTable,
58+
plan: LogicalPlan,
59+
dynamicPartitionColumns: Seq[Attribute]): LogicalPlan = {
60+
if (!canInsertZorder(plan)) {
61+
return plan
62+
}
63+
val cols = getZorderColumns(catalogTable.properties)
64+
val resolver = session.sessionState.conf.resolver
65+
val output = plan.output
66+
val bound = cols.flatMap(col => output.find(attr => resolver(attr.name, col)))
67+
if (bound.size < cols.size) {
68+
logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
69+
s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
70+
plan
71+
} else {
72+
if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) &&
73+
conf.getConf(KyuubiSQLConf.REBALANCE_BEFORE_ZORDER)) {
74+
throw new KyuubiSQLExtensionException(s"${KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key} " +
75+
s"and ${KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key} can not be enabled together.")
76+
}
77+
if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) &&
78+
dynamicPartitionColumns.nonEmpty) {
79+
logWarning(s"Dynamic partition insertion with global sort may produce small files.")
80+
}
81+
82+
val zorderExpr =
83+
if (bound.length == 1) {
84+
bound
85+
} else if (conf.getConf(KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED)) {
86+
bound.asInstanceOf[Seq[Expression]]
87+
} else {
88+
buildZorder(bound) :: Nil
89+
}
90+
val (global, orderExprs, child) =
91+
if (conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED)) {
92+
(true, zorderExpr, plan)
93+
} else if (conf.getConf(KyuubiSQLConf.REBALANCE_BEFORE_ZORDER)) {
94+
val rebalanceExpr =
95+
if (dynamicPartitionColumns.isEmpty) {
96+
// static partition insert
97+
bound
98+
} else if (conf.getConf(KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED)) {
99+
// improve data compression ratio
100+
dynamicPartitionColumns.asInstanceOf[Seq[Expression]] ++ bound
101+
} else {
102+
dynamicPartitionColumns.asInstanceOf[Seq[Expression]]
103+
}
104+
// for dynamic partition insert, Spark always sort the partition columns,
105+
// so here we sort partition columns + zorder.
106+
val rebalance =
107+
if (dynamicPartitionColumns.nonEmpty &&
108+
conf.getConf(KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER)) {
109+
// improve compression ratio
110+
RebalancePartitions(
111+
rebalanceExpr,
112+
RebalancePartitions(dynamicPartitionColumns, plan))
113+
} else {
114+
RebalancePartitions(rebalanceExpr, plan)
115+
}
116+
(false, dynamicPartitionColumns.asInstanceOf[Seq[Expression]] ++ zorderExpr, rebalance)
117+
} else {
118+
(false, zorderExpr, plan)
119+
}
120+
val order = orderExprs.map { expr =>
121+
SortOrder(expr, Ascending, NullsLast, Seq.empty)
122+
}
123+
Sort(order, global, child)
124+
}
125+
}
126+
127+
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
128+
129+
def session: SparkSession
130+
def applyInternal(plan: LogicalPlan): LogicalPlan
131+
132+
final override def apply(plan: LogicalPlan): LogicalPlan = {
133+
if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
134+
applyInternal(plan)
135+
} else {
136+
plan
137+
}
138+
}
139+
}
140+
141+
case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
142+
extends InsertZorderHelper33 {
143+
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
144+
case insert: InsertIntoHadoopFsRelationCommand
145+
if insert.query.resolved &&
146+
insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
147+
isZorderEnabled(insert.catalogTable.get.properties) =>
148+
val dynamicPartition =
149+
insert.partitionColumns.filterNot(attr => insert.staticPartitions.contains(attr.name))
150+
val newQuery = insertZorder(insert.catalogTable.get, insert.query, dynamicPartition)
151+
if (newQuery.eq(insert.query)) {
152+
insert
153+
} else {
154+
insert.copy(query = newQuery)
155+
}
156+
157+
case ctas: CreateDataSourceTableAsSelectCommand
158+
if ctas.query.resolved &&
159+
ctas.table.bucketSpec.isEmpty && isZorderEnabled(ctas.table.properties) =>
160+
val dynamicPartition =
161+
ctas.query.output.filter(attr => ctas.table.partitionColumnNames.contains(attr.name))
162+
val newQuery = insertZorder(ctas.table, ctas.query, dynamicPartition)
163+
if (newQuery.eq(ctas.query)) {
164+
ctas
165+
} else {
166+
ctas.copy(query = newQuery)
167+
}
168+
169+
case _ => plan
170+
}
171+
}
172+
173+
case class InsertZorderBeforeWritingHive33(session: SparkSession)
174+
extends InsertZorderHelper33 {
175+
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
176+
case insert: InsertIntoHiveTable
177+
if insert.query.resolved &&
178+
insert.table.bucketSpec.isEmpty && isZorderEnabled(insert.table.properties) =>
179+
val dynamicPartition = insert.partition.filter(_._2.isEmpty).keys
180+
.flatMap(name => insert.query.output.find(_.name == name)).toSeq
181+
val newQuery = insertZorder(insert.table, insert.query, dynamicPartition)
182+
if (newQuery.eq(insert.query)) {
183+
insert
184+
} else {
185+
insert.copy(query = newQuery)
186+
}
187+
188+
case ctas: CreateHiveTableAsSelectCommand
189+
if ctas.query.resolved &&
190+
ctas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(ctas.tableDesc.properties) =>
191+
val dynamicPartition =
192+
ctas.query.output.filter(attr => ctas.tableDesc.partitionColumnNames.contains(attr.name))
193+
val newQuery = insertZorder(ctas.tableDesc, ctas.query, dynamicPartition)
194+
if (newQuery.eq(ctas.query)) {
195+
ctas
196+
} else {
197+
ctas.copy(query = newQuery)
198+
}
199+
200+
case octas: OptimizedCreateHiveTableAsSelectCommand
201+
if octas.query.resolved &&
202+
octas.tableDesc.bucketSpec.isEmpty && isZorderEnabled(octas.tableDesc.properties) =>
203+
val dynamicPartition =
204+
octas.query.output.filter(attr => octas.tableDesc.partitionColumnNames.contains(attr.name))
205+
val newQuery = insertZorder(octas.tableDesc, octas.query, dynamicPartition)
206+
if (newQuery.eq(octas.query)) {
207+
octas
208+
} else {
209+
octas.copy(query = newQuery)
210+
}
211+
212+
case _ => plan
213+
}
214+
}

extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,95 @@
1717

1818
package org.apache.spark.sql
1919

20-
class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase {}
20+
import org.apache.spark.sql.catalyst.plans.logical.{RebalancePartitions, Sort}
21+
import org.apache.spark.sql.internal.SQLConf
2122

22-
class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase {}
23+
import org.apache.kyuubi.sql.KyuubiSQLConf
24+
import org.apache.kyuubi.sql.zorder.Zorder
25+
26+
trait ZorderWithCodegenEnabledSuiteBase33 extends ZorderWithCodegenEnabledSuiteBase {
27+
28+
test("Add rebalance before zorder") {
29+
Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, zorder) =>
30+
withSQLConf(
31+
KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
32+
KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
33+
KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true",
34+
KyuubiSQLConf.ZORDER_USING_ORIGINAL_ORDERING_ENABLED.key -> useOriginalOrdering) {
35+
withTable("t") {
36+
sql(
37+
"""
38+
|CREATE TABLE t (c1 int, c2 string) PARTITIONED BY (d string)
39+
| TBLPROPERTIES (
40+
|'kyuubi.zorder.enabled'= 'true',
41+
|'kyuubi.zorder.cols'= 'c1,C2')
42+
|""".stripMargin)
43+
val p = sql("INSERT INTO TABLE t PARTITION(d='a') SELECT * FROM VALUES(1,'a')")
44+
.queryExecution.analyzed
45+
assert(p.collect {
46+
case sort: Sort
47+
if !sort.global &&
48+
((sort.order.exists(_.child.isInstanceOf[Zorder]) && zorder) ||
49+
(!sort.order.exists(_.child.isInstanceOf[Zorder]) && !zorder)) => sort
50+
}.size == 1)
51+
assert(p.collect {
52+
case rebalance: RebalancePartitions
53+
if rebalance.references.map(_.name).exists(_.equals("c1")) => rebalance
54+
}.size == 1)
55+
56+
val p2 = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a','b')")
57+
.queryExecution.analyzed
58+
assert(p2.collect {
59+
case sort: Sort
60+
if (!sort.global && Seq("c1", "c2", "d").forall(x =>
61+
sort.references.map(_.name).exists(_.equals(x)))) &&
62+
((sort.order.exists(_.child.isInstanceOf[Zorder]) && zorder) ||
63+
(!sort.order.exists(_.child.isInstanceOf[Zorder]) && !zorder)) => sort
64+
}.size == 1)
65+
assert(p2.collect {
66+
case rebalance: RebalancePartitions
67+
if Seq("c1", "c2", "d").forall(x =>
68+
rebalance.references.map(_.name).exists(_.equals(x))) => rebalance
69+
}.size == 1)
70+
}
71+
}
72+
}
73+
}
74+
75+
test("Two phase rebalance before Z-Order") {
76+
withSQLConf(
77+
SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
78+
"org.apache.spark.sql.catalyst.optimizer.CollapseRepartition",
79+
KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED.key -> "false",
80+
KyuubiSQLConf.REBALANCE_BEFORE_ZORDER.key -> "true",
81+
KyuubiSQLConf.TWO_PHASE_REBALANCE_BEFORE_ZORDER.key -> "true",
82+
KyuubiSQLConf.REBALANCE_ZORDER_COLUMNS_ENABLED.key -> "true") {
83+
withTable("t") {
84+
sql(
85+
"""
86+
|CREATE TABLE t (c1 int) PARTITIONED BY (d string)
87+
| TBLPROPERTIES (
88+
|'kyuubi.zorder.enabled'= 'true',
89+
|'kyuubi.zorder.cols'= 'c1')
90+
|""".stripMargin)
91+
val p = sql("INSERT INTO TABLE t PARTITION(d) SELECT * FROM VALUES(1,'a')")
92+
val rebalance = p.queryExecution.optimizedPlan.innerChildren
93+
.flatMap(_.collect { case r: RebalancePartitions => r })
94+
assert(rebalance.size == 2)
95+
assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
96+
.contains("d"))
97+
assert(rebalance.head.partitionExpressions.flatMap(_.references.map(_.name))
98+
.contains("c1"))
99+
100+
assert(rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
101+
.contains("d"))
102+
assert(!rebalance(1).partitionExpressions.flatMap(_.references.map(_.name))
103+
.contains("c1"))
104+
}
105+
}
106+
}
107+
}
108+
109+
class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase33 {}
110+
111+
class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenEnabledSuiteBase33 {}

extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,44 @@ object KyuubiSQLConf {
9797
.booleanConf
9898
.createWithDefault(true)
9999

100+
val REBALANCE_BEFORE_ZORDER =
101+
buildConf("spark.sql.optimizer.rebalanceBeforeZorder.enabled")
102+
.doc("when true, we do a rebalance before zorder in case data skew. " +
103+
"Note that, if the insertion is dynamic partition we will use the partition " +
104+
"columns to rebalance. Note that, this config only affects with Spark 3.3.x")
105+
.version("1.6.0")
106+
.booleanConf
107+
.createWithDefault(false)
108+
109+
val REBALANCE_ZORDER_COLUMNS_ENABLED =
110+
buildConf("spark.sql.optimizer.rebalanceZorderColumns.enabled")
111+
.doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do rebalance before " +
112+
s"Z-Order. If it's dynamic partition insert, the rebalance expression will include " +
113+
s"both partition columns and Z-Order columns. Note that, this config only " +
114+
s"affects with Spark 3.3.x")
115+
.version("1.6.0")
116+
.booleanConf
117+
.createWithDefault(false)
118+
119+
val TWO_PHASE_REBALANCE_BEFORE_ZORDER =
120+
buildConf("spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled")
121+
.doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do two phase rebalance " +
122+
s"before Z-Order for the dynamic partition write. The first phase rebalance using " +
123+
s"dynamic partition column; The second phase rebalance using dynamic partition column + " +
124+
s"Z-Order columns. Note that, this config only affects with Spark 3.3.x")
125+
.version("1.6.0")
126+
.booleanConf
127+
.createWithDefault(false)
128+
129+
val ZORDER_USING_ORIGINAL_ORDERING_ENABLED =
130+
buildConf("spark.sql.optimizer.zorderUsingOriginalOrdering.enabled")
131+
.doc(s"When true and ${REBALANCE_BEFORE_ZORDER.key} is true, we do sort by " +
132+
s"the original ordering i.e. lexicographical order. Note that, this config only " +
133+
s"affects with Spark 3.3.x")
134+
.version("1.6.0")
135+
.booleanConf
136+
.createWithDefault(false)
137+
100138
val WATCHDOG_MAX_PARTITIONS =
101139
buildConf("spark.sql.watchdog.maxPartitions")
102140
.doc("Set the max partition number when spark scans a data source. " +

0 commit comments

Comments
 (0)