[HUDI-3403] Ensure keygen props are set for bootstrap #6645

Merged on Sep 15, 2022 (3 commits)

File: HoodieTableMetaClient.java (PropertyBuilder)
@@ -876,11 +876,10 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
return this;
}

- public PropertyBuilder set(String key, Object value) {
+ private void set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
}
- return this;
}

public PropertyBuilder set(Map<String, Object> props) {
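
The builder change above (PropertyBuilder, apparently in HoodieTableMetaClient) makes the single-entry setter private and void: it only stores keys that appear in HoodieTableConfig.PERSISTED_CONFIG_LIST, so callers go through the public set(Map) overload and anything outside that whitelist never reaches the persisted table properties. A minimal Scala sketch of the same whitelist-filtering idea; persistedKeys, extraProps and persisted are illustrative names, not the Hudi API:

  import scala.collection.mutable

  // Illustrative whitelist of keys that may be persisted with the table config.
  val persistedKeys: Set[String] = Set("hoodie.table.keygenerator.class")
  // Options handed to the builder; the second key is a made-up transient write option.
  val extraProps: Map[String, AnyRef] = Map(
    "hoodie.table.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "some.transient.write.option" -> "dropped")
  val persisted = mutable.Map[String, AnyRef]()
  extraProps.foreach { case (k, v) => if (persistedKeys.contains(k)) persisted.put(k, v) }
  // persisted now holds only the whitelisted entry, mirroring how set(Map)
  // presumably delegates to the private single-entry setter.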

File: HoodieSparkSqlWriter.scala
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
- import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
- import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
+ import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+ import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -469,7 +469,10 @@ object HoodieSparkSqlWriter {
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
- val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+ val keyGenProp =
+   if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
+   else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
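
keyGenProp is now resolved with bootstrap-first precedence: the bootstrap-specific KEYGEN_CLASS_NAME wins when it is non-empty, otherwise the writer falls back to HoodieTableConfig.KEY_GENERATOR_CLASS_NAME. The configs returned by extractConfigsRelatedToTimestampBasedKeyGenerator are then carried onto the table-config builder via the .set(...) call in the next hunk. A plausible shape for that extraction, written as an assumption for illustration (the real helper and the exact property prefix may differ):

  import org.apache.hudi.keygen.TimestampBasedKeyGenerator

  // Assumed sketch: copy timestamp-keygen write options only when the resolved
  // key generator is timestamp based, so they can be persisted with the table.
  def extractTimestampKeygenConfigs(keyGenClass: String,
                                    params: Map[String, String]): Map[String, String] =
    if (keyGenClass == classOf[TimestampBasedKeyGenerator].getName) {
      params.filter { case (k, _) => k.startsWith("hoodie.deltastreamer.keygen.timebased.") }
    } else {
      Map.empty
    }
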
@@ -493,6 +496,7 @@
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
+ .set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
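
Because HoodieSparkSqlWriter now prefers HoodieBootstrapConfig.KEYGEN_CLASS_NAME and records the resolved key generator (plus any timestamp-keygen configs) in the table config, a bootstrap write is enough to get the keygen properties persisted. A hedged sketch of such a write, modeled on the tests further down; spark, srcPath and basePath are placeholders, and other required options (table name, record key, etc.) are omitted for brevity:

  import org.apache.hudi.DataSourceWriteOptions
  import org.apache.hudi.config.HoodieBootstrapConfig
  import org.apache.hudi.keygen.NonpartitionedKeyGenerator
  import org.apache.spark.sql.SaveMode

  // Metadata bootstrap over an existing parquet dataset at srcPath; with this patch the
  // keygen class below is what HoodieSparkSqlWriter resolves and writes into the table config.
  spark.emptyDataFrame.write
    .format("hudi")
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
    .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
    .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
    .mode(SaveMode.Overwrite)
    .save(basePath)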

File: TestHoodieSparkSqlWriter.scala
@@ -17,9 +17,6 @@

package org.apache.hudi

- import java.io.IOException
- import java.time.Instant
- import java.util.{Collections, Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
- import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
+ import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}

+ import java.io.IOException
+ import java.time.Instant
+ import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters

@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
- initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
- initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
+ initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)

val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
}
}

- def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
+ def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
// when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
// will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
// hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@
tableMetaClientBuilder
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
}
- tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+ if (initBasePath) {
+   tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+ }
}

/**

File: TestDataSourceForBootstrap.scala
@@ -23,17 +23,17 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
- import org.apache.hudi.keygen.SimpleKeyGenerator
+ import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

import java.time.Instant
import java.util.Collections

import scala.collection.JavaConverters._

class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
.save(srcPath)

// Perform bootstrap
+ val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
+ val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
+ bootstrapKeygenClass = bootstrapKeygenClass
)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@

updateDF.write
.format("hudi")
- .options(commonOpts)
+ .options(options)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
.mode(SaveMode.Append)
.save(basePath)

@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- Some("datestr"),
- Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+ classOf[SimpleKeyGenerator].getName)

// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@

// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)

// Read bootstrapped table and verify count using glob path
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@

// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)

// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@

// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
- DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+ DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+ classOf[SimpleKeyGenerator].getName)

// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@
}

def runMetadataBootstrapAndVerifyCommit(tableType: String,
- partitionColumns: Option[String] = None,
- extraOpts: Map[String, String] = Map.empty): String = {
+ extraOpts: Map[String, String] = Map.empty,
+ bootstrapKeygenClass: String): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
- .options(commonOpts)
.options(extraOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
.option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
- .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+ .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
.mode(SaveMode.Overwrite)
.save(basePath)
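
The refactored helper forces every caller to name the bootstrap key generator explicitly instead of hard-coding SimpleKeyGenerator. A natural follow-up check, sketched here as an assumption rather than something this diff adds, reads the table config back after the bootstrap commit and asserts that the key generator class was actually persisted (spark and basePath are the test class fields):

  import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
  import org.apache.hudi.keygen.NonpartitionedKeyGenerator
  import org.junit.jupiter.api.Assertions.assertEquals

  // Re-open the bootstrapped table and check the persisted key generator class.
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(spark.sparkContext.hadoopConfiguration)
    .setBasePath(basePath)
    .build()
  val persistedKeygen = metaClient.getTableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
  assertEquals(classOf[NonpartitionedKeyGenerator].getName, persistedKeygen)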

@@ -528,7 +535,7 @@
.load(basePath)

assertEquals(numRecords, hoodieIncViewDF1.count())
- var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))

@@ -537,10 +544,10 @@
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
- .load(basePath);
+ .load(basePath)

assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
- countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
