Skip to content

Commit a3d9ca3

Browse files
committed
[KYUUBI #1974] Support merge small files in multi insert statement
### _Why are the changes needed?_ This PR aims to support auto-merging small files in multi insert statements. For example, `FROM VALUES(1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;` will generate the following plan, where `Union` is the root node instead of `InsertIntoHiveTable` ``` Union :- InsertIntoHiveTable : +- Project : +- LocalRelation +- InsertIntoHiveTable +- Project +- LocalRelation ``` This PR also fixed `canInsertRepartitionByExpression`; previously it did not consider `SubqueryAlias`, which may cause inserting an erroneous `Repartition`/`Rebalance` node and corrupting the data distribution, e.g. `FROM (SELECT * FROM VALUES(1) DISTRIBUTE BY col1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;` ``` Union :- InsertIntoHiveTable : +- Project : +- SubqueryAlias : +- RepartitionByExpression : +- Project : +- LocalRelation +- InsertIntoHiveTable +- Project +- SubqueryAlias +- RepartitionByExpression +- Project +- LocalRelation ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #1974 from pan3793/ext. Closes #1974 56cd773 [Cheng Pan] nit e0155c2 [Cheng Pan] Support merge small files in multi table insertion Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 6d757a3 commit a3d9ca3

File tree

3 files changed

+66
-4
lines changed

3 files changed

+66
-4
lines changed

dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
2626

2727
class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
2828
test("check repartition exists") {
29-
def check(df: DataFrame): Unit = {
29+
def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = {
3030
assert(
3131
df.queryExecution.analyzed.collect {
3232
case r: RepartitionByExpression =>
3333
assert(r.optNumPartitions ===
3434
spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
3535
r
36-
}.size == 1)
36+
}.size == expectedRepartitionNum)
3737
}
3838

3939
// It's better to set config explicitly in case of we change the default value.
@@ -45,13 +45,44 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
4545
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
4646
}
4747

48+
withTable("tmp1", "tmp2") {
49+
sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
50+
sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)")
51+
check(
52+
sql(
53+
"""FROM VALUES(1),(2) AS t(c1)
54+
|INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
55+
|INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
56+
|""".stripMargin),
57+
2)
58+
}
59+
4860
withTable("tmp1") {
4961
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
5062
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
5163
check(sql("INSERT INTO TABLE tmp1 " +
5264
"SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
5365
}
5466

67+
withTable("tmp1", "tmp2") {
68+
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
69+
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
70+
check(
71+
sql(
72+
"""FROM VALUES(1),(2),(3)
73+
|INSERT INTO TABLE tmp1 SELECT *
74+
|INSERT INTO TABLE tmp2 SELECT *
75+
|""".stripMargin),
76+
2)
77+
check(
78+
sql(
79+
"""FROM (SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1)
80+
|INSERT INTO TABLE tmp1 SELECT *
81+
|INSERT INTO TABLE tmp2 SELECT *
82+
|""".stripMargin),
83+
2)
84+
}
85+
5586
withTable("tmp1") {
5687
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
5788
}

dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
2626

2727
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
2828
test("check rebalance exists") {
29-
def check(df: DataFrame): Unit = {
29+
def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
3030
assert(
3131
df.queryExecution.analyzed.collect {
3232
case r: RebalancePartitions => r
33-
}.size == 1)
33+
}.size == expectedRebalanceNum)
3434
}
3535

3636
// It's better to set config explicitly in case of we change the default value.
@@ -42,11 +42,35 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
4242
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
4343
}
4444

45+
withTable("tmp1", "tmp2") {
46+
sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
47+
sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 string)")
48+
check(
49+
sql(
50+
"""FROM VALUES(1),(2)
51+
|INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
52+
|INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
53+
|""".stripMargin),
54+
2)
55+
}
56+
4557
withTable("tmp1") {
4658
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
4759
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
4860
}
4961

62+
withTable("tmp1", "tmp2") {
63+
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
64+
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
65+
check(
66+
sql(
67+
"""FROM VALUES(1),(2),(3)
68+
|INSERT INTO TABLE tmp1 SELECT *
69+
|INSERT INTO TABLE tmp2 SELECT *
70+
|""".stripMargin),
71+
2)
72+
}
73+
5074
withTable("tmp1") {
5175
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
5276
}

dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
5959
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
6060
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
6161

62+
case u @ Union(children, _, _) =>
63+
u.copy(children = children.map(addRepartition))
64+
6265
case _ => plan
6366
}
6467
}
@@ -98,13 +101,17 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder {
98101
query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
99102
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
100103

104+
case u @ Union(children, _, _) =>
105+
u.copy(children = children.map(addRepartition))
106+
101107
case _ => plan
102108
}
103109
}
104110

105111
trait RepartitionBeforeWriteHelper {
106112
def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan match {
107113
case Project(_, child) => canInsertRepartitionByExpression(child)
114+
case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child)
108115
case Limit(_, _) => false
109116
case _: Sort => false
110117
case _: RepartitionByExpression => false

0 commit comments

Comments
 (0)