From ea92e40855b790585f1d42b39df6f9aad38fd35c Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 21 Oct 2022 10:18:44 +0800 Subject: [PATCH] [HUDI-5046] Support all the hive sync options for flink sql (#6985) --- .../hudi/configuration/FlinkOptions.java | 3 ++- .../hudi/sink/utils/HiveSyncContext.java | 3 ++- .../hudi/sink/utils/TestHiveSyncContext.java | 21 ++++++++++++------- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 43d842241abf5..55abeaaa56c2c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -722,9 +722,10 @@ private FlinkOptions() { // ------------------------------------------------------------------------ public static final ConfigOption HIVE_SYNC_ENABLED = ConfigOptions - .key("hive_sync.enable") + .key("hive_sync.enabled") .booleanType() .defaultValue(false) + .withFallbackKeys("hive_sync.enable") .withDescription("Asynchronously sync Hive meta to HMS, default false"); public static final ConfigOption HIVE_SYNC_DB = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index d2f56d9a3e6ca..b1c8457c1ac1d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -26,6 +26,7 @@ import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -93,7 +94,7 @@ public static HiveSyncContext create(Configuration conf, SerializableConfigurati @VisibleForTesting public static Properties buildSyncConfig(Configuration conf) { - TypedProperties props = new TypedProperties(); + TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf); props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), conf.getString(FlinkOptions.PATH)); props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT)); props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java index 381b292c25f9f..ae30b39906e0b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java @@ -19,25 +19,26 @@ package org.apache.hudi.sink.utils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hive.HiveSyncConfig; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; -import java.lang.reflect.Method; import java.util.Properties; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for {@link HiveSyncContext}. */ public class TestHiveSyncContext { /** - * Test that the file ids generated by the task can finally shuffled to itself. + * Test partition path fields sync. */ @Test - public void testBuildSyncConfig() throws Exception { + void testSyncedPartitions() { Configuration configuration1 = new Configuration(); Configuration configuration2 = new Configuration(); String hiveSyncPartitionField = "hiveSyncPartitionField"; @@ -48,15 +49,21 @@ public void testBuildSyncConfig() throws Exception { configuration2.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField); - Class threadClazz = Class.forName("org.apache.hudi.sink.utils.HiveSyncContext"); - Method buildSyncConfigMethod = threadClazz.getDeclaredMethod("buildSyncConfig", Configuration.class); - buildSyncConfigMethod.setAccessible(true); - Properties props1 = HiveSyncContext.buildSyncConfig(configuration1); Properties props2 = HiveSyncContext.buildSyncConfig(configuration2); assertEquals(hiveSyncPartitionField, props1.getProperty(META_SYNC_PARTITION_FIELDS.key())); assertEquals(partitionPathField, props2.getProperty(META_SYNC_PARTITION_FIELDS.key())); + } + /** + * Test an option that has no shortcut key. + */ + @Test + void testOptionWithoutShortcutKey() { + Configuration configuration3 = new Configuration(); + configuration3.setBoolean(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), true); + Properties props3 = HiveSyncContext.buildSyncConfig(configuration3); + assertTrue(Boolean.parseBoolean(props3.getProperty(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), "false"))); } }