Skip to content

Commit

Permalink
[HUDI-2709] Add more options when initializing table
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Nov 8, 2021
1 parent cf2ecd7 commit 3e24d3d
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
Expand Down Expand Up @@ -210,7 +211,6 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.build())
.withKeyGenerator(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) // needed by TwoToThreeUpgradeHandler
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
Expand Down Expand Up @@ -239,6 +239,8 @@ public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
Properties properties = new Properties();
// put all the set options
flatConf.addAllToProperties(properties);
// ugly: table keygen clazz, needed by TwoToThreeUpgradeHandler
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), conf.getString(FlinkOptions.KEYGEN_CLASS_NAME));
// put all the default options
for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
if (!flatConf.contains(option) && option.hasDefaultValue()) {
Expand Down Expand Up @@ -268,9 +270,12 @@ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) thr
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
.setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
.setPreCombineField(OptionsResolver.getPreCombineField(conf))
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
.setPreCombineField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.setKeyGeneratorClassProp(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))
.setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
.setTimelineLayoutVersion(1)
.initTable(hadoopConf, basePath);
LOG.info("Table initialized under base path {}", basePath);
Expand Down

0 comments on commit 3e24d3d

Please sign in to comment.