
Commit

improve javadoc & add compactionStrategyClassName to config
stream2000 committed Sep 6, 2023
1 parent 974126f commit 31dabb9
Showing 9 changed files with 189 additions and 21 deletions.
ArchiveTask.java
@@ -30,7 +30,8 @@
import org.slf4j.LoggerFactory;

/**
* Archive task to run in TableServicePipeline
* Archive task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class ArchiveTask extends TableServiceTask {
@@ -48,16 +49,39 @@ void run() {
}
}

/**
* Utility to create builder for {@link ArchiveTask}.
*
* @return Builder for {@link ArchiveTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for {@link ArchiveTask}.
*/
public static final class Builder {
/**
* Properties for running the archive task, already consolidated with CLI-provided config overrides.
*/
private TypedProperties props;

/**
* Hoodie table path for running archive task.
*/
private String basePath;
private JavaSparkContext jsc;

/**
* Number of retries.
*/
private int retry;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

public Builder withProps(TypedProperties props) {
this.props = props;
return this;
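
For context, a usage sketch for the builder above: only withProps is visible in this diff for ArchiveTask, so withBasePath, withJsc, and withRetry are assumed names that mirror the builder fields and the CompactionTask wiring shown later.

// Hypothetical usage sketch; setter names other than withProps are assumptions.
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.keep.min.commits", "20"); // example archival config
ArchiveTask archiveTask = ArchiveTask.newBuilder()
    .withProps(props)                     // consolidated run properties
    .withBasePath("/data/hudi/my_table")  // hoodie table base path (example value)
    .withJsc(jsc)                         // JavaSparkContext, assumed in scope
    .withRetry(1)                         // number of retries
    .build();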
CleanTask.java
@@ -26,7 +26,8 @@
import org.apache.spark.api.java.JavaSparkContext;

/**
* Clean task to run in TableServicePipeline
* Clean task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class CleanTask extends TableServiceTask {
@@ -41,16 +42,39 @@ void run() {
}, "Clean Failed");
}

/**
* Utility to create builder for {@link CleanTask}.
*
* @return Builder for {@link CleanTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for {@link CleanTask}.
*/
public static final class Builder {
/**
* Properties for running the clean task, already consolidated with CLI-provided config overrides.
*/
private TypedProperties props;

/**
* Hoodie table path for running clean task.
*/
private String basePath;
private JavaSparkContext jsc;

/**
* Number of retries.
*/
private int retry;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

public Builder withProps(TypedProperties props) {
this.props = props;
return this;
ClusteringTask.java
@@ -25,12 +25,22 @@
import org.apache.spark.api.java.JavaSparkContext;

/**
* Clustering task to run in TableServicePipeline
* Clustering task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class ClusteringTask extends TableServiceTask {

/**
* Parallelism for hoodie clustering.
*/
private int parallelism;

/**
* Mode for running clustering.
*
* @see HoodieClusteringJob.Config#runningMode
*/
private String clusteringMode;

@Override
@@ -42,16 +52,50 @@ void run() {
new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry);
}

/**
* Utility to create builder for {@link ClusteringTask}.
*
* @return Builder for {@link ClusteringTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for {@link ClusteringTask}.
*/
public static final class Builder {

/**
* Properties for running the clustering task, already consolidated with CLI-provided config overrides.
*/
private TypedProperties props;

/**
* Parallelism for hoodie clustering.
*/
private int parallelism;

/**
* Mode for running clustering.
*
* @see HoodieClusteringJob.Config#runningMode
*/
private String clusteringMode;

/**
* Hoodie table path for running clustering task.
*/
private String basePath;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

/**
* Number of retries.
*/
private int retry;

private Builder() {
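
ClusteringTask's setters are folded out of this diff entirely, so every setter name in the sketch below is an assumption by analogy with CompactionTask's builder; the running-mode values come from the Config javadoc further down ("schedule", "execute", "scheduleAndExecute").

// Hypothetical sketch; all setter names are assumed by analogy with
// CompactionTask's builder, since ClusteringTask's setters are folded out.
TypedProperties props = new TypedProperties();
ClusteringTask clusteringTask = ClusteringTask.newBuilder()
    .withProps(props)
    .withBasePath("/data/hudi/my_table")             // example value
    .withParallelism(100)                            // clustering parallelism
    .withClusteringRunningMode("scheduleAndExecute") // guessed setter name
    .withJsc(jsc)                                    // JavaSparkContext, assumed in scope
    .withRetry(1)
    .build();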
CompactionTask.java
@@ -20,21 +20,32 @@
package org.apache.hudi.utilities.multitable;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.utilities.HoodieCompactor;

import org.apache.spark.api.java.JavaSparkContext;

/**
* Compaction task to run in TableServicePipeline
* Compaction task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class CompactionTask extends TableServiceTask {

public String compactionRunningMode = HoodieCompactor.EXECUTE;

public String compactionStrategyName = LogFileSizeBasedCompactionStrategy.class.getName();

/**
* Mode for running compaction.
*
* @see HoodieCompactor.Config#runningMode
*/
public String compactionRunningMode;

/**
* Strategy class for compaction.
*/
public String compactionStrategyName;

/**
* Parallelism for hoodie compaction.
*/
private int parallelism;

@Override
@@ -48,16 +59,54 @@ void run() {
new HoodieCompactor(jsc, compactionCfg, props).compact(retry);
}

/**
* Utility to create builder for {@link CompactionTask}.
*
* @return Builder for {@link CompactionTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for {@link CompactionTask}.
*/
public static final class Builder {
/**
* Properties for running the compaction task, already consolidated with CLI-provided config overrides.
*/
private TypedProperties props;

/**
* Mode for running compaction.
*
* @see HoodieCompactor.Config#runningMode
*/
private String compactionRunningMode;

/**
* Strategy class for compaction.
*/
public String compactionStrategyName;

/**
* Parallelism for hoodie compaction.
*/
private int parallelism;

/**
* Number of retries.
*/
private int retry;

/**
* Hoodie table path for running compaction task.
*/
private String basePath;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

public Builder withProps(TypedProperties props) {
@@ -70,6 +119,11 @@ public Builder withCompactionRunningMode(String compactionRunningMode) {
return this;
}

public Builder withCompactionStrategyName(String compactionStrategyName) {
this.compactionStrategyName = compactionStrategyName;
return this;
}

public Builder withParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
@@ -96,6 +150,7 @@ public CompactionTask build() {
compactionTask.jsc = this.jsc;
compactionTask.parallelism = this.parallelism;
compactionTask.compactionRunningMode = this.compactionRunningMode;
compactionTask.compactionStrategyName = this.compactionStrategyName;
compactionTask.retry = this.retry;
compactionTask.props = this.props;
return compactionTask;
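
Combined with the wiring in MultiTableServiceUtils below, building a compaction task with an explicit strategy looks roughly like this; every setter shown is visible in this diff, while the JavaSparkContext setter is folded out, so it is only noted in a comment.

// Sketch based on the setters visible in this diff and in
// MultiTableServiceUtils.buildTableServicePipeline below.
TypedProperties props = new TypedProperties();
String basePath = "/data/hudi/my_table"; // example value
CompactionTask compactionTask = CompactionTask.newBuilder()
    .withBasePath(basePath)
    .withParallelism(100)
    .withCompactionRunningMode(HoodieCompactor.EXECUTE)
    .withCompactionStrategyName(LogFileSizeBasedCompactionStrategy.class.getName())
    .withProps(props)
    .withRetry(1)
    .build(); // a JavaSparkContext must also be supplied via a setter folded out of this diff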
HoodieMultiTableServicesMain.java
@@ -21,6 +21,7 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -48,7 +49,7 @@
import java.util.stream.Collectors;

/**
* Main function for executing multi-table services
* Main function for executing multi-table services.
*/
public class HoodieMultiTableServicesMain {
private static final Logger LOG = LoggerFactory.getLogger(HoodieMultiTableServicesMain.class);
@@ -188,6 +189,9 @@ public static class Config implements Serializable {
+ "Set \"scheduleAndExecute\" means make a compact plan first and execute that plan immediately")
public String compactionRunningMode = HoodieCompactor.EXECUTE;

@Parameter(names = {"--strategy", "-st"}, description = "Strategy class for compaction")
public String compactionStrategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();

@Parameter(names = {"--clustering-mode"}, description = "Set job mode: Set \"schedule\" means make a clustering plan; "
+ "Set \"execute\" means execute a clustering plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a clustering plan first and execute that plan immediately")
@@ -225,6 +229,7 @@ public String toString() {
.add("enableClean=" + enableClean)
.add("enableArchive=" + enableArchive)
.add("compactionRunningMode='" + compactionRunningMode + "'")
.add("compactionStrategyClassName='" + compactionStrategyClassName + "'")
.add("clusteringRunningMode='" + clusteringRunningMode + "'")
.add("sparkMaster='" + sparkMaster + "'")
.add("sparkMemory='" + sparkMemory + "'")
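
Only the --strategy/-st flag itself is defined in this diff; the argument list below is an illustrative assumption, and BoundedIOCompactionStrategy is simply one alternative strategy class.

// Hypothetical argument list; only --strategy/-st comes from this diff.
String[] args = {
    "--strategy",
    "org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy"
};
// Short form: "-st". When the flag is omitted, the default is
// LogFileSizeBasedCompactionStrategy, per the Config field above.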
MultiTableServiceUtils.java
@@ -41,7 +41,7 @@
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

/**
* Utils for executing multi-table services
* Utils for executing multi-table services.
*/
public class MultiTableServiceUtils {

@@ -63,7 +63,7 @@ public static List<String> getTablesToBeServedFromProps(TypedProperties properties
}

/**
* Type of directories when searching hoodie tables under path
* Type of directories when searching hoodie tables under path.
*/
enum DirType {
HOODIE_TABLE,
@@ -142,6 +142,7 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext jsc
.withBasePath(basePath)
.withParallelism(cfg.parallelism)
.withCompactionRunningMode(cfg.compactionRunningMode)
.withCompactionStrategyName(cfg.compactionStrategyClassName)
.withProps(props)
.withRetry(cfg.retry)
.build());
TableServicePipeline.java
@@ -23,7 +23,7 @@
import java.util.List;

/**
* TableServicePipeline is a container holding all table services task to execute for a specific hoodie table
* TableServicePipeline is a container holding all table service tasks to execute for a specific hoodie table.
*/
public class TableServicePipeline {

@@ -34,16 +34,16 @@ public TableServicePipeline() {
}

/**
* Add a table service task to the end of table service pipe. The task will be executed in FIFO manner
* Add a table service task to the end of the table service pipeline. Tasks are executed in FIFO order.
*
* @param task table service task to run in pipeline
* @param task table service task to run in pipeline.
*/
public void add(TableServiceTask task) {
tableServiceTasks.add(task);
}

/**
* Run all table services task sequentially
* Run all table services task sequentially.
*/
public void execute() {
tableServiceTasks.forEach(TableServiceTask::run);
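
A minimal sketch of the contract above, using only the constructor, add, and execute shown in this file: tasks run sequentially in insertion order.

// Tasks execute in the order they were added (FIFO).
TableServicePipeline pipeline = new TableServicePipeline();
pipeline.add(compactionTask); // e.g. built via CompactionTask.newBuilder(), as sketched earlier
pipeline.add(cleanTask);
pipeline.add(archiveTask);
pipeline.execute(); // runs compaction, then clean, then archive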
