[HUDI-2242] Add configuration inference logic for few options #3359

Merged
merged 2 commits on Nov 20, 2021
Changes from all commits
@@ -25,6 +25,7 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.util.Option;

import java.io.File;
import java.io.FileReader;
@@ -159,9 +160,11 @@ public class HoodieLockConfig extends HoodieConfig {
public static final ConfigProperty<String> ZK_LOCK_KEY = ConfigProperty
.key(ZK_LOCK_KEY_PROP_KEY)
.noDefaultValue()
.withInferFunction(p -> Option.ofNullable(p.getStringOrDefault(HoodieWriteConfig.TBL_NAME, null)))
.sinceVersion("0.8.0")
.withDocumentation("Key name under base_path at which to create a ZNode and acquire lock. "
+ "Final path on zk will look like base_path/lock_key. We recommend setting this to the table name");
+ "Final path on zk will look like base_path/lock_key. If this parameter is not set, we would "
+ "set it as the table name");

// Pluggable type of lock provider
public static final ConfigProperty<String> LOCK_PROVIDER_CLASS_NAME = ConfigProperty
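The hunk above gives ZK_LOCK_KEY an infer function: when no ZooKeeper lock key is supplied, the value of HoodieWriteConfig.TBL_NAME is used instead. A minimal standalone sketch of that fallback, using a plain Scala Map rather than the HoodieConfig API (key strings are illustrative):

  // Sketch only: mirrors the ZK_LOCK_KEY infer function; an explicit lock key wins,
  // otherwise the table name is reused.
  def inferZkLockKey(props: Map[String, String]): Option[String] =
    props.get("hoodie.write.lock.zookeeper.lock_key")
      .orElse(props.get("hoodie.table.name"))

  // inferZkLockKey(Map("hoodie.table.name" -> "trips"))  => Some("trips")
  // inferZkLockKey(Map.empty)                            => None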
@@ -23,6 +23,8 @@
import org.apache.hudi.exception.HoodieException;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.Objects;

@@ -93,8 +95,8 @@ Option<Function<HoodieConfig, Option<T>>> getInferFunc() {
return inferFunction;
}

public String[] getAlternatives() {
return alternatives;
public List<String> getAlternatives() {
return Arrays.asList(alternatives);
}

public ConfigProperty<T> withDocumentation(String doc) {
@@ -96,7 +96,7 @@ public static TypedProperties loadGlobalProps() {
try {
conf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR));
} catch (Exception ignored) {
LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
LOG.warn("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
}
}
return conf.getProps();
@@ -90,7 +90,7 @@ public <T> boolean contains(ConfigProperty<T> configProperty) {
if (props.containsKey(configProperty.key())) {
return true;
}
return Arrays.stream(configProperty.getAlternatives()).anyMatch(props::containsKey);
return configProperty.getAlternatives().stream().anyMatch(props::containsKey);
}

private <T> Option<Object> getRawValue(ConfigProperty<T> configProperty) {
@@ -18,18 +18,23 @@
package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.config.ConfigProperty
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}

import java.util.function.{Function => JavaFunction}
import scala.collection.JavaConverters._
import scala.language.implicitConversions

/**
* List of options that can be passed to the Hoodie datasource,
* in addition to the hoodie client configs
@@ -211,7 +216,7 @@ object DataSourceWriteOptions {
.map(SparkDataSourceUtils.decodePartitioningColumns)
.getOrElse(Nil)
val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)

val partitionPathField =
keyGeneratorClass match {
@@ -273,8 +278,26 @@
*/
val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE

val KEYGENERATOR_CLASS_NAME = ConfigProperty.key("hoodie.datasource.write.keygenerator.class")
/**
 * Key generator class that will extract the key out of the incoming record
 *
 */
val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
if (!p.contains(PARTITIONPATH_FIELD)) {
Option.of(classOf[NonpartitionedKeyGenerator].getName)
} else {
val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
if (numOfPartFields == 1) {
Option.of(classOf[SimpleKeyGenerator].getName)
} else {
Option.of(classOf[ComplexKeyGenerator].getName)
}
}
})
val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.defaultValue(classOf[SimpleKeyGenerator].getName)
.withInferFunction(keyGeneraterInferFunc)
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")

val ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty
@@ -364,9 +387,19 @@ object DataSourceWriteOptions {
.defaultValue("default")
.withDocumentation("database to sync to")

val hiveTableOptKeyInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
if (p.contains(TABLE_NAME)) {
Option.of(p.getString(TABLE_NAME))
} else if (p.contains(HoodieWriteConfig.TBL_NAME)) {
Option.of(p.getString(HoodieWriteConfig.TBL_NAME))
} else {
Option.empty[String]()
}
})
val HIVE_TABLE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.table")
.defaultValue("unknown")
.withInferFunction(hiveTableOptKeyInferFunc)
.withDocumentation("table to sync to")

val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
@@ -389,16 +422,37 @@
.defaultValue("jdbc:hive2://localhost:10000")
.withDocumentation("Hive metastore url")

val hivePartitionFieldsInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
if (p.contains(PARTITIONPATH_FIELD)) {
Option.of(p.getString(PARTITIONPATH_FIELD))
} else {
Option.empty[String]()
}
})
val HIVE_PARTITION_FIELDS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_fields")
.defaultValue("")
.withDocumentation("Field in the table to use for determining hive partition columns.")

.withInferFunction(hivePartitionFieldsInferFunc)

val hivePartitionExtractorInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
if (!p.contains(PARTITIONPATH_FIELD)) {
Option.of(classOf[NonPartitionedExtractor].getName)
} else {
val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
if (numOfPartFields == 1 && p.contains(HIVE_STYLE_PARTITIONING) && p.getString(HIVE_STYLE_PARTITIONING) == "true") {
Option.of(classOf[HiveStylePartitionValueExtractor].getName)
} else {
Option.of(classOf[MultiPartKeysValueExtractor].getName)
}
}
})
val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue(classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName)
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+ "default 'SlashEncodedDayPartitionValueExtractor'.")
.withInferFunction(hivePartitionExtractorInferFunc)
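Together, the two infer functions above reuse the write-side partition settings for Hive sync: HIVE_PARTITION_FIELDS copies hoodie.datasource.write.partitionpath.field, and HIVE_PARTITION_EXTRACTOR_CLASS is chosen from the number of partition fields plus the hive-style-partitioning flag. A standalone sketch of the extractor choice, with plain inputs instead of HoodieConfig:

  import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, MultiPartKeysValueExtractor, NonPartitionedExtractor}

  // Sketch only: same rule as hivePartitionExtractorInferFunc.
  def inferPartitionExtractor(partitionPathField: Option[String], hiveStylePartitioning: Boolean): String =
    partitionPathField match {
      case None => classOf[NonPartitionedExtractor].getName
      case Some(f) if f.split(",").length == 1 && hiveStylePartitioning =>
        classOf[HiveStylePartitionValueExtractor].getName
      case _ => classOf[MultiPartKeysValueExtractor].getName
    }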

val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.assume_date_partitioning")
@@ -755,7 +809,7 @@ object DataSourceOptionsHelper {
// maps the deprecated config name to its latest name
val allAlternatives: Map[String, String] = {
val alterMap = scala.collection.mutable.Map[String, String]()
allConfigsWithAlternatives.foreach(cfg => cfg.getAlternatives.foreach(alternative => alterMap(alternative) = cfg.key))
allConfigsWithAlternatives.foreach(cfg => cfg.getAlternatives.asScala.foreach(alternative => alterMap(alternative) = cfg.key))
alterMap.toMap
}

@@ -794,4 +848,10 @@ object DataSourceOptionsHelper {
QUERY_TYPE.key -> queryType
) ++ translateConfigurations(parameters)
}

implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
new JavaFunction[From, To] {
override def apply (input: From): To = function (input)
}
}
}
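The scalaFunctionToJavaFunction helper adapts a Scala closure to java.util.function.Function, which is what the Java-side ConfigProperty.withInferFunction expects. Although it is declared implicit, the call sites above invoke it explicitly, which keeps the Scala-to-Java adaptation visible. A minimal usage sketch that reuses the imports added at the top of this file (the inferred property here is purely illustrative):

  // Sketch only: wrap a Scala lambda so it can be handed to a Java-facing API.
  val inferFromTableName: JavaFunction[HoodieConfig, Option[String]] =
    DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) =>
      Option.ofNullable(p.getString(HoodieWriteConfig.TBL_NAME)))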
@@ -719,8 +719,8 @@ object HoodieSparkSqlWriter {

private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
val mergedParams = mutable.Map.empty ++
DataSourceWriteOptions.translateSqlOptions(HoodieWriterUtils.parametersWithWriteDefaults(optParams))
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
&& mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
@@ -48,41 +48,44 @@ object HoodieWriterUtils {
*/
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
Map(OPERATION.key -> OPERATION.defaultValue,
TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
PAYLOAD_CLASS_NAME.key -> PAYLOAD_CLASS_NAME.defaultValue,
RECORDKEY_FIELD.key -> RECORDKEY_FIELD.defaultValue,
PARTITIONPATH_FIELD.key -> PARTITIONPATH_FIELD.defaultValue,
KEYGENERATOR_CLASS_NAME.key -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
ENABLE.key -> ENABLE.defaultValue.toString,
COMMIT_METADATA_KEYPREFIX.key -> COMMIT_METADATA_KEYPREFIX.defaultValue,
INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
STREAMING_RETRY_CNT.key -> STREAMING_RETRY_CNT.defaultValue,
STREAMING_RETRY_INTERVAL_MS.key -> STREAMING_RETRY_INTERVAL_MS.defaultValue,
STREAMING_IGNORE_FAILED_BATCH.key -> STREAMING_IGNORE_FAILED_BATCH.defaultValue,
META_SYNC_CLIENT_TOOL_CLASS_NAME.key -> META_SYNC_CLIENT_TOOL_CLASS_NAME.defaultValue,
HIVE_SYNC_ENABLED.key -> HIVE_SYNC_ENABLED.defaultValue,
META_SYNC_ENABLED.key -> META_SYNC_ENABLED.defaultValue,
HIVE_DATABASE.key -> HIVE_DATABASE.defaultValue,
HIVE_TABLE.key -> HIVE_TABLE.defaultValue,
HIVE_BASE_FILE_FORMAT.key -> HIVE_BASE_FILE_FORMAT.defaultValue,
HIVE_USER.key -> HIVE_USER.defaultValue,
HIVE_PASS.key -> HIVE_PASS.defaultValue,
HIVE_URL.key -> HIVE_URL.defaultValue,
HIVE_PARTITION_FIELDS.key -> HIVE_PARTITION_FIELDS.defaultValue,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> HIVE_PARTITION_EXTRACTOR_CLASS.defaultValue,
HIVE_STYLE_PARTITIONING.key -> HIVE_STYLE_PARTITIONING.defaultValue,
HIVE_USE_JDBC.key -> HIVE_USE_JDBC.defaultValue,
HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString,
DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue
) ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
val props = new Properties()
props.putAll(parameters.asJava)
val hoodieConfig: HoodieConfig = new HoodieConfig(props)
hoodieConfig.setDefaultValue(OPERATION)
hoodieConfig.setDefaultValue(TABLE_TYPE)
hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
hoodieConfig.setDefaultValue(PARTITIONPATH_FIELD)
hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
hoodieConfig.setDefaultValue(ENABLE)
hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
hoodieConfig.setDefaultValue(INSERT_DROP_DUPS)
hoodieConfig.setDefaultValue(STREAMING_RETRY_CNT)
hoodieConfig.setDefaultValue(STREAMING_RETRY_INTERVAL_MS)
hoodieConfig.setDefaultValue(STREAMING_IGNORE_FAILED_BATCH)
hoodieConfig.setDefaultValue(META_SYNC_CLIENT_TOOL_CLASS_NAME)
hoodieConfig.setDefaultValue(HIVE_SYNC_ENABLED)
hoodieConfig.setDefaultValue(META_SYNC_ENABLED)
hoodieConfig.setDefaultValue(HIVE_DATABASE)
hoodieConfig.setDefaultValue(HIVE_TABLE)
hoodieConfig.setDefaultValue(HIVE_BASE_FILE_FORMAT)
hoodieConfig.setDefaultValue(HIVE_USER)
hoodieConfig.setDefaultValue(HIVE_PASS)
hoodieConfig.setDefaultValue(HIVE_URL)
hoodieConfig.setDefaultValue(HIVE_PARTITION_FIELDS)
hoodieConfig.setDefaultValue(HIVE_PARTITION_EXTRACTOR_CLASS)
hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
hoodieConfig.setDefaultValue(HIVE_USE_JDBC)
hoodieConfig.setDefaultValue(HIVE_CREATE_MANAGED_TABLE)
hoodieConfig.setDefaultValue(HIVE_SYNC_AS_DATA_SOURCE_TABLE)
hoodieConfig.setDefaultValue(ASYNC_COMPACT_ENABLE)
hoodieConfig.setDefaultValue(INLINE_CLUSTERING_ENABLE)
hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
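The rewrite above routes the write defaults through HoodieConfig.setDefaultValue instead of a hard-coded Map, so any property that declares an infer function (key generator, hive table, partition fields, partition extractor) is filled from related user settings before its static default applies. A hedged illustration of the net effect; the key strings and expected values assume the infer functions shown earlier in this diff:

  // Sketch only: expected output of the setDefaultValue-based path for a small input map.
  val in = Map(
    "hoodie.table.name"                           -> "trips",
    "hoodie.datasource.write.partitionpath.field" -> "region,city")
  val out = HoodieWriterUtils.parametersWithWriteDefaults(in)
  // Assuming the inference above:
  //   out("hoodie.datasource.hive_sync.table")            == "trips"
  //   out("hoodie.datasource.hive_sync.partition_fields") == "region,city"
  //   out("hoodie.datasource.write.keygenerator.class")   == classOf[ComplexKeyGenerator].getName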

def toProperties(params: Map[String, String]): TypedProperties = {
@@ -26,6 +26,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;

@@ -270,7 +271,9 @@ private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
MultiPartKeysValueExtractor.class.getCanonicalName());
} else {
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
}
}
return writer;
@@ -25,6 +25,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;

@@ -140,7 +141,9 @@ private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
MultiPartKeysValueExtractor.class.getCanonicalName());
} else {
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
}
}
return writer;
@@ -28,6 +28,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -391,7 +392,9 @@ private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
MultiPartKeysValueExtractor.class.getCanonicalName());
} else {
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
}
}
return writer;