diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index c14749c50da2d..438e7e3b1f210 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -54,6 +54,7 @@ import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; @@ -314,7 +315,8 @@ public static HoodieTableMetaClient initTableIfNotExists( .setPreCombineField(OptionsResolver.getPreCombineField(conf)) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)) - .setKeyGeneratorClassProp(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)) + .setKeyGeneratorClassProp( + conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName())) .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)) .setTimelineLayoutVersion(1) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index 43b59bdf9e8bc..9630d9cd4d73c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.ViewStorageProperties; @@ -67,6 +68,7 @@ void testInitTableIfNotExists() throws IOException { "Missing partition columns in the hoodie.properties."); assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] {"p0", "p1"}); assertEquals(metaClient1.getTableConfig().getPreCombineField(), "ts"); + assertEquals(metaClient1.getTableConfig().getKeyGeneratorClassName(), SimpleAvroKeyGenerator.class.getName()); // Test for non-partitioned table. conf.removeConfig(FlinkOptions.PARTITION_PATH_FIELD); @@ -77,6 +79,7 @@ void testInitTableIfNotExists() throws IOException { .setConf(new org.apache.hadoop.conf.Configuration()) .build(); assertFalse(metaClient2.getTableConfig().getPartitionFields().isPresent()); + assertEquals(metaClient2.getTableConfig().getKeyGeneratorClassName(), SimpleAvroKeyGenerator.class.getName()); } @Test