Commit 3ae2406

[HUDI-4071] Remove default value for mandatory record key field

codope committed Sep 15, 2022
1 parent 851c6e1 commit 3ae2406
Showing 13 changed files with 39 additions and 34 deletions.
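
Note: the substance of the change is that hoodie.datasource.write.recordkey.field (KeyGeneratorOptions.RECORDKEY_FIELD_NAME) loses its implicit "uuid" default; the record key becomes a truly mandatory option, and every config path and test below now sets it explicitly. A minimal sketch of the new obligation on callers; the constant is taken from this diff, while the "uuid" field name is only illustrative:

    import java.util.Properties;

    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    public class RecordKeyNowMandatory {
      public static void main(String[] args) {
        Properties writeProps = new Properties();
        // Before this commit the record key silently defaulted to "uuid";
        // after it, the writer must name the key field itself.
        writeProps.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
        System.out.println(writeProps);
      }
    }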
@@ -676,10 +676,10 @@ private void validateBucketIndexConfig() {
     // check the bucket index hash field
     if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) {
       hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
-          hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+          hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
     } else {
       boolean valid = Arrays
-          .stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
+          .stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
           .collect(Collectors.toSet())
           .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
       if (!valid) {
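
Note: with no default record key, the bucket-index hash field can only fall back to a value the user actually configured, which is why getString replaces getStringOrDefault (the default lookup would now throw). A hedged sketch of the resulting contract, assembled only from builder calls that appear elsewhere in this diff; the field name and bucket count are illustrative:

    import java.util.Properties;

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    public class BucketIndexConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Without this line the validation above has no record key to
        // fall back on and the build fails.
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .fromProperties(props)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketNum("8")
            .build();
        // The hash field should now mirror the record key set above.
        System.out.println(indexConfig.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD));
      }
    }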
@@ -31,6 +31,7 @@
 import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -39,6 +40,7 @@
 import org.junit.jupiter.params.provider.EnumSource;

 import java.nio.file.Path;
+import java.util.Properties;

 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,13 +90,15 @@ public void testCreateIndex(IndexType indexType) {
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof SparkHoodieHBaseIndex);
         break;
       case BUCKET:
+        Properties props = new Properties();
+        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
         config = clientConfigBuilder.withPath(basePath)
-            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+            .withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
                 .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build()).build();
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSimpleBucketIndex);

         config = HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+            .withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
                 .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
             .build();
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSparkConsistentBucketIndex);
@@ -82,6 +82,7 @@ public void testBucketIndexValidityCheck() {
           .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
           .withBucketNum("8").build();
     });
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
     HoodieIndexConfig.newBuilder().fromProperties(props)
         .withIndexType(HoodieIndex.IndexType.BUCKET)
@@ -145,18 +145,17 @@ private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
     Properties props = new Properties();
     HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder()
         .withIndexType(indexType);
-    props.putAll(indexConfig.build().getProps());
     if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
+      props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
       indexConfig.fromProperties(props)
           .withIndexKeyField("_row_key")
           .withBucketNum("1")
           .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
-      props.putAll(indexConfig.build().getProps());
       props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props)
           .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
           .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
     }
     props.putAll(indexConfig.build().getProps());
     return props;
   }
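
Note: in this helper the index config travels together with a storage-layout config; a condensed sketch of just that pairing, using only builder calls visible in the hunk above (getProps() is assumed to return a java.util.Properties subtype, as its use with putAll implies):

    import java.util.Properties;

    import org.apache.hudi.config.HoodieLayoutConfig;
    import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
    import org.apache.hudi.table.storage.HoodieStorageLayout;

    public class BucketLayoutSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Route writes through the bucket layout and its Spark partitioner.
        props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props)
            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
            .build().getProps());
        System.out.println(props);
      }
    }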
@@ -86,6 +86,7 @@ public void setup() {

   @Test
   public void testWriteDuringCompaction() throws IOException {
+    Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -99,10 +100,8 @@ public void testWriteDuringCompaction() throws IOException {
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
         .build();
-
-    Properties props = getPropertiesForKeyGen(true);
     props.putAll(config.getProps());

     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
@@ -131,6 +130,7 @@
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
+    Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -144,10 +144,8 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
         .build();
-
-    Properties props = getPropertiesForKeyGen(true);
     props.putAll(config.getProps());

     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
@@ -27,9 +27,9 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.Objects;

 /**
  * ConfigProperty describes a configuration property. It contains the configuration
@@ -76,7 +76,7 @@ public String key() {

   public T defaultValue() {
     if (defaultValue == null) {
-      throw new HoodieException("There's no default value for this config");
+      throw new HoodieException(String.format("There's no default value for this config: %s", key));
     }
     return defaultValue;
   }
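
Note: once mandatory options like the record key carry no default, any stray defaultValue() call becomes a real failure, and naming the key makes that failure diagnosable. A sketch, assuming ConfigProperty's usual home of org.apache.hudi.common.config (the fluent calls are the same ones KeyGeneratorOptions uses below):

    import org.apache.hudi.common.config.ConfigProperty;

    public class NoDefaultValueSketch {
      public static void main(String[] args) {
        ConfigProperty<String> recordKey = ConfigProperty
            .key("hoodie.datasource.write.recordkey.field")
            .noDefaultValue()
            .withDocumentation("Record key field.");
        // Now throws HoodieException:
        // "There's no default value for this config: hoodie.datasource.write.recordkey.field"
        recordKey.defaultValue();
      }
    }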
@@ -45,7 +45,7 @@ public class KeyGeneratorOptions extends HoodieConfig {

   public static final ConfigProperty<String> RECORDKEY_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.recordkey.field")
-      .defaultValue("uuid")
+      .noDefaultValue()
       .withDocumentation("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
           + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
           + "the dot notation eg: `a.b.c`");
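
Note: this hunk is the heart of the commit: the option keeps its key and documentation but drops the "uuid" default. On the Spark datasource path, a write must now carry the option explicitly; a hedged Java sketch using the standard DataFrameWriter API (the table name, field name, and the df/basePath inputs are illustrative):

    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class HudiWriteSketch {
      static void writeHudi(Dataset<Row> df, String basePath) {
        df.write().format("hudi")
            // Required from this commit on; there is no implicit "uuid".
            .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
            .option("hoodie.table.name", "hudi_table")
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }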
@@ -605,9 +605,6 @@ object DataSourceWriteOptions {
   val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()
-  /** @deprecated Use {@link RECORDKEY_FIELD} and its methods instead */
-  @Deprecated
-  val DEFAULT_RECORDKEY_FIELD_OPT_VAL = RECORDKEY_FIELD.defaultValue()
   /** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
   @Deprecated
   val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()
   /** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
   @Deprecated
@@ -789,7 +786,7 @@ object DataSourceOptionsHelper {
     val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
     if (partitionFields != null) {
       val numPartFields = partitionFields.split(",").length
-      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key())
       val numRecordKeyFields = recordsKeyFields.split(",").length
       if (numPartFields == 1 && numRecordKeyFields == 1) {
         classOf[SimpleKeyGenerator].getName
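
Note: key-generator inference now reads the record key with no fallback, so an unset record key presumably fails here instead of being inferred from a phantom "uuid". The counting rule itself is unchanged; a plain-Java restatement of it (the multi-field branch sits below the visible hunk):

    public class KeyGenInferenceSketch {
      // One partition field plus one record key field selects SimpleKeyGenerator.
      static boolean selectsSimpleKeyGenerator(String partitionFields, String recordKeyFields) {
        return partitionFields.split(",").length == 1
            && recordKeyFields.split(",").length == 1;
      }

      public static void main(String[] args) {
        System.out.println(selectsSimpleKeyGenerator("year", "uuid"));       // true
        System.out.println(selectsSimpleKeyGenerator("year,month", "uuid")); // false
      }
    }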
@@ -58,7 +58,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(TABLE_TYPE)
     hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
     hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
-    hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
     hoodieConfig.setDefaultValue(ENABLE)
     hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
@@ -46,7 +46,6 @@ object HoodieOptionConfig {
       .withSqlKey("primaryKey")
       .withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
       .withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
-      .defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
       .build()

   val SQL_KEY_TABLE_TYPE: HoodieSQLOption[String] = buildConf()
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test
 class TestDataSourceOptions {
   @Test def inferDataSourceOptions(): Unit = {
     val inputOptions1 = Map(
+      RECORDKEY_FIELD.key -> "uuid",
       TABLE_NAME.key -> "hudi_table",
       PARTITIONPATH_FIELD.key -> "year,month"
     )
@@ -38,6 +39,7 @@
       modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key))

     val inputOptions2 = Map(
+      RECORDKEY_FIELD.key -> "uuid",
       TABLE_NAME.key -> "hudi_table",
       PARTITIONPATH_FIELD.key -> "year",
       HIVE_STYLE_PARTITIONING.key -> "true"
@@ -261,14 +261,22 @@ class TestHoodieSparkSqlWriter {
   @Test
   def testThrowExceptionAlreadyExistsWithAppendSaveMode(): Unit = {
     //create a new table
-    val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
+    val fooTableModifier = Map(
+      "path" -> tempBasePath,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid",
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4")
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame)

     //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
-    val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
-      "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
+    val barTableModifier = Map(
+      "path" -> tempBasePath,
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
+      "hoodie.datasource.write.recordkey.field" -> "uuid",
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
     assert(tableAlreadyExistException.getMessage.contains("Config conflict"))
@@ -17,27 +17,25 @@

 package org.apache.hudi.functional

-import scala.collection.JavaConversions._
-
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner
+import org.apache.hudi.table.storage.HoodieStorageLayout
 import org.apache.hudi.testutils.HoodieClientTestBase

+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

-
-import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner
-import org.apache.hudi.table.storage.HoodieStorageLayout
+import scala.collection.JavaConversions._

 /**
  *
  */
-class TestDataSourceForBucketIndex extends HoodieClientTestBase {
+class TestMORDataSourceWithBucketIndex extends HoodieClientTestBase {

   var spark: SparkSession = null
   val commonOpts = Map(
