diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index caab37162cb1..16dd373486f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -876,11 +876,10 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
       return this;
     }
 
-    public PropertyBuilder set(String key, Object value) {
+    private void set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
       }
-      return this;
     }
 
     public PropertyBuilder set(Map props) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 028de00f7ff4..99be676f1413 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -469,7 +469,10 @@ object HoodieSparkSqlWriter {
     val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
     val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
     val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
-    val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+    val keyGenProp =
+      if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
+      else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
     val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
       HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
@@ -493,6 +496,7 @@ object HoodieSparkSqlWriter {
       .setPartitionFields(partitionColumns)
       .setPopulateMetaFields(populateMetaFields)
       .setKeyGeneratorClassProp(keyGenProp)
+      .set(timestampKeyGeneratorConfigs)
       .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
       .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
       .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 93469f2796cf..89f0acaf0163 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -17,9 +17,6 @@
 
 package org.apache.hudi
 
-import java.io.IOException
-import java.time.Instant
-import java.util.{Collections, Date, UUID}
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
+import java.io.IOException
+import java.time.Instant
+import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
 
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
       HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
 
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
     }
   }
 
-  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
+  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
     // when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
     // will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
     // hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@ class TestHoodieSparkSqlWriter {
       tableMetaClientBuilder
         .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
     }
-    tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    if (initBasePath) {
+      tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f0dd89df1c6d..23b21b315f99 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
@@ -31,9 +31,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
 import java.time.Instant
 import java.util.Collections
-
 import scala.collection.JavaConverters._
 
 class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
+    val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
+    val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
+      bootstrapKeygenClass = bootstrapKeygenClass
     )
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      Some("datestr"),
-      Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+      classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@ class TestDataSourceForBootstrap {
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          partitionColumns: Option[String] = None,
-                                          extraOpts: Map[String, String] = Map.empty): String = {
+                                          extraOpts: Map[String, String] = Map.empty,
+                                          bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
      .format("hudi")
-      .options(commonOpts)
       .options(extraOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
       .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
-      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -528,7 +535,7 @@ class TestDataSourceForBootstrap {
       .load(basePath)
     assertEquals(numRecords, hoodieIncViewDF1.count())
 
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
@@ -537,10 +544,10 @@ class TestDataSourceForBootstrap {
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
-      .load(basePath);
+      .load(basePath)
     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
 
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
+    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
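
For reference, a minimal sketch of how the updated bootstrap path could be driven through the Spark datasource, mirroring the options exercised in the tests above; the SparkSession, table name, record key field, and paths are assumed placeholders rather than values taken from this patch:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumed placeholders: an active SparkSession plus source and target paths.
val spark = SparkSession.builder().appName("bootstrap-sketch").master("local[2]").getOrCreate()
val srcPath = "/tmp/src_parquet_table"      // existing parquet data to bootstrap from
val basePath = "/tmp/hudi_bootstrap_table"  // target Hudi table path

val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName

// Metadata bootstrap of a non-partitioned table. With this change, the key generator passed via
// HoodieBootstrapConfig.KEYGEN_CLASS_NAME is also used when the table config is created, so
// subsequent upserts resolve the same key generator from the persisted table properties.
spark.emptyDataFrame.write
  .format("hudi")
  .option(HoodieWriteConfig.TBL_NAME.key, "bootstrap_sketch_tbl")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
  .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
  .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
  .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
  .mode(SaveMode.Overwrite)
  .save(basePath)

The same intent shows up in the test changes above, where the bootstrap key generator class is now threaded through runMetadataBootstrapAndVerifyCommit instead of being hard-coded to SimpleKeyGenerator.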