From 795a99ba73183b5f7697bb3964843a43a64e74e9 Mon Sep 17 00:00:00 2001 From: Kumud Kumar Srivatsava Tirupati Date: Tue, 31 May 2022 20:27:50 +0530 Subject: [PATCH] [HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597) * added --sync-tool-classes config option in multitable delta streamer * added a testcase to assert if syncClientToolClassNames is getting picked to the deltastreamer execution context --- .../apache/hudi/utilities/deltastreamer/DeltaSync.java | 2 +- .../utilities/deltastreamer/HoodieDeltaStreamer.java | 10 +++++++--- .../deltastreamer/HoodieMultiTableDeltaStreamer.java | 5 +++++ .../functional/TestHoodieMultiTableDeltaStreamer.java | 9 +++++++++ 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 0ae72f94b82e..736e416162d2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -691,7 +691,7 @@ private String getSyncClassShortName(String syncClassName) { } private void syncMeta(HoodieDeltaStreamerMetrics metrics) { - Set syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(","))); + Set syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(","))); // for backward compatibility if (cfg.enableHiveSync) { cfg.enableMetaSync = true; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 7a688b50c709..a22a3581ae94 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -306,7 +306,7 @@ public static class Config implements Serializable { public Boolean enableMetaSync = false; @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools") - public String syncClientToolClass = HiveSyncTool.class.getName(); + public String syncClientToolClassNames = HiveSyncTool.class.getName(); @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless" @@ -442,6 +442,8 @@ public boolean equals(Object o) { && operation == config.operation && Objects.equals(filterDupes, config.filterDupes) && Objects.equals(enableHiveSync, config.enableHiveSync) + && Objects.equals(enableMetaSync, config.enableMetaSync) + && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames) && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) && Objects.equals(maxPendingClustering, config.maxPendingClustering) && Objects.equals(continuousMode, config.continuousMode) @@ -466,8 +468,8 @@ public int hashCode() { baseFileFormat, propsFilePath, configs, sourceClassName, sourceOrderingField, payloadClassName, schemaProviderClassName, transformerClassNames, sourceLimit, operation, filterDupes, - enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, - minSyncIntervalSeconds, sparkMaster, commitOnErrors, + enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering, + continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors, deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare, compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint, initialCheckpointProvider, help); @@ -491,6 +493,8 @@ public String toString() { + ", operation=" + operation + ", filterDupes=" + filterDupes + ", enableHiveSync=" + enableHiveSync + + ", enableMetaSync=" + enableMetaSync + + ", syncClientToolClassNames=" + syncClientToolClassNames + ", maxPendingCompactions=" + maxPendingCompactions + ", maxPendingClustering=" + maxPendingClustering + ", continuousMode=" + continuousMode diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 376c9cfae373..84aee29dec81 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.exception.HoodieException; @@ -203,6 +204,7 @@ static String getTableWithDatabase(TableExecutionContext context) { static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) { tableConfig.enableHiveSync = globalConfig.enableHiveSync; tableConfig.enableMetaSync = globalConfig.enableMetaSync; + tableConfig.syncClientToolClassNames = globalConfig.syncClientToolClassNames; tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName; tableConfig.sourceOrderingField = globalConfig.sourceOrderingField; tableConfig.sourceClassName = globalConfig.sourceClassName; @@ -325,6 +327,9 @@ public static class Config implements Serializable { @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta") public Boolean enableMetaSync = false; + @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools") + public String syncClientToolClassNames = HiveSyncTool.class.getName(); + @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless" + "outstanding compactions is less than this number") diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index cc2c96f2c851..8f54b0d34dcc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -72,10 +72,19 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co } config.enableHiveSync = enableHiveSync; config.enableMetaSync = enableMetaSync; + config.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2"; return config; } } + @Test + public void testMetaSyncConfig() throws IOException { + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); + HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); + TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1); + assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2", executionContext.getConfig().syncClientToolClassNames); + } + @Test public void testInvalidHiveSyncProps() throws IOException { HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);