Skip to content

Commit

Permalink
[HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
Browse files Browse the repository at this point in the history

* Added the --sync-tool-classes config option to the multi-table delta streamer

* Added a test case asserting that syncClientToolClassNames is propagated to the delta streamer execution context
  • Loading branch information
kumudkumartirupati committed May 31, 2022
1 parent 918c4f4 commit 795a99b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ private String getSyncClassShortName(String syncClassName) {
}

private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(",")));
Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
// for backward compatibility
if (cfg.enableHiveSync) {
cfg.enableMetaSync = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public static class Config implements Serializable {
public Boolean enableMetaSync = false;

@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
public String syncClientToolClass = HiveSyncTool.class.getName();
public String syncClientToolClassNames = HiveSyncTool.class.getName();

@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
Expand Down Expand Up @@ -442,6 +442,8 @@ public boolean equals(Object o) {
&& operation == config.operation
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
&& Objects.equals(enableMetaSync, config.enableMetaSync)
&& Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
&& Objects.equals(continuousMode, config.continuousMode)
Expand All @@ -466,8 +468,8 @@ public int hashCode() {
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
minSyncIntervalSeconds, sparkMaster, commitOnErrors,
enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
initialCheckpointProvider, help);
Expand All @@ -491,6 +493,8 @@ public String toString() {
+ ", operation=" + operation
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ ", enableMetaSync=" + enableMetaSync
+ ", syncClientToolClassNames=" + syncClientToolClassNames
+ ", maxPendingCompactions=" + maxPendingCompactions
+ ", maxPendingClustering=" + maxPendingClustering
+ ", continuousMode=" + continuousMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -203,6 +204,7 @@ static String getTableWithDatabase(TableExecutionContext context) {
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
tableConfig.syncClientToolClassNames = globalConfig.syncClientToolClassNames;
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
tableConfig.sourceClassName = globalConfig.sourceClassName;
Expand Down Expand Up @@ -325,6 +327,9 @@ public static class Config implements Serializable {
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;

@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
public String syncClientToolClassNames = HiveSyncTool.class.getName();

@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,19 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co
}
config.enableHiveSync = enableHiveSync;
config.enableMetaSync = enableMetaSync;
config.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";
return config;
}
}

/**
 * Verifies that the comma-separated sync tool class names configured on the multi-table
 * config are visible on a per-table execution context.
 *
 * <p>The expected value is the one {@code TestHelpers.getConfig} assigns to
 * {@code syncClientToolClassNames}; the assertion inspects the execution context at index 1.
 */
@Test
public void testMetaSyncConfig() throws IOException {
  final String expectedSyncToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";

  HoodieMultiTableDeltaStreamer.Config multiTableConfig = TestHelpers.getConfig(
      PROPS_FILENAME_TEST_SOURCE1,
      dfsBasePath + "/config",
      TestDataSource.class.getName(),
      true,
      true,
      null);
  HoodieMultiTableDeltaStreamer multiTableStreamer = new HoodieMultiTableDeltaStreamer(multiTableConfig, jsc);

  // Read the propagated value straight off the second table's delta streamer config.
  assertEquals(
      expectedSyncToolClassNames,
      multiTableStreamer.getTableExecutionContexts().get(1).getConfig().syncClientToolClassNames);
}

@Test
public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
Expand Down

0 comments on commit 795a99b

Please sign in to comment.