[HUDI-1764] Add Hudi-CLI support for clustering #2773

Merged · 10 commits · Apr 20, 2021
Changes from 7 commits
ClusteringCommand.java (new file)
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.cli.commands;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;

@Component
public class ClusteringCommand implements CommandMarker {

private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);

@CliCommand(value = "clustering schedule", help = "Schedule Clustering")
public String scheduleClustering(
@CliOption(key = "sparkMemory", help = "Spark executor memory",
unspecifiedDefaultValue = "1G") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Contributor: Why do we not need initFS here, just like the compaction command?

HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);

Contributor (Author): Good catch! Thanks.

String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);

// First get a clustering instant time and pass it to spark launcher for scheduling clustering
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();

sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to schedule clustering for " + clusteringInstantTime;
}
return "Attempted to schedule clustering for " + clusteringInstantTime;
Contributor: Suggest changing the message to "Succeed to schedule clustering for " + clusteringInstantTime.

Contributor (Author): Updated.

}

@CliCommand(value = "clustering run", help = "Run Clustering")
public String runClustering(
@CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
unspecifiedDefaultValue = "1") final String parallelism,
@CliOption(key = "sparkMemory", help = "Spark executor memory",
unspecifiedDefaultValue = "4G") final String sparkMemory,
@CliOption(key = "retry", help = "Number of retries",
unspecifiedDefaultValue = "1") final String retry,
@CliOption(key = "clusteringInstant", help = "Clustering instant time",
mandatory = true) final String clusteringInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs
) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run clustering for " + clusteringInstantTime;
}
return "Clustering successfully completed for " + clusteringInstantTime;
Contributor: Suggest changing the message to "Succeed to run clustering for " + clusteringInstantTime.

Contributor (Author): Updated.


}
}
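
For context, the two new commands would be invoked from the Hudi CLI shell roughly as follows (illustrative invocations assembled from the @CliOption definitions above; the instant value is a placeholder for whatever the schedule step reports, not a real timestamp):

clustering schedule --sparkMemory 1G
clustering run --clusteringInstant <instant from schedule> --parallelism 2 --sparkMemory 4G --retry 1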
SparkMain.java
@@ -41,6 +41,7 @@
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.HDFSParquetImporter.Config;
import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
import org.apache.hudi.utilities.HoodieCompactor;
@@ -70,7 +71,8 @@ public class SparkMain {
*/
enum SparkCommand {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
}

public static void main(String[] args) throws Exception {
@@ -159,6 +161,31 @@ public static void main(String[] args) throws Exception {
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLUSTERING_RUN:
assert (args.length >= 8);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[7])) {
propsFilePath = args[7];
}
configs = new ArrayList<>();
if (args.length > 8) {
configs.addAll(Arrays.asList(args).subList(8, args.length));
}
returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
Integer.parseInt(args[6]), false, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 6);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[5])) {
propsFilePath = args[5];
}
configs = new ArrayList<>();
if (args.length > 6) {
configs.addAll(Arrays.asList(args).subList(6, args.length));
}
returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
Contributor: It would be more reasonable for the clusteringInstant to be generated by the schedule step, because an instant time set by the user may conflict with one generated by Hudi. See the comments on #2379 for more information.

Contributor (Author, @jintaoguan, Apr 7, 2021): Yes. The clustering instant here (args[4]) is generated by HoodieActiveTimeline.createNewInstantTime() at Line 57 of ClusteringCommand.java above. Users cannot set the instant time directly for clustering.

break;
case CLEAN:
assert (args.length >= 5);
propsFilePath = null;
@@ -312,6 +339,20 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
return new HoodieCompactor(jsc, cfg).compact(retry);
}

private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List<String> configs) {
HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
cfg.clusteringInstantTime = clusteringInstant;
cfg.parallelism = parallelism;
cfg.runSchedule = schedule;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HoodieClusteringJob(jsc, cfg).cluster(retry);
}

private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
String repairedOutputPath, String basePath, boolean dryRun, String dedupeType) {
DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
HoodieClusteringJob.java
@@ -122,10 +122,9 @@ public int cluster(int retry) {
int ret = UtilHelpers.retry(retry, () -> {
if (cfg.runSchedule) {
LOG.info("Do schedule");
Option<String> instantTime = doSchedule(jsc);
int result = instantTime.isPresent() ? 0 : -1;
int result = doSchedule(jsc);
if (result == 0) {
LOG.info("The schedule instant time is " + instantTime.get());
LOG.info("The schedule instant time is " + cfg.clusteringInstantTime);
}
return result;
} else {
@@ -156,15 +155,15 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
}

@TestOnly
public Option<String> doSchedule() throws Exception {
Contributor: Returning the scheduled instant time would be clearer.

Contributor (Author, @jintaoguan, Apr 7, 2021): The schedule instant time is already in HoodieClusteringJob.Config. If doSchedule() succeeds and returns 0, we can get the clustering instant time from the config. I am trying to use the same pattern as doSchedule() in HoodieCompactor; correct me if I have misunderstood it.
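
As a rough illustration of that pattern, a caller-side sketch (assumed usage pieced together from the config fields and methods in this diff, with jsc, basePath, tableName, and LOG assumed to be in scope; not code from the PR):

HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
// The caller supplies the instant up front, so doSchedule() only needs to report success or failure.
cfg.clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
cfg.runSchedule = true;
HoodieClusteringJob job = new HoodieClusteringJob(jsc, cfg);
if (job.doSchedule() == 0) {
  // On success, the scheduled instant is simply the one already stored in the config.
  LOG.info("The schedule instant time is " + cfg.clusteringInstantTime);
}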

public int doSchedule() throws Exception {
return this.doSchedule(jsc);
}

private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
private int doSchedule(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
SparkRDDWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
return client.scheduleClustering(Option.empty());
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
return 0;
}

}
TestHoodieDeltaStreamer.java
@@ -31,6 +31,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -1013,26 +1014,22 @@ public void testHoodieAsyncClusteringJob() throws Exception {
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
String scheduleClusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
Contributor: This changes the HoodieClusteringJob usage mode: users of HoodieClusteringJob now need to call HoodieActiveTimeline.createNewInstantTime() first. Can we stay compatible with the old usage mode?

Contributor (Author): Sure, I will make it compatible with the old usage mode. The behavior will be:

  1. If the user provides an instant time, we will use it to schedule clustering and return it to the user.
  2. If the user doesn't provide an instant time, we will generate one and return it to the user.
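
A minimal sketch of what that fallback might look like inside doSchedule (a hypothetical illustration of the proposed behavior, reusing names from elsewhere in this PR; not the actual follow-up change):

private int doSchedule(JavaSparkContext jsc) throws Exception {
  // Old usage mode: if the caller did not supply an instant time, generate one instead of requiring it.
  if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
    cfg.clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
  }
  String schemaStr = getSchemaFromLatestInstant();
  SparkRDDWriteClient client =
      UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
  client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
  return 0;
}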

HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
null, true);
scheduleClusteringInstantTime, true);
HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
Option<String> scheduleClusteringInstantTime = Option.empty();
try {
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
scheduleClusteringJob.doSchedule();
} catch (Exception e) {
LOG.warn("Schedule clustering failed", e);
return false;
}
if (scheduleClusteringInstantTime.isPresent()) {
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
scheduleClusteringInstantTime.get(), false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
LOG.info("Cluster success");
} else {
LOG.warn("Schedule clustering failed");
}
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime);
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
scheduleClusteringInstantTime, false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
LOG.info("Cluster success");
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;