[HUDI-4051] Allow nested field as primary key and preCombineField in spark sql (#5517)

* [HUDI-4051] Allow nested field as preCombineField in spark sql

* relax validation for primary key
xushiyan committed May 22, 2022
1 parent 32a5d26 commit 271d1a79c0bf486d8a11c39e4569ec70cba25cd1
Showing 2 changed files with 36 additions and 2 deletions.
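
For orientation, a minimal sketch (not part of the commit) of the kind of Spark SQL DDL this change is meant to accept. The table and column names below (hudi_nested_pk_demo, meta.pk, meta.ord) are made up for illustration; the end-to-end coverage is in the test added at the bottom of this diff.

    // Hypothetical spark-shell style example, not from the commit: a nested struct
    // field can now be used for primaryKey and preCombineField in the table options.
    spark.sql(
      """create table hudi_nested_pk_demo (
        |  id string,
        |  meta struct<pk:string, ord:long>
        |) using hudi
        |options (
        |  primaryKey = 'meta.pk',
        |  preCombineField = 'meta.ord'
        |)
        |""".stripMargin)

Before this change the validation rejected both options, because the full dotted path was matched against top-level schema fields only.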
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils
@@ -198,14 +199,14 @@ object HoodieOptionConfig {
      .map(_.split(",").filter(_.length > 0))
    ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
    primaryKeys.get.foreach { primaryKey =>
-     ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
+     ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
        s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
    }

    // validate preCombine key
    val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
    if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) {
-     ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)),
+     ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(preCombineKey.get))),
        s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.")
    }
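
The relaxed checks above only resolve the root-level column of a (possibly nested) field path against the table schema. A minimal sketch of that resolution, assuming getRootLevelFieldName (imported from org.apache.hudi.avro.HoodieAvroUtils, not modified here) simply returns the part of the path before the first dot:

    // Sketch of the assumed behaviour of the imported helper; defined locally
    // only for illustration.
    def rootLevelFieldName(fieldPath: String): String =
      fieldPath.split("\\.", 2)(0)

    assert(rootLevelFieldName("nestedcol.a2.b2.c2") == "nestedcol") // nested path resolves to its root column
    assert(rootLevelFieldName("ts") == "ts")                        // top-level fields behave as before

So primaryKey = 'nestedcol.a1' now passes as long as the root column nestedcol exists in the schema; the nested part of the path is not validated at this point.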

@@ -663,4 +663,37 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test nested field as primaryKey and preCombineField") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| name string,
| price double,
| ts long,
| nestedcol struct<a1:string, a2:struct<b1:string, b2:struct<c1:string, c2:int>>>
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| options (
| type = '$tableType',
| primaryKey = 'nestedcol.a1',
| preCombineField = 'nestedcol.a2.b2.c2'
| )
""".stripMargin)
// insert data to table
spark.sql(
s"""insert into $tableName values
|('name_1', 10, 1000, struct('a', struct('b', struct('c', 999)))),
|('name_2', 20, 2000, struct('a', struct('b', struct('c', 333))))
|""".stripMargin)
checkAnswer(s"select name, price, ts, nestedcol.a1, nestedcol.a2.b2.c2 from $tableName")(
Seq("name_1", 10.0, 1000, "a", 999)
)
}
}
}
}
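
A usage note, not part of the commit: because only the root-level column is resolved, a nested key whose root column is absent should still be rejected with the checkArgument message from HoodieOptionConfig above. A hedged sketch in the style of this suite, where missingcol is a made-up column and checkExceptionContain is the assertion helper from HoodieSparkSqlTestBase:

  test("Test nested primaryKey with missing root column is rejected") {
    // Hypothetical negative case: root column 'missingcol' does not exist in the
    // schema, so creating the table is expected to fail validation.
    checkExceptionContain(
      s"""
         |create table ${generateTableName} (
         |  name string,
         |  ts long
         |) using hudi
         |options (
         |  primaryKey = 'missingcol.a1',
         |  preCombineField = 'ts'
         |)
       """.stripMargin)("Can't find primaryKey")
  }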
