From 3e24d3de985a07487dcebddd9aed1bd5161eecca Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Mon, 8 Nov 2021 12:05:17 +0800 Subject: [PATCH] [HUDI-2709] Add more options when initializing table --- .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index da962f97e979..3a31253a2c89 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -210,7 +211,6 @@ public static HoodieWriteConfig getHoodieClientConfig( .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .build()) - .withKeyGenerator(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) // needed by TwoToThreeUpgradeHandler .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) @@ -239,6 +239,8 @@ public static TypedProperties flinkConf2TypedProperties(Configuration conf) { Properties properties = new Properties(); // put all the set options flatConf.addAllToProperties(properties); + // ugly: table keygen clazz, needed by TwoToThreeUpgradeHandler + properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)); // put all the default options for (ConfigOption option : FlinkOptions.optionalOptions()) { if (!flatConf.contains(option) && option.hasDefaultValue()) { @@ -268,9 +270,12 @@ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) thr .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) + .setPreCombineField(OptionsResolver.getPreCombineField(conf)) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)) - .setPreCombineField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) + .setKeyGeneratorClassProp(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) + .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)) + .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)) .setTimelineLayoutVersion(1) .initTable(hadoopConf, basePath); LOG.info("Table initialized under base path {}", basePath);