
[HUDI-6481] Support run multi tables services in a single spark job #9558

Merged

6 commits merged into apache:master on Sep 7, 2023

Conversation

@stream2000 (Contributor) commented Aug 28, 2023

Change Logs

Origin Issue: #8960

We already have HoodieMultiTableDeltaStreamer, which uses Spark to ingest multiple tables into Hudi, and on some platforms (such as Alicloud VVP) we can also ingest multiple tables with a single Flink job. In such scenarios we may want to run all table services outside of the ingestion job to improve stability.

Currently, Hudi provides a number of offline table service jobs, but we still need to run them manually or through external scheduling tools. This PR provides a tool to run all table services in a single Spark job.

Impact

Provides a tool to run all table services in a single Spark job.

Risk level (write none, low, medium or high below)

none

Documentation Update

Will update the documentation after the merge.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@stream2000 (Contributor Author)

@leesf @jonvex Hi, could you please help review this PR? Thanks~

@jonvex (Contributor) left a comment

Good job! The code looks solid. @nsivabalan @xushiyan @danny0405 @leesf can one of you take a look at HoodieMultiTableServicesMain and validate that this aligns with how we want table services to be run? Also, what do you think about the use of concurrency instead of running sequentially?

this.jsc = jsc;
this.props = props;
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
}
Contributor

Do we want to do the disable async cleaning and add lock options like the other constructor?

Contributor Author

Sure! Thanks for the reminder.

this.jsc = jsc;
this.props = props;
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
}
Contributor

Same question as in clustering job

Contributor Author

Sure, updated.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ArchiveTask extends TableServiceTask {
Contributor

add javadocs

Contributor Author

Added javadocs for all new classes.


import org.apache.spark.api.java.JavaSparkContext;

class CleanTask extends TableServiceTask {
Contributor

javadocs


import org.apache.spark.api.java.JavaSparkContext;

class ClusteringTask extends TableServiceTask {
Contributor

javadocs

import java.util.ArrayList;
import java.util.List;

public class TableServicePipeline {
Contributor

javadoc

Comment on lines 1 to 18
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
Contributor

In all the files you have double-commented the license header; it should just be

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

Contributor Author

Updated all the license headers.

LOG.info("Shutdown the table services");
main.cancel();
} catch (InterruptedException e) {
//
Contributor

missing?

Contributor Author

Updated; the exception is now ignored intentionally.
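The shutdown handling under discussion can be sketched as follows. This is a minimal, hypothetical illustration: `ShutdownSketch` and `MainStub` are stand-ins invented here, not the PR's actual `HoodieMultiTableServicesMain` code.

```java
// Minimal sketch of the shutdown handling discussed above.
// "MainStub" is a hypothetical stand-in for HoodieMultiTableServicesMain.
public class ShutdownSketch {

  static class MainStub {
    volatile boolean cancelled = false;

    // A real cancel() would interrupt the running table-service tasks.
    void cancel() throws InterruptedException {
      cancelled = true;
    }
  }

  // Register a JVM shutdown hook that cancels the services and
  // deliberately ignores InterruptedException, since the JVM is exiting.
  public static void registerShutdownHook(MainStub main) {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      System.out.println("Shutdown the table services");
      try {
        main.cancel();
      } catch (InterruptedException e) {
        // ignored intentionally: the process is shutting down anyway
      }
    }));
  }

  public static void main(String[] args) {
    MainStub main = new MainStub();
    registerShutdownHook(main);
    System.out.println("registered shutdown hook");
  }
}
```

Swallowing `InterruptedException` here is acceptable only because the hook runs during JVM shutdown; in normal code the interrupt flag should be restored instead.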

.withParallelism(4, 4)
.withBulkInsertParallelism(4)
.withFinalizeWriteParallelism(2)
.withProps(makeIndexConfig(HoodieIndex.IndexType.BUCKET))
Contributor

why bucket index? Just curious

Contributor Author

Just want to make it easier to produce log files for compaction.


import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;

class TestHoodieMultiTableServicesMain extends HoodieCommonTestHarness implements SparkProvider {
Contributor

javadocs

@jonvex (Contributor) commented Aug 29, 2023

@stream2000 also please sign up for JIRA and email the dev mailing list if you haven't, and assign yourself to the JIRA ticket.

@stream2000 (Contributor Author) left a comment

Thanks for your review!~ I have addressed all the comments and assigned the JIRA ticket to myself.

private ScheduledExecutorService executorService;

private void batchRunTableServices(List<String> tablePaths) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(cfg.poolSize);
Contributor Author

I don't really understand what you mean here. I created a new local thread pool to run all the table services and shut it down afterwards, since we only need it once.
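The pattern being discussed can be sketched as follows. This is an illustrative stand-in, assuming the structure shown in the quoted snippet: `Pipeline` and `buildPipeline` are hypothetical simplifications of Hudi's `TableServicePipeline` and `MultiTableServiceUtils.buildTableServicePipeline`, not the actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of running one table-service pipeline per table
// concurrently on a local, fixed-size thread pool.
public class BatchRunSketch {

  interface Pipeline {
    void execute();
  }

  // Counts executions so the behavior is observable in this sketch.
  static final AtomicInteger EXECUTED = new AtomicInteger();

  static Pipeline buildPipeline(String basePath) {
    // A real pipeline would run clean/compact/cluster/archive tasks.
    return () -> EXECUTED.incrementAndGet();
  }

  static void batchRunTableServices(List<String> tablePaths, int poolSize) {
    // A local pool is created for this batch and destroyed afterwards,
    // since batch mode only needs it once.
    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    try {
      CompletableFuture<?>[] futures = tablePaths.stream()
          .map(basePath -> CompletableFuture.runAsync(
              () -> buildPipeline(basePath).execute(), executorService))
          .toArray(CompletableFuture[]::new);
      // join() blocks until all tables finish and rethrows task failures.
      CompletableFuture.allOf(futures).join();
    } finally {
      executorService.shutdown();
    }
  }

  public static void main(String[] args) {
    batchRunTableServices(Arrays.asList("/tmp/t1", "/tmp/t2"), 2);
    System.out.println("executed pipelines: " + EXECUTED.get());
  }
}
```

The `try`/`finally` around `shutdown()` ensures the local pool is always released, which matches the "create it, use it once, destroy it" intent described in the comment.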

@stream2000 force-pushed the HUDI-6481_multi_table_services branch from bb592b7 to d0a5621 on August 31, 2023 07:22
@leesf (Contributor) commented Aug 31, 2023

We have already used this feature internally to make table service management of Hudi tables easier.

* Main function for executing multi-table services
*/
public class HoodieMultiTableServicesMain {
private static final Logger LOG = LoggerFactory.getLogger(HoodieStreamer.class);
Contributor

HoodieMultiTableServicesMain.class?

Contributor Author

Sure, I missed it when I migrated it from HoodieStreamer. I will check the other classes as well.


if (!dirs.isEmpty()) {
List<Pair<FileStatus, Integer>> dirResults = engineContext.map(dirs, fileStatus -> {
if (isHoodieTable(fileStatus.getPath(), conf.get())) {
@leesf (Contributor), Aug 31, 2023

What do 0, 1, 2 mean? They look like magic numbers.

Contributor Author

Nice catch~ Using hard-coded magic numbers is not good design; I have replaced them with meaningful enum constants.

Updated to:

  /**
   * Type of directory encountered when searching for hoodie tables under a path.
   */
  enum DirType {
    HOODIE_TABLE, // previously 0
    NORMAL_DIR,   // previously 1
    META_FOLDER   // previously 2
  }

ExecutorService executorService = Executors.newFixedThreadPool(cfg.poolSize);
List<CompletableFuture<Void>> futures = tablePaths.stream()
.map(basePath -> CompletableFuture.runAsync(
() -> MultiTableServiceUtils.buildTableServicePipeline(jsc, basePath, cfg, props).execute(),
Contributor

Should we exit early if no services are enabled?

Contributor Author

If no services are enabled, the TableServicePipeline will contain no tasks and the execute method will return immediately, so it's OK to keep the current design.
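The behavior described here can be sketched as a trivial pipeline. This is a simplified illustration under the assumption that the pipeline just iterates its task list; it is not the PR's actual TableServicePipeline class.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the point above: a pipeline with no tasks
// is a no-op, so no explicit early-exit check is needed by the caller.
public class PipelineSketch {

  private final List<Runnable> tasks = new ArrayList<>();

  public void add(Runnable task) {
    tasks.add(task);
  }

  public void execute() {
    // Iterating an empty list returns immediately.
    for (Runnable task : tasks) {
      task.run();
    }
  }

  public static void main(String[] args) {
    PipelineSketch pipeline = new PipelineSketch();
    pipeline.execute(); // no services enabled: returns directly
    pipeline.add(() -> System.out.println("clean"));
    pipeline.execute();
  }
}
```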

@stream2000 (Contributor Author) left a comment

Thanks for your review! I have addressed all the comments, PTAL.


@stream2000 (Contributor Author)

@hudi-bot run azure

@stream2000 (Contributor Author)

@leesf @jonvex Hi, thanks for your review! I have addressed all the comments; can we move forward with this?~

@leesf (Contributor) left a comment

LGTM, @jonvex any other comments?

public HoodieCleaner(Config cfg, TypedProperties props, JavaSparkContext jssc) {
this.cfg = cfg;
this.jssc = jssc;
this.props = props;
Contributor

The original code fetches the props either from the command line or from global options; what is the purpose of supporting custom props per service then?

Contributor Author

Yes, in multi-table services we also fetch the props either from the command line or from global options, but we do not want each service to fetch the props by itself. So I added a new constructor here that uses the props shared by all services.

Contributor

In multi-table services we also fetch the props either from the command line or from global options

The existing constructor already satisfies the need.

@stream2000 (Contributor Author), Sep 6, 2023

The existing constructor already satisfies the need.

Yes; however, if we want to reuse the existing constructor, we need to pass the props file path and the command-line config from HoodieMultiTableServicesMain to HoodieCleaner, and we would then parse the config (read the props file and override it with the command-line config) for every TableServiceTask, which is redundant. The current implementation does the config parsing in HoodieMultiTableServicesMain only, and all the TableServiceTasks use the final config parsed there. What do you think?

Contributor

Fine with it.

@danny0405 (Contributor), Sep 7, 2023

Then we can force one constructor to invoke the other one.

Contributor Author

Then we can force one constructor to invoke the other one.

Thanks for your suggestions~ I have slightly refactored the code to make it cleaner, PTAL.
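The suggested refactor (one constructor delegating to the other via `this(...)`) can be sketched as follows. The class and member names here are hypothetical illustrations, not Hudi's actual HoodieCleaner API.

```java
import java.util.Properties;

// Hypothetical sketch of constructor chaining: the command-line style
// constructor parses props once, then delegates to the shared-props
// constructor via this(...). Names do not match Hudi's actual API.
public class CleanerSketch {

  private final String basePath;
  private final Properties props;

  // Used by the multi-table services job: props were already parsed once
  // in the main entry point and are shared by all table service tasks.
  public CleanerSketch(String basePath, Properties props) {
    this.basePath = basePath;
    this.props = props;
  }

  // Original-style constructor: parses its own props, then delegates,
  // so the initialization logic lives in exactly one place.
  public CleanerSketch(String basePath, String propsFilePath) {
    this(basePath, loadProps(propsFilePath));
  }

  private static Properties loadProps(String propsFilePath) {
    // A real implementation would read the file and apply CLI overrides.
    Properties props = new Properties();
    props.setProperty("props.file", propsFilePath);
    return props;
  }

  public Properties getProps() {
    return props;
  }

  public static void main(String[] args) {
    CleanerSketch cleaner = new CleanerSketch("/tmp/table", "conf.properties");
    System.out.println(cleaner.getProps());
  }
}
```

Chaining keeps field assignment in a single constructor, so future changes (such as the disable-async-cleaning and lock options discussed earlier) only need to be made once.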

import org.slf4j.LoggerFactory;

/**
* Archive task to run in TableServicePipeline
Contributor

Every doc sentence should end with a dot.

Contributor Author

done

public static Builder newBuilder() {
return new Builder();
}

Contributor

Add a doc for each Builder class, and also docs for each member variable.

Contributor Author

done

@stream2000 (Contributor Author) left a comment

@danny0405, thanks for your review! All comments are addressed, and I added a new param compactionStrategyClassName to the config since I missed it in the previous version.


@jonvex (Contributor) left a comment

Hi. Sorry, I was out of the country. Looks good.

@stream2000 (Contributor Author)

@hudi-bot run azure

1 similar comment

@stream2000 (Contributor Author)

@hudi-bot run azure

@hudi-bot commented Sep 7, 2023

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@leesf (Contributor) commented Sep 7, 2023

Merging this, as CI failed for an unrelated reason.

@leesf leesf merged commit bbeb531 into apache:master Sep 7, 2023
27 checks passed