[HUDI-6481] Support run multi tables services in a single spark job #9558

Merged (6 commits) on Sep 7, 2023
HoodieCleaner.java
@@ -54,13 +54,13 @@ public class HoodieCleaner {
   private TypedProperties props;
 
   public HoodieCleaner(Config cfg, JavaSparkContext jssc) {
+    this(cfg, jssc, UtilHelpers.buildProperties(jssc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+  }
+
+  public HoodieCleaner(Config cfg, JavaSparkContext jssc, TypedProperties props) {
     this.cfg = cfg;
     this.jssc = jssc;
-    /*
-     * Filesystem used.
-     */
-    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
+    this.props = props;

Contributor: The original code fetches the props either from the cmd line or from global options; what is the purpose of supporting custom props per service then?

Contributor Author: Yes, in multi-table services we also fetch the props either from the cmd line or from global options, and we do not want each service to fetch the props by itself. So here I added a new constructor that takes the props shared by all services.

Contributor:
> In multi-table services we also fetch the props either from the cmd line or from global options

The existing constructor already satisfies that need.

Contributor Author (stream2000, Sep 6, 2023):
> The existing constructor already satisfies that need.

Yes, however if we want to reuse the existing constructor, we need to pass the props file path and the cmd line config from HoodieMultiTableServicesMain to HoodieCleaner, and we would then parse the config (read the props file and override it with the cmd line config) for every TableServiceTask, which is redundant. The current implementation does the config parsing in HoodieMultiTableServicesMain only, and every TableServiceTask uses the final config parsed by HoodieMultiTableServicesMain. What do you think?

Contributor: Fine with it.

Contributor (danny0405, Sep 7, 2023): Then we can force one constructor to invoke the other one.

Contributor Author:
> Then we can force one constructor to invoke the other one.

Thanks for your suggestions~ I have slightly refactored the code and made it cleaner, PTAL.

     LOG.info("Creating Cleaner with configs : " + props.toString());
   }

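To make the parse-once-and-share flow from the exchange above concrete, here is a minimal sketch (not the actual HoodieMultiTableServicesMain code from this PR; the method name, table paths, and overrides are illustrative) that resolves the props file and CLI overrides a single time with the new UtilHelpers.buildProperties overload and reuses the result for each table's cleaner via the new constructor:

```java
import java.util.List;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;

public class SharedPropsSketch {
  // Illustrative driver only; basePaths, propsFilePath and cliOverrides are made-up inputs.
  static void cleanAllTables(JavaSparkContext jsc, List<String> basePaths,
                             String propsFilePath, List<String> cliOverrides) {
    // Resolve the props file and CLI overrides exactly once (overload added in this PR).
    TypedProperties sharedProps =
        UtilHelpers.buildProperties(jsc.hadoopConfiguration(), propsFilePath, cliOverrides);

    // Each per-table service reuses the already-resolved props through the new constructor
    // instead of re-reading the props file itself.
    for (String basePath : basePaths) {
      HoodieCleaner.Config cleanConfig = new HoodieCleaner.Config();
      cleanConfig.basePath = basePath;
      new HoodieCleaner(cleanConfig, jsc, sharedProps).run();
    }
  }
}
```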
HoodieClusteringJob.java
@@ -33,7 +33,6 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
@@ -57,11 +56,14 @@ public class HoodieClusteringJob {
   private HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+  }
+
+  public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props) {
     this.cfg = cfg;
     this.jsc = jsc;
-    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
-        ? UtilHelpers.buildProperties(cfg.configs)
-        : readConfigFromFileSystem(jsc, cfg);
+    this.props = props;
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
@@ -71,11 +73,6 @@ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     }
   }
Contributor: Do we want to disable async cleaning and add lock options here, like the other constructor does?

Contributor Author: Sure! Thanks for the reminder.


-  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
-        .getProps(true);
-  }
-
   public static class Config implements Serializable {
     @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
     public String basePath = null;
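Following up on the exchange above, a hedged sketch of the two construction paths (the wrapper method is illustrative and assumes cfg.basePath points at an existing Hudi table and jsc is a live Spark context); whichever path is taken, the resulting job has async cleaning disabled and, when the metadata table is available, default lock options added:

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.HoodieClusteringJob;

import org.apache.spark.api.java.JavaSparkContext;

public class ClusteringConstructorSketch {
  // Assumes cfg.basePath points at an existing Hudi table and jsc is a live Spark context.
  static HoodieClusteringJob buildWithSharedProps(JavaSparkContext jsc,
                                                  HoodieClusteringJob.Config cfg,
                                                  TypedProperties sharedProps) {
    // A single-table caller can still use new HoodieClusteringJob(jsc, cfg), which resolves
    // props from cfg and then delegates to this constructor. Either way the constructor
    // disables async cleaning and adds default lock options when the metadata table is
    // available, so both paths yield an identically configured job.
    return new HoodieClusteringJob(jsc, cfg, sharedProps);
  }
}
```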
HoodieCompactor.java
@@ -36,7 +36,6 @@
 import com.beust.jcommander.Parameter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
@@ -60,25 +59,22 @@ public class HoodieCompactor {
   private final HoodieTableMetaClient metaClient;
 
   public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+  }
+
+  public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props) {
     this.cfg = cfg;
     this.jsc = jsc;
-    this.props = cfg.propsFilePath == null
-        ? UtilHelpers.buildProperties(cfg.configs)
-        : readConfigFromFileSystem(jsc, cfg);
+    this.props = props;
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
       // add default lock config options if MDT is enabled.
       UtilHelpers.addLockOptions(cfg.basePath, this.props);
     }
   }
Contributor: Same question as in the clustering job.

Contributor Author: Sure, updated.


-  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
-        .getProps(true);
-  }
-
   public static class Config implements Serializable {
     @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
     public String basePath = null;
@@ -261,8 +257,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     // instant from the active timeline
     if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
       HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
-      Option<HoodieInstant> firstCompactionInstant =
-          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
       if (firstCompactionInstant.isPresent()) {
         cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
         LOG.info("Found the earliest scheduled compaction instant which will be executed: "
UtilHelpers.java
@@ -256,6 +256,13 @@ public static DFSPropertiesConfiguration getConfig(List<String> overriddenProps)
     return conf;
   }
 
+  public static TypedProperties buildProperties(Configuration hadoopConf, String propsFilePath, List<String> props) {
+    return StringUtils.isNullOrEmpty(propsFilePath)
+        ? UtilHelpers.buildProperties(props)
+        : UtilHelpers.readConfig(hadoopConf, new Path(propsFilePath), props)
+            .getProps(true);
+  }
+
   public static TypedProperties buildProperties(List<String> props) {
     TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
     props.forEach(x -> {
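A quick usage sketch for the new overload (the file path and override values below are made up): with a null or empty propsFilePath it falls back to the global props plus the supplied key=value overrides, otherwise it reads the file and applies the overrides on top.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.UtilHelpers;

public class BuildPropertiesSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    List<String> overrides = Arrays.asList("hoodie.cleaner.commits.retained=12");

    // No props file: global props plus the CLI-style overrides.
    TypedProperties fromOverrides = UtilHelpers.buildProperties(hadoopConf, null, overrides);

    // With a props file (hypothetical path): file contents first, overrides applied on top.
    TypedProperties fromFile =
        UtilHelpers.buildProperties(hadoopConf, "file:///tmp/hudi-services.properties", overrides);

    System.out.println(fromOverrides.getProperty("hoodie.cleaner.commits.retained"));
  }
}
```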
ArchiveTask.java (new file)
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.utilities.multitable;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Archive task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class ArchiveTask extends TableServiceTask {
Contributor: Add javadocs.

Contributor Author: Added javadocs for all the new classes.

private static final Logger LOG = LoggerFactory.getLogger(ArchiveTask.class);

@Override
void run() {
LOG.info("Run Archive with props: " + props);
HoodieWriteConfig hoodieCfg = HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), hoodieCfg)) {
UtilHelpers.retry(retry, () -> {
client.archive();
return 0;
}, "Archive Failed");
}
}

/**
* Utility to create builder for {@link ArchiveTask}.
*
* @return Builder for {@link ArchiveTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

Contributor: Add a doc for each Builder class, and also for each member variable.

Contributor Author: Done.

/**
* Builder class for {@link ArchiveTask}.
*/
public static final class Builder {
/**
* Properties for running archive task which are already consolidated w/ CLI provided config-overrides.
*/
private TypedProperties props;

/**
* Hoodie table path for running archive task.
*/
private String basePath;

/**
* Number of retries.
*/
private int retry;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

public Builder withProps(TypedProperties props) {
this.props = props;
return this;
}

public Builder withBasePath(String basePath) {
this.basePath = basePath;
return this;
}

public Builder withJsc(JavaSparkContext jsc) {
this.jsc = jsc;
return this;
}

public Builder withRetry(int retry) {
this.retry = retry;
return this;
}

public ArchiveTask build() {
ArchiveTask archiveTask = new ArchiveTask();
archiveTask.jsc = this.jsc;
archiveTask.retry = this.retry;
archiveTask.props = this.props;
archiveTask.basePath = this.basePath;
return archiveTask;
}
}
}
CleanTask.java (new file)
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.utilities.multitable;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;

/**
* Clean task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
class CleanTask extends TableServiceTask {
Contributor: Javadocs.


@Override
void run() {
HoodieCleaner.Config cleanConfig = new HoodieCleaner.Config();
cleanConfig.basePath = basePath;
UtilHelpers.retry(retry, () -> {
new HoodieCleaner(cleanConfig, jsc, props).run();
return 0;
}, "Clean Failed");
}

/**
* Utility to create builder for {@link CleanTask}.
*
* @return Builder for {@link CleanTask}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for {@link CleanTask}.
*/
public static final class Builder {
/**
* Properties for running clean task which are already consolidated w/ CLI provided config-overrides.
*/
private TypedProperties props;

/**
* Hoodie table path for running clean task.
*/
private String basePath;

/**
* Number of retries.
*/
private int retry;

/**
* JavaSparkContext to run spark job.
*/
private JavaSparkContext jsc;

public Builder withProps(TypedProperties props) {
this.props = props;
return this;
}

public Builder withBasePath(String basePath) {
this.basePath = basePath;
return this;
}

public Builder withJsc(JavaSparkContext jsc) {
this.jsc = jsc;
return this;
}

public Builder withRetry(int retry) {
this.retry = retry;
return this;
}

public CleanTask build() {
CleanTask cleanTask = new CleanTask();
cleanTask.jsc = this.jsc;
cleanTask.retry = this.retry;
cleanTask.basePath = this.basePath;
cleanTask.props = this.props;
return cleanTask;
}
}
}
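Finally, a sketch of how these tasks might be chained for a single table with one shared TypedProperties. This is not the PR's actual TableServicePipeline/HoodieMultiTableServicesMain wiring; since run() is package-private, the illustrative driver is placed in the same package.

```java
package org.apache.hudi.utilities.multitable;

import java.util.List;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;

// Illustrative driver only, not the PR's HoodieMultiTableServicesMain.
public class SingleTableServicesSketch {

  static void runServices(JavaSparkContext jsc, String basePath,
                          String propsFilePath, List<String> cliOverrides) {
    // Resolve the props file and CLI overrides once, then share across all tasks.
    TypedProperties props =
        UtilHelpers.buildProperties(jsc.hadoopConfiguration(), propsFilePath, cliOverrides);

    CleanTask clean = CleanTask.newBuilder()
        .withJsc(jsc)
        .withBasePath(basePath)
        .withProps(props)
        .withRetry(3)
        .build();

    ArchiveTask archive = ArchiveTask.newBuilder()
        .withJsc(jsc)
        .withBasePath(basePath)
        .withProps(props)
        .withRetry(3)
        .build();

    // run() is package-private, so the tasks are driven from within this package.
    clean.run();
    archive.run();
  }
}
```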