[HUDI-3369] New ScheduleAndExecute mode for HoodieCompactor and hudi-cli (#4750)

Schedule a compaction plan and execute it in a single mode.
zhangyue19921010 committed Feb 7, 2022
1 parent 0880a8a commit de206acbae06f733ba14cec2650eef9a0139db7a
Showing 4 changed files with 246 additions and 45 deletions.
@@ -264,6 +264,41 @@ public String compact(
return "Compaction successfully completed for " + compactionInstantTime;

  @CliCommand(value = "compaction scheduleAndExecute", help = "Schedule a compaction plan and execute it")
  public String compact(
      @CliOption(key = {"parallelism"}, mandatory = true,
          help = "Parallelism for hoodie compaction") final String parallelism,
      @CliOption(key = "schemaFilePath", mandatory = true,
          help = "Path for Avro schema file") final String schemaFilePath,
      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
          help = "Spark Master") String master,
      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
          help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
      @CliOption(key = "propsFilePath", help = "Path to properties file on localfs or dfs with configurations for hoodie client for compacting",
          unspecifiedDefaultValue = "") final String propsFilePath,
      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
          unspecifiedDefaultValue = "") final String[] configs)
      throws Exception {
    HoodieTableMetaClient client = checkAndGetMetaClient();
    boolean initialized = HoodieCLI.initConf();
    HoodieCLI.initFS(initialized);
    String sparkPropertiesPath =
        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
    // No compaction instant is passed here: scheduling inside SparkMain generates a fresh one.
    sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory, client.getBasePath(),
        client.getTableConfig().getTableName(), parallelism, schemaFilePath,
        retry, propsFilePath);
    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
    Process process = sparkLauncher.launch();
    InputStreamConsumer.captureOutput(process);
    int exitCode = process.waitFor();
    if (exitCode != 0) {
      return "Failed to schedule and execute compaction";
    }
    return "Schedule and execute compaction successfully completed";
  }
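From the hudi-cli shell, the new command can then be invoked against a connected table in one step, for example (schema path illustrative):

    compaction scheduleAndExecute --parallelism 2 --schemaFilePath /tmp/compaction.schema --sparkMaster local --sparkMemory 4G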

/**
* Prints all compaction details.
*/
@@ -74,7 +74,7 @@ public class SparkMain {
* Commands.
*/
  enum SparkCommand {
-   BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
+   BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
    CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
  }
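SparkMain selects one of these commands from the first application argument supplied by the CLI launcher; a minimal sketch of that lookup, assuming the standard Enum.valueOf mechanism:

    // Sketch (assumption): the command string added via sparkLauncher.addAppArgs(...) arrives as args[0]
    SparkCommand cmd = SparkCommand.valueOf(args[0]);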
@@ -128,7 +128,21 @@ public static void main(String[] args) throws Exception {
          configs.addAll(Arrays.asList(args).subList(9, args.length));
        }
        returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
-           Integer.parseInt(args[8]), false, propsFilePath, configs);
+           Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, propsFilePath, configs);
        break;
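      // Positional layout produced by the CLI launcher above (args[0] is the command itself):
      // args[1]=sparkMaster, args[2]=sparkMemory, args[3]=basePath, args[4]=tableName,
      // args[5]=parallelism, args[6]=schemaFilePath, args[7]=retry, args[8]=propsFilePath,
      // args[9..]=optional hoodie configs.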
      case COMPACT_SCHEDULE_AND_EXECUTE:
        assert (args.length >= 9);
        propsFilePath = null;
        if (!StringUtils.isNullOrEmpty(args[8])) {
          propsFilePath = args[8];
        }
        configs = new ArrayList<>();
        if (args.length > 9) {
          configs.addAll(Arrays.asList(args).subList(9, args.length));
        }

        // The compaction instant is null: scheduling generates a fresh one before execution.
        returnCode = compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6],
            Integer.parseInt(args[7]), HoodieCompactor.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
        break;
      case COMPACT_SCHEDULE:
        assert (args.length >= 7);
@@ -140,7 +154,7 @@ public static void main(String[] args) throws Exception {
        if (args.length > 7) {
          configs.addAll(Arrays.asList(args).subList(7, args.length));
        }
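        // Note (inference): parallelism=1, schemaFile="" and retry=0 in the call below are
        // placeholders; schedule-only mode does not use them.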
-       returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
+       returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, HoodieCompactor.SCHEDULE, propsFilePath, configs);
        break;
      case COMPACT_VALIDATE:
        assert (args.length == 7);
@@ -320,7 +334,7 @@ private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePat
}

  private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
-     int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
+     int parallelism, String schemaFile, int retry, String mode, String propsFilePath,
      List<String> configs) {
    HoodieCompactor.Config cfg = new HoodieCompactor.Config();
    cfg.basePath = basePath;
@@ -330,7 +344,7 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
    cfg.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName();
    cfg.parallelism = parallelism;
    cfg.schemaFile = schemaFile;
-   cfg.runSchedule = schedule;
+   cfg.runningMode = mode;
    cfg.propsFilePath = propsFilePath;
    cfg.configs = configs;
    return new HoodieCompactor(jsc, cfg).compact(retry);
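HoodieCompactor now dispatches on cfg.runningMode instead of the old boolean flag. A minimal sketch of the new mode's control flow, using assumed helper names doSchedule/doCompact (illustrative, not necessarily Hudi's exact methods):

    // Sketch (assumption): SCHEDULE_AND_EXECUTE first creates a plan, then runs it in the same job.
    switch (cfg.runningMode.toLowerCase()) {
      case HoodieCompactor.SCHEDULE_AND_EXECUTE: {
        Option<String> instantTime = doSchedule(jsc);
        // Execute the freshly scheduled plan, or return -1 if nothing was scheduled.
        return instantTime.map(instant -> doCompact(jsc, instant)).orElse(-1);
      }
      default:
        // SCHEDULE only creates a plan; EXECUTE runs the supplied compactionInstant, as before.
        break;
    }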
@@ -140,6 +140,33 @@ public void testCompact() throws IOException {
"Pending compaction must be completed");
}

  /**
   * Test case for command 'compaction scheduleAndExecute'.
   */
  @Test
  public void testCompactScheduleAndExecute() throws IOException {
    // generate commits
    generateCommits();

    String schemaPath = Paths.get(basePath, "compaction.schema").toString();
    writeSchemaToTmpFile(schemaPath);

    CommandResult cr = getShell().executeCommand(
        String.format("compaction scheduleAndExecute --parallelism %s --schemaFilePath %s --sparkMaster %s",
            2, schemaPath, "local"));

    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertTrue(
            cr.getResult().toString().startsWith("Schedule and execute compaction successfully completed")));

    // assert that at least one compaction completed
    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
            .filterCompletedInstants().getInstants()
            .map(HoodieInstant::getTimestamp).count() > 0,
        "Completed compaction count should not be 0");
  }

  /**
   * Test case for command 'compaction validate'.
   */
