diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
new file mode 100644
index 000000000000..092f9270b964
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+import scala.collection.JavaConverters;
+
+@Component
+public class ClusteringCommand implements CommandMarker {
+
+  private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);
+
+  @CliCommand(value = "clustering schedule", help = "Schedule Clustering")
+  public String scheduleClustering(
+      @CliOption(key = "sparkMemory", help = "Spark executor memory",
+          unspecifiedDefaultValue = "1G") final String sparkMemory,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+
+    // First get a clustering instant time and pass it to spark launcher for scheduling clustering
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
+        client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
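+    // Validate any extra --hoodieConfigs overrides and forward them to the launched Spark job as app args.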
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to schedule clustering for " + clusteringInstantTime;
+    }
+    return "Succeeded to schedule clustering for " + clusteringInstantTime;
+  }
+
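+  /**
+   * Submits a Spark job that executes the clustering plan previously scheduled at the given instant.
+   */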
+  @CliCommand(value = "clustering run", help = "Run Clustering")
+  public String runClustering(
+      @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
+          unspecifiedDefaultValue = "1") final String parallelism,
+      @CliOption(key = "sparkMemory", help = "Spark executor memory",
+          unspecifiedDefaultValue = "4G") final String sparkMemory,
+      @CliOption(key = "retry", help = "Number of retries",
+          unspecifiedDefaultValue = "1") final String retry,
+      @CliOption(key = "clusteringInstant", help = "Clustering instant time",
+          mandatory = true) final String clusteringInstantTime,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs
+  ) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
+        client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to run clustering for " + clusteringInstantTime;
+    }
+    return "Succeeded to run clustering for " + clusteringInstantTime;
+  }
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 7833ee750b5c..27bc13c43ca6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -41,6 +41,7 @@
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
+import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
 import org.apache.hudi.utilities.HoodieCompactor;
@@ -70,7 +71,8 @@ public class SparkMain {
    */
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
-    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
+    CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
   }

   public static void main(String[] args) throws Exception {
@@ -159,6 +161,31 @@ public static void main(String[] args) throws Exception {
             Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
+      case CLUSTERING_RUN:
+        assert (args.length >= 8);
+        propsFilePath = null;
+        if (!StringUtils.isNullOrEmpty(args[7])) {
+          propsFilePath = args[7];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 8) {
+          configs.addAll(Arrays.asList(args).subList(8, args.length));
+        }
+        returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
+            Integer.parseInt(args[6]), false, propsFilePath, configs);
+        break;
+      case CLUSTERING_SCHEDULE:
+        assert (args.length >= 6);
+        propsFilePath = null;
+        if (!StringUtils.isNullOrEmpty(args[5])) {
+          propsFilePath = args[5];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 6) {
+          configs.addAll(Arrays.asList(args).subList(6, args.length));
+        }
+        returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
+        break;
       case CLEAN:
         assert (args.length >= 5);
         propsFilePath = null;
@@ -312,6 +339,20 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
     return new HoodieCompactor(jsc, cfg).compact(retry);
   }

+  private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
+      int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List<String> configs) {
+    HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
+    cfg.basePath = basePath;
+    cfg.tableName = tableName;
+    cfg.clusteringInstantTime = clusteringInstant;
+    cfg.parallelism = parallelism;
+    cfg.runSchedule = schedule;
+    cfg.propsFilePath = propsFilePath;
+    cfg.configs = configs;
+    jsc.getConf().set("spark.executor.memory", sparkMemory);
+    return new HoodieClusteringJob(jsc, cfg).cluster(retry);
+  }
+
   private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
       String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) {
     DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 44328d3d6597..1e10eef2420c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -164,7 +164,10 @@ private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     SparkRDDWriteClient client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
+    if (cfg.clusteringInstantTime != null) {
+      client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
+      return Option.of(cfg.clusteringInstantTime);
+    }
     return client.scheduleClustering(Option.empty());
   }
-
 }
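For reference, a minimal usage sketch of the two new commands from the hudi-cli shell; the prompt, table name, and instant value are illustrative, and a table must already be connected so that HoodieCLI.getTableMetaClient() resolves:

    hudi:trips->clustering schedule --sparkMemory 1G
    hudi:trips->clustering run --clusteringInstant 20210101125959 --parallelism 2 --sparkMemory 4G

"clustering schedule" reports the instant it created; passing that instant to "clustering run" executes the plan through HoodieClusteringJob with runSchedule set to false.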