From 301d3936066b421fccf8070f0602bbaa1ebf415a Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Mon, 5 Apr 2021 10:37:08 -0700
Subject: [PATCH 01/10] tmp base

---
 .../hudi/cli/commands/ClusteringCommand.java  | 100 ++++++++++++++++++
 .../apache/hudi/cli/commands/SparkMain.java   |  43 +++++++-
 2 files changed, 142 insertions(+), 1 deletion(-)
 create mode 100644 hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
new file mode 100644
index 000000000000..c4f4483adadd
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+import scala.collection.JavaConverters;
+
+@Component
+public class ClusteringCommand implements CommandMarker {
+
+  private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);
+
+  @CliCommand(value = "clustering schedule", help = "Schedule Clustering")
+  public String scheduleClustering(
+      @CliOption(key = "sparkMemory", help = "Spark executor memory",
+          unspecifiedDefaultValue = "1G") final String sparkMemory,
+      @CliOption(key = "propsFilePath", mandatory = true, help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.setMaster("local[4]");
+
+    // First get a clustering instant time and pass it to spark launcher for scheduling clustering
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
+        client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to schedule clustering for " + clusteringInstantTime;
+    }
+    return "Attempted to schedule clustering for " + clusteringInstantTime;
+  }
+
+  @CliCommand(value = "clustering run", help = "Run Clustering")
+  public String runClustering(
+      @CliOption(key = "parallelism", mandatory = true,
+          help = "Parallelism for hoodie clustering") final String parallelism,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
+      @CliOption(key = "clusteringInstant", help = "Base path for the target hoodie table") final String clusteringInstantTime,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs
+  ) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
+        client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to run clustering for " + clusteringInstantTime;
+    }
+    return "Clustering successfully completed for " + clusteringInstantTime;
+
+  }
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 7833ee750b5c..e47b4bc907ec 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -41,6 +41,7 @@
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
+import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
 import org.apache.hudi.utilities.HoodieCompactor;
@@ -70,7 +71,8 @@ public class SparkMain {
    */
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
-    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
+    CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
   }
 
   public static void main(String[] args) throws Exception {
@@ -159,6 +161,31 @@ public static void main(String[] args) throws Exception {
             Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
+      case CLUSTERING_RUN:
+        assert (args.length >= 9);
+        propsFilePath = null;
+        if (!StringUtils.isNullOrEmpty(args[8])) {
+          propsFilePath = args[8];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 9) {
+          configs.addAll(Arrays.asList(args).subList(9, args.length));
+        }
+        returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
+            Integer.parseInt(args[6]), false, propsFilePath, configs);
+        break;
+      case CLUSTERING_SCHEDULE:
+        assert (args.length >= 6);
+        propsFilePath = null;
+        if (!StringUtils.isNullOrEmpty(args[5])) {
+          propsFilePath = args[5];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 6) {
+          configs.addAll(Arrays.asList(args).subList(6, args.length));
+        }
+        returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
+        break;
       case CLEAN:
         assert (args.length >= 5);
         propsFilePath = null;
@@ -312,6 +339,20 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
     return new HoodieCompactor(jsc, cfg).compact(retry);
   }
 
+  private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
+      int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List<String> configs) {
+    HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
+    cfg.basePath = basePath;
+    cfg.tableName = tableName;
+    cfg.clusteringInstantTime = clusteringInstant;
+    cfg.parallelism = parallelism;
+    cfg.runSchedule = schedule;
+    cfg.propsFilePath = propsFilePath;
+    cfg.configs = configs;
+    jsc.getConf().set("spark.executor.memory", sparkMemory);
+    return new HoodieClusteringJob(jsc, cfg).cluster(retry);
+  }
+
   private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
       String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) {
     DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
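Note: patch 01 couples ClusteringCommand to SparkMain through a purely
positional argument protocol, so both sides must agree on the exact layout.
A minimal sketch of the CLUSTERING_RUN layout as this patch defines it (the
base path, table name, and instant time are hypothetical illustration
values, not taken from the patch):

    // args[0] selects the SparkCommand branch in SparkMain.main.
    String[] args = {
        "CLUSTERING_RUN",
        "/tmp/hudi/trips",  // args[1]: basePath (hypothetical)
        "trips",            // args[2]: tableName (hypothetical)
        "20210405103708",   // args[3]: clusteringInstant (hypothetical)
        "2",                // args[4]: parallelism
        "4G",               // args[5]: sparkMemory
        "1",                // args[6]: retry
        ""                  // args[7]: propsFilePath (empty means unset)
        // args[8..]: any extra key=value pairs from --hoodieConfigs
    };

As written, ClusteringCommand.runClustering sends exactly eight arguments
(the command plus seven values), while the CLUSTERING_RUN case asserts
args.length >= 9 and reads propsFilePath from args[8]; patch 02 below
corrects that off-by-one.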
From 28db22c08c81d20e809466959df44d186d404434 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Mon, 5 Apr 2021 16:07:02 -0700
Subject: [PATCH 02/10] update

---
 .../apache/hudi/cli/commands/ClusteringCommand.java  |  5 +++--
 .../java/org/apache/hudi/cli/commands/SparkMain.java | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index c4f4483adadd..6e1195f317cf 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -56,7 +57,7 @@ public String scheduleClustering(
     // First get a clustering instant time and pass it to spark launcher for scheduling clustering
     String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
 
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
         client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
@@ -85,7 +86,7 @@ public String runClustering(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
         client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e47b4bc907ec..27bc13c43ca6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -162,14 +162,14 @@ public static void main(String[] args) throws Exception {
         returnCode = 0;
         break;
       case CLUSTERING_RUN:
-        assert (args.length >= 9);
+        assert (args.length >= 8);
         propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[8])) {
-          propsFilePath = args[8];
+        if (!StringUtils.isNullOrEmpty(args[7])) {
+          propsFilePath = args[7];
         }
         configs = new ArrayList<>();
-        if (args.length > 9) {
-          configs.addAll(Arrays.asList(args).subList(9, args.length));
+        if (args.length > 8) {
+          configs.addAll(Arrays.asList(args).subList(8, args.length));
         }
         returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
             Integer.parseInt(args[6]), false, propsFilePath, configs);
From 1731cd748ecae70082e39790c6e61e22477f997a Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Mon, 5 Apr 2021 18:14:01 -0700
Subject: [PATCH 03/10] update unit test

---
 .../hudi/utilities/HoodieClusteringJob.java   | 14 ++++-------
 .../functional/TestHoodieDeltaStreamer.java   | 23 ++++++++-----------
 2 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 44328d3d6597..6d613db6c536 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -122,12 +122,7 @@ public int cluster(int retry) {
     int ret = UtilHelpers.retry(retry, () -> {
       if (cfg.runSchedule) {
         LOG.info("Do schedule");
-        Option<String> instantTime = doSchedule(jsc);
-        int result = instantTime.isPresent() ? 0 : -1;
-        if (result == 0) {
-          LOG.info("The schedule instant time is " + instantTime.get());
-        }
-        return result;
+        return doSchedule(jsc);
       } else {
         LOG.info("Do cluster");
         return doCluster(jsc);
@@ -156,15 +151,16 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
   }
 
   @TestOnly
-  public Option<String> doSchedule() throws Exception {
+  public int doSchedule() throws Exception {
     return this.doSchedule(jsc);
   }
 
-  private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
+  private int doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
         Option.empty(), props);
-    return client.scheduleClustering(Option.empty());
+    client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
+    return 0;
   }
 
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 8d837fd0972b..62941cfbaf82 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -1013,26 +1014,22 @@ public void testHoodieAsyncClusteringJob() throws Exception {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      String scheduleClusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
       HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-          null, true);
+          scheduleClusteringInstantTime, true);
       HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
-      Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
-        scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
+        scheduleClusteringJob.doSchedule();
       } catch (Exception e) {
         LOG.warn("Schedule clustering failed", e);
         return false;
       }
-      if (scheduleClusteringInstantTime.isPresent()) {
-        LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
-        HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-            scheduleClusteringInstantTime.get(), false);
-        HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
-        clusterClusteringJob.cluster(clusterClusteringConfig.retry);
-        LOG.info("Cluster success");
-      } else {
-        LOG.warn("Schedule clustering failed");
-      }
+      LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime);
+      HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
+          scheduleClusteringInstantTime, false);
+      HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
+      clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+      LOG.info("Cluster success");
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
       int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
       int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
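Note: patch 03 moves instant-time generation out of HoodieClusteringJob: the
caller now creates the instant up front and the job schedules at exactly
that instant via scheduleClusteringAtInstant. A hedged sketch of the
resulting schedule-then-run sequence (the JavaSparkContext named jsc and the
config values are assumed for illustration, not taken from the patch):

    String instant = HoodieActiveTimeline.createNewInstantTime();

    HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
    cfg.basePath = "/tmp/hudi/trips";    // hypothetical
    cfg.tableName = "trips";             // hypothetical
    cfg.clusteringInstantTime = instant;
    cfg.runSchedule = true;              // schedule-only pass
    new HoodieClusteringJob(jsc, cfg).cluster(0);

    cfg.runSchedule = false;             // execute the plan at the same instant
    new HoodieClusteringJob(jsc, cfg).cluster(1);

The retry counts mirror what SparkMain passes (0 for CLUSTERING_SCHEDULE,
the user-supplied value for CLUSTERING_RUN).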
From d49e8b01f6c9a212b0e9324a7f128d6c5e0fabda Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Mon, 5 Apr 2021 23:07:52 -0700
Subject: [PATCH 04/10] update

---
 .../apache/hudi/cli/commands/ClusteringCommand.java    |  2 +-
 .../cluster/SparkClusteringPlanActionExecutor.java     | 10 ++++++++++
 .../org/apache/hudi/utilities/HoodieClusteringJob.java |  7 +++++--
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 6e1195f317cf..5f333fa8316a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -44,7 +44,7 @@ public class ClusteringCommand implements CommandMarker {
   public String scheduleClustering(
       @CliOption(key = "sparkMemory", help = "Spark executor memory",
           unspecifiedDefaultValue = "1G") final String sparkMemory,
-      @CliOption(key = "propsFilePath", mandatory = true, help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
           unspecifiedDefaultValue = "") final String propsFilePath,
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
           unspecifiedDefaultValue = "") final String[] configs) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 1f71aa4dfac7..9a0a94bb5a74 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -52,6 +52,16 @@ public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
 
   @Override
   protected Option<HoodieClusteringPlan> createClusteringPlan() {
+    LOG.info("=============> hoodie.clustering.inline.max.commits:" + config.getInlineClusterMaxCommits());
+    LOG.info("=============> hoodie.clustering.plan.strategy.class:" + config.getClusteringPlanStrategyClass());
+    LOG.info("=============> hoodie.clustering.execution.strategy.class:" + config.getClusteringExecutionStrategyClass());
+    LOG.info("=============> hoodie.clustering.plan.strategy.small.file.limit:" + config.getClusteringSmallFileLimit());
+    LOG.info("=============> hoodie.clustering.plan.strategy.max.bytes.per.group:" + config.getClusteringMaxBytesInGroup());
+    LOG.info("=============> hoodie.clustering.plan.strategy.max.num.groups:" + config.getClusteringMaxNumGroups());
+    LOG.info("=============> hoodie.clustering.plan.strategy.target.file.max.bytes:" + config.getClusteringTargetFileMaxBytes());
+    LOG.info("=============> hoodie.clustering.plan.strategy.sort.columns:" + config.getClusteringSortColumns());
+    LOG.info("=============> hoodie.cleaner.commits.retained:" + config.getCleanerCommitsRetained());
+
     LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastClusteringInstant =
         table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 6d613db6c536..02d557a66c0c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -122,7 +122,11 @@ public int cluster(int retry) {
     int ret = UtilHelpers.retry(retry, () -> {
       if (cfg.runSchedule) {
         LOG.info("Do schedule");
-        return doSchedule(jsc);
+        int result = doSchedule(jsc);
+        if (result == 0) {
+          LOG.info("The schedule instant time is " + cfg.clusteringInstantTime);
+        }
+        return result;
       } else {
         LOG.info("Do cluster");
         return doCluster(jsc);
@@ -162,5 +166,4 @@ private int doSchedule(JavaSparkContext jsc) throws Exception {
     client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
     return 0;
   }
-
 }

From a85a0fe408250506519a8dd58be0e1ba93932257 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Mon, 5 Apr 2021 23:24:28 -0700
Subject: [PATCH 05/10] update

---
 .../cluster/SparkClusteringPlanActionExecutor.java | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 9a0a94bb5a74..1f71aa4dfac7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -52,16 +52,6 @@ public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
 
   @Override
   protected Option<HoodieClusteringPlan> createClusteringPlan() {
-    LOG.info("=============> hoodie.clustering.inline.max.commits:" + config.getInlineClusterMaxCommits());
-    LOG.info("=============> hoodie.clustering.plan.strategy.class:" + config.getClusteringPlanStrategyClass());
-    LOG.info("=============> hoodie.clustering.execution.strategy.class:" + config.getClusteringExecutionStrategyClass());
-    LOG.info("=============> hoodie.clustering.plan.strategy.small.file.limit:" + config.getClusteringSmallFileLimit());
-    LOG.info("=============> hoodie.clustering.plan.strategy.max.bytes.per.group:" + config.getClusteringMaxBytesInGroup());
-    LOG.info("=============> hoodie.clustering.plan.strategy.max.num.groups:" + config.getClusteringMaxNumGroups());
-    LOG.info("=============> hoodie.clustering.plan.strategy.target.file.max.bytes:" + config.getClusteringTargetFileMaxBytes());
-    LOG.info("=============> hoodie.clustering.plan.strategy.sort.columns:" + config.getClusteringSortColumns());
-    LOG.info("=============> hoodie.cleaner.commits.retained:" + config.getCleanerCommitsRetained());
-
     LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastClusteringInstant =
         table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
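Note: the "=============>" log lines added by patch 04 and removed again by
patch 05 were temporary debugging output, but the keys they print are
exactly the clustering knobs a job reads, which makes them natural contents
for the file passed via --propsFilePath. An illustrative example (the values
are hypothetical; only the keys come from the patch):

    hoodie.clustering.plan.strategy.small.file.limit=629145600
    hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648
    hoodie.clustering.plan.strategy.max.num.groups=30
    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
    hoodie.clustering.plan.strategy.sort.columns=rider,driver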
From 58748ab7e4679103c1681ddf72499d7aee4fe859 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Tue, 6 Apr 2021 23:41:31 -0700
Subject: [PATCH 06/10] update CLI parameters

---
 .../hudi/cli/commands/ClusteringCommand.java | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 5f333fa8316a..ff25e926aafc 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -52,7 +52,6 @@ public String scheduleClustering(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.setMaster("local[4]");
 
     // First get a clustering instant time and pass it to spark launcher for scheduling clustering
     String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -71,12 +70,14 @@ public String scheduleClustering(
 
   @CliCommand(value = "clustering run", help = "Run Clustering")
   public String runClustering(
-      @CliOption(key = "parallelism", mandatory = true,
-          help = "Parallelism for hoodie clustering") final String parallelism,
-      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
-          help = "Spark executor memory") final String sparkMemory,
-      @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
-      @CliOption(key = "clusteringInstant", help = "Base path for the target hoodie table") final String clusteringInstantTime,
+      @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
+          unspecifiedDefaultValue = "1") final String parallelism,
+      @CliOption(key = "sparkMemory", help = "Spark executor memory",
+          unspecifiedDefaultValue = "4G") final String sparkMemory,
+      @CliOption(key = "retry", help = "Number of retries",
+          unspecifiedDefaultValue = "1") final String retry,
+      @CliOption(key = "clusteringInstant", help = "Clustering instant time",
+          mandatory = true ) final String clusteringInstantTime,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
           unspecifiedDefaultValue = "") final String propsFilePath,
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
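Note: with patch 06 the option surface settles: --clusteringInstant becomes
the single mandatory option of "clustering run", everything else has a
default, and the hard-coded local[4] master is gone. An illustrative
hudi-cli session under these definitions (the table path and instant value
are hypothetical):

    hudi-> connect --path /tmp/hudi/trips
    hudi:trips-> clustering schedule --sparkMemory 1G
    hudi:trips-> clustering run --clusteringInstant 20210406234131 --parallelism 2 --sparkMemory 4G --retry 1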
From fc2f340601578f0b3603b2e41db9944f553803f8 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Wed, 7 Apr 2021 00:06:02 -0700
Subject: [PATCH 07/10] linting

---
 .../java/org/apache/hudi/cli/commands/ClusteringCommand.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index ff25e926aafc..2f4eb8138c74 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -77,7 +77,7 @@ public String runClustering(
       @CliOption(key = "retry", help = "Number of retries",
           unspecifiedDefaultValue = "1") final String retry,
       @CliOption(key = "clusteringInstant", help = "Clustering instant time",
-          mandatory = true ) final String clusteringInstantTime,
+          mandatory = true) final String clusteringInstantTime,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
           unspecifiedDefaultValue = "") final String propsFilePath,
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",

From c67cdc6771d987d64bc4ed63dba8807a7a994154 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Wed, 7 Apr 2021 17:25:16 -0700
Subject: [PATCH 08/10] update doSchedule in HoodieClusteringJob

---
 .../hudi/utilities/HoodieClusteringJob.java   | 16 ++++++++-----
 .../functional/TestHoodieDeltaStreamer.java   | 24 ++++++++++---------
 2 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 02d557a66c0c..1e10eef2420c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -122,9 +122,10 @@ public int cluster(int retry) {
     int ret = UtilHelpers.retry(retry, () -> {
       if (cfg.runSchedule) {
         LOG.info("Do schedule");
-        int result = doSchedule(jsc);
+        Option<String> instantTime = doSchedule(jsc);
+        int result = instantTime.isPresent() ? 0 : -1;
         if (result == 0) {
-          LOG.info("The schedule instant time is " + cfg.clusteringInstantTime);
+          LOG.info("The schedule instant time is " + instantTime.get());
         }
         return result;
       } else {
@@ -156,15 +157,18 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
   }
 
   @TestOnly
-  public int doSchedule() throws Exception {
+  public Option<String> doSchedule() throws Exception {
     return this.doSchedule(jsc);
   }
 
-  private int doSchedule(JavaSparkContext jsc) throws Exception {
+  private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
         Option.empty(), props);
-    client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
-    return 0;
+    if (cfg.clusteringInstantTime != null) {
+      client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
+      return Option.of(cfg.clusteringInstantTime);
+    }
+    return client.scheduleClustering(Option.empty());
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 62941cfbaf82..c3415163fd0c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -31,7 +31,6 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -1003,7 +1002,6 @@ public void testHoodieAsyncClusteringJob() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
-    // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
     cfg.continuousMode = true;
@@ -1014,22 +1012,26 @@ public void testHoodieAsyncClusteringJob() throws Exception {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-      String scheduleClusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
       HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-          scheduleClusteringInstantTime, true);
+          null, true);
       HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
+      Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
-        scheduleClusteringJob.doSchedule();
+        scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
       } catch (Exception e) {
         LOG.warn("Schedule clustering failed", e);
         return false;
       }
-      LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime);
-      HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-          scheduleClusteringInstantTime, false);
-      HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
-      clusterClusteringJob.cluster(clusterClusteringConfig.retry);
-      LOG.info("Cluster success");
+      if (scheduleClusteringInstantTime.isPresent()) {
+        LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
+        HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
+            scheduleClusteringInstantTime.get(), false);
+        HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
+        clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+        LOG.info("Cluster success");
+      } else {
+        LOG.warn("Schedule clustering failed");
+      }
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
       int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
       int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
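Note: patch 08 settles the doSchedule contract: it returns Option<String>
again, scheduling at the caller-supplied instant when
cfg.clusteringInstantTime is set and otherwise letting the write client pick
one. A hedged sketch of what a caller sees (the jsc and cfg setup are
assumed, as in the test above):

    HoodieClusteringJob job = new HoodieClusteringJob(jsc, cfg);
    Option<String> instant = job.doSchedule();  // the @TestOnly overload
    if (instant.isPresent()) {
      // Either cfg.clusteringInstantTime echoed back, or a fresh instant
      // chosen by SparkRDDWriteClient.scheduleClustering.
      System.out.println("Scheduled clustering at " + instant.get());
    }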
From 582e34860a1984b4d2cabab7cee9b82072a01325 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Wed, 7 Apr 2021 17:35:53 -0700
Subject: [PATCH 09/10] update

---
 .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index c3415163fd0c..8d837fd0972b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1002,6 +1002,7 @@ public void testHoodieAsyncClusteringJob() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
+    // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
     cfg.continuousMode = true;

From 079375f4906107e85cb8d4ea193baf57aaf6c8d4 Mon Sep 17 00:00:00 2001
From: Jintao Guan
Date: Wed, 7 Apr 2021 23:34:48 -0700
Subject: [PATCH 10/10] update diff according to comments

---
 .../apache/hudi/cli/commands/ClusteringCommand.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 2f4eb8138c74..092f9270b964 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -49,6 +49,9 @@ public String scheduleClustering(
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
           unspecifiedDefaultValue = "") final String[] configs) throws Exception {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
@@ -65,7 +68,7 @@ public String scheduleClustering(
     if (exitCode != 0) {
       return "Failed to schedule clustering for " + clusteringInstantTime;
     }
-    return "Attempted to schedule clustering for " + clusteringInstantTime;
+    return "Succeeded to schedule clustering for " + clusteringInstantTime;
   }
 
   @CliCommand(value = "clustering run", help = "Run Clustering")
@@ -84,6 +87,9 @@ public String runClustering(
           unspecifiedDefaultValue = "") final String[] configs
   ) throws Exception {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
@@ -96,7 +102,6 @@ public String runClustering(
     if (exitCode != 0) {
       return "Failed to run clustering for " + clusteringInstantTime;
     }
-    return "Clustering successfully completed for " + clusteringInstantTime;
-
+    return "Succeeded to run clustering for " + clusteringInstantTime;
   }
 }
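Note: a quick end-to-end check of the finished feature mirrors the assertion
style of TestHoodieDeltaStreamer above: after "clustering run" succeeds, the
completed replace timeline should contain the clustering commit. A hedged
sketch (the Hadoop configuration object and base path are assumed):

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)             // assumed Hadoop Configuration
        .setBasePath("/tmp/hudi/trips")  // hypothetical
        .setLoadActiveTimelineOnLoad(true)
        .build();
    long completed = metaClient.getActiveTimeline()
        .getCompletedReplaceTimeline().getInstants().count();
    long pending = metaClient.getActiveTimeline()
        .filterPendingReplaceTimeline().getInstants().count();
    // After a successful run: completed >= 1, and pending should drain to 0.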