
[HUDI-1089] Refactor hudi-client to support multi-engine #1827

Merged: 6 commits merged into apache:master on Oct 1, 2020

Conversation

@wangxianghu (Contributor) commented Jul 14, 2020:


What is the purpose of the pull request

Refactor hudi-client to support multi-engine

Brief change log

  • Refactor hudi-client to support multi-engine

Verify this pull request

This pull request is already covered by existing tests.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@wangxianghu (Contributor, Author) commented Jul 14, 2020:

Hi @vinothchandar @yanghua @leesf, as the refactor is finished, I have filed a JIRA ticket to track this work.
Please review the refactor work on this PR :)

@leesf (Contributor) commented Jul 15, 2020:

> Hi @vinothchandar @yanghua @leesf, as the refactor is finished, I have filed a JIRA ticket to track this work.
> Please review the refactor work on this PR :)

Ack. @Mathieu1124, please check the Travis failure.

@vinothchandar (Member) commented:

@leesf @Mathieu1124 @lw309637554 so this replaces #1727, right?

@vinothchandar (Member) commented:

It would be good to get @n3nash's review here as well, to make sure we are not breaking anything for the RDD client users.

@wangxianghu (Contributor, Author) commented:

> @leesf @Mathieu1124 @lw309637554 so this replaces #1727, right?

Yes, #1727 can be closed now.

@wangxianghu (Contributor, Author) commented:

> > Hi @vinothchandar @yanghua @leesf, as the refactor is finished, I have filed a JIRA ticket to track this work.
> > Please review the refactor work on this PR :)
>
> Ack. @Mathieu1124, please check the Travis failure.

Copy that. I have resolved the CI failure and the conflicts with master; I will push after work.

@wangxianghu wangxianghu force-pushed the HUDI-1089 branch 5 times, most recently from 9d544ec to 13795f5 Compare July 15, 2020 16:27
@wangxianghu (Contributor, Author) commented:

Hi @vinothchandar @yanghua @leesf @n3nash, CI is green; this PR is ready for review now :)

@yanghua (Contributor) left a review comment:

@Mathieu1124 I reviewed some changes. Since this is a huge PR, can we revert the method-alignment (formatting-only) changes in this PR so that we reduce the number of change points? That would make the review easier.

-  private String getRenamesToBePrinted(List<RenameOpResult> res, Integer limit, String sortByField, boolean descending,
-      boolean headerOnly, String operation) {
+  private String getRenamesToBePrinted(List<BaseCompactionAdminClient.RenameOpResult> res, Integer limit, String sortByField, boolean descending,
+      boolean headerOnly, String operation) {
@yanghua (Contributor) commented:

The new style or the old style, which one is right?

@wangxianghu (Contributor, Author) replied:

Hi @yanghua, thanks for your review. I am not sure which one is right either; I will roll back these style changes to keep them the same as before.

@@ -40,7 +39,7 @@
  * HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
  * the current file can take more records with the <code>canWrite()</code>
  */
-public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
+public class HoodieParquetWriter<R extends IndexedRecord>
@yanghua (Contributor) commented:

Why do we need to change this class?

@wangxianghu (Contributor, Author) replied:

> Why do we need to change this class?

The generic parameter `T` is unused in this class, and it causes generics problems in the abstraction, so I removed it.
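
To illustrate the point, a minimal sketch (the simplified writer classes and the `Demo` usage below are invented for this example, not Hudi code): an unused type parameter forces every caller and subclass to bind a type the class never touches.

  // Before: T is declared but never referenced in the body, so every
  // caller and subclass must still pick some T.
  class ParquetWriterBefore<T, R> {
    void write(R record) { /* write the record */ }
  }

  // After: dropping the unused parameter leaves only the type that matters.
  class ParquetWriterAfter<R> {
    void write(R record) { /* write the record */ }
  }

  class Demo {
    void usage() {
      // Before: an arbitrary T (here String) must be supplied even though it is unused.
      ParquetWriterBefore<String, Object> before = new ParquetWriterBefore<>();
      // After: only the record type is needed.
      ParquetWriterAfter<Object> after = new ParquetWriterAfter<>();
    }
  }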

@vinothchandar (Member) commented:

@Mathieu1124 @leesf can you please share any tests you may have done in your own environment to ensure existing functionality is intact? This is a major signal we may not completely get from a PR review alone.

@wangxianghu (Contributor, Author) commented Jul 17, 2020:

> @Mathieu1124 @leesf can you please share any tests you may have done in your own environment to ensure existing functionality is intact? This is a major signal we may not completely get from a PR review alone.

@vinothchandar, my testing is limited: just all the unit tests in the source code, plus all the demos in the Quick-Start Guide. I am planning to test it in a Docker environment.

@vinothchandar vinothchandar force-pushed the HUDI-1089 branch 2 times, most recently from 56690a5 to c7b1cb1 Compare October 1, 2020 06:42
* Making HoodieSnapshotCopier/HoodieSnapshotExporter all use HoodieContext
* More replacements of jsc.parallelize across hudi-spark-client (see the engine-context sketch after this list)
* More replacements of jsc.setJobGroup across hudi-spark-client
* Removing usages of HoodieIndex#fetchRecordLocation everywhere
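
As a rough illustration of the jsc.parallelize replacement pattern mentioned above (a minimal sketch: the `EngineContext` interface and `LocalEngineContext` class are invented for the example, and the real HoodieEngineContext signatures may differ):

  import java.util.List;
  import java.util.function.Function;
  import java.util.stream.Collectors;

  // Hypothetical engine-agnostic context: callers pass a plain list and a function,
  // and each engine decides how to run the work in parallel.
  interface EngineContext {
    <I, O> List<O> map(List<I> data, Function<I, O> func, int parallelism);
  }

  // A local, Spark-free implementation used only to keep this sketch self-contained.
  class LocalEngineContext implements EngineContext {
    @Override
    public <I, O> List<O> map(List<I> data, Function<I, O> func, int parallelism) {
      return data.stream().map(func).collect(Collectors.toList());
    }
  }

  // Before (Spark-specific): jsc.parallelize(paths, parallelism).map(this::process).collect()
  // After (engine-agnostic):  engineContext.map(paths, this::process, parallelism)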
@codecov-commenter commented:
Codecov Report

Merging #1827 into master will decrease coverage by 3.75%.
The diff coverage is 30.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1827      +/-   ##
============================================
- Coverage     59.89%   56.14%   -3.76%     
+ Complexity     4454     2658    -1796     
============================================
  Files           558      324     -234     
  Lines         23378    14775    -8603     
  Branches       2348     1539     -809     
============================================
- Hits          14003     8295    -5708     
+ Misses         8355     5783    -2572     
+ Partials       1020      697     -323     
Flag                   Coverage Δ                  Complexity Δ
#hudicli               38.37% <30.00%> (-27.83%)   193.00 <0.00> (-1615.00)
#hudiclient            100.00% <ø> (+25.46%) ⬆️    0.00 <ø> (-1615.00)
#hudicommon            54.74% <ø> (ø)              1793.00 <ø> (ø)
#hudihadoopmr          ?                           ?
#hudispark             67.18% <ø> (-0.02%)         311.00 <ø> (ø)
#huditimelineservice   64.43% <ø> (ø)              49.00 <ø> (ø)
#hudiutilities         69.43% <ø> (+0.05%)         312.00 <ø> (ø)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ Complexity Δ
...rg/apache/hudi/cli/commands/SavepointsCommand.java 14.28% <0.00%> (ø) 3.00 <0.00> (ø)
...main/java/org/apache/hudi/cli/utils/SparkUtil.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...n/java/org/apache/hudi/cli/commands/SparkMain.java 6.43% <37.50%> (+0.40%) 4.00 <0.00> (ø)
...src/main/java/org/apache/hudi/DataSourceUtils.java 45.36% <0.00%> (ø) 21.00% <0.00%> (ø%)
...in/scala/org/apache/hudi/HoodieStreamingSink.scala 24.00% <0.00%> (ø) 10.00% <0.00%> (ø%)
...n/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 56.20% <0.00%> (ø) 0.00% <0.00%> (ø%)
...in/java/org/apache/hudi/utilities/UtilHelpers.java 64.59% <0.00%> (ø) 30.00% <0.00%> (ø%)
...apache/hudi/utilities/deltastreamer/DeltaSync.java 68.16% <0.00%> (ø) 39.00% <0.00%> (ø%)
.../hudi/async/SparkStreamingAsyncCompactService.java 0.00% <0.00%> (ø) 0.00% <0.00%> (ø%)
.../hudi/internal/HoodieDataSourceInternalWriter.java 87.50% <0.00%> (ø) 8.00% <0.00%> (ø%)
... and 46 more

@wangxianghu (Contributor, Author) commented:

> @wangxianghu @yanghua I have rebased this against master. Please take a look at my changes.
>
> High level, we could re-use more code, but it needs an abstraction that can wrap RDD or DataSet or DataStream adequately and support basic operations like .map(), reduceByKey() etc. We can do this in a second pass once we have a working Flink impl. For now this will do.
>
> I am trying to get the tests to pass. If they do, we could go ahead and merge.

Thanks @vinothchandar, this is really great work!
Yes, we can add more abstractions for the basic map and reduceByKey methods in HoodieEngineContext, or in some util classes, next.
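
As a rough sketch of the kind of wrapper discussed above (all names here — `EngineData`, `EnginePairData`, and the function interfaces — are invented for illustration, not the API in this PR): a Spark implementation would wrap a JavaRDD, a Flink one a DataStream, and each method would delegate to the engine's native operator.

  import java.io.Serializable;
  import java.util.function.Function;

  // Engine-agnostic handle over a distributed collection (RDD / DataSet / DataStream).
  interface EngineData<T> {
    <O> EngineData<O> map(SerializableFunction<T, O> func);
    <K> EnginePairData<K, T> keyBy(SerializableFunction<T, K> keyExtractor);
  }

  // Keyed view supporting Spark-style aggregations.
  interface EnginePairData<K, V> {
    // merges all values sharing a key, mirroring Spark's reduceByKey
    EnginePairData<K, V> reduceByKey(SerializableBinaryOperator<V> combiner);
  }

  // Functions must be Serializable so engines can ship them to executors.
  interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

  interface SerializableBinaryOperator<V> extends Serializable {
    V apply(V left, V right);
  }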

@vinothchandar (Member) commented:

I actually figured out that we can remove `P` altogether, since HoodieIndex#fetchRecordLocation is not used much outside of internal APIs. So I will push a final change for that. Tests are passing now.

* Not used by any other major API
* Removing `P` from the templatized list of parameters
@vinothchandar (Member) commented:

@wangxianghu Please help test this out if possible. Once the tests pass again, I am planning to merge this in the morning, PST.

cc @yanghua

@leesf (Contributor) commented Oct 1, 2020:

  1. Ran the quickstart demo and found this warn log: 20/10/01 21:11:18 WARN embedded.EmbeddedTimelineService: Unable to find driver bind address from spark config. The demo works fine, but this warn log is not present in 0.6.0. @vinothchandar @wangxianghu
  2. Ran my own unit tests; they work fine.

@vinothchandar (Member) commented:

@leesf, do you see the following exception? I could not understand how you would even get the other one.

  LOG.info("Starting Timeline service !!");
  Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
  if (!hostAddr.isPresent()) {
    throw new HoodieException("Unable to find host address to bind timeline server to.");
  }
  timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.get(),
      config.getClientSpecifiedViewStorageConfig()));

Either way, good pointer. The behavior around this has actually changed a bit, so I will try to tweak it and push a fix.

@wangxianghu (Contributor, Author) commented:

@vinothchandar @yanghua @leesf The demo runs well locally, except for the warning WARN embedded.EmbeddedTimelineService: Unable to find driver bind address from spark config.

@vinothchandar (Member) commented:

@wangxianghu, can you please test the latest commit? To be clear, you are saying you don't get the warning on master, but you do get it on this branch, right?

If this round of tests passes and you confirm, we can land this from my perspective.

@wangxianghu (Contributor, Author) commented Oct 1, 2020:

> @wangxianghu, can you please test the latest commit? To be clear, you are saying you don't get the warning on master, but you do get it on this branch, right?
>
> If this round of tests passes and you confirm, we can land this from my perspective.

Hi @vinothchandar, the warn log is still there in the HUDI-1089 branch (master is OK, no warn log).
I think we should check embeddedTimelineServiceHostAddr instead of hostAddr:

  private void setHostAddr(String embeddedTimelineServiceHostAddr) {
    // here we should check embeddedTimelineServiceHostAddr instead of hostAddr
    if (hostAddr != null) {
      LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
      this.hostAddr = embeddedTimelineServiceHostAddr;
    } else {
      LOG.warn("Unable to find driver bind address from spark config");
      this.hostAddr = NetworkUtils.getHostname();
    }
  }

@wangxianghu (Contributor, Author) commented:

> Hi @vinothchandar, the warn log is still there in the HUDI-1089 branch (master is OK, no warn log). I think we should check embeddedTimelineServiceHostAddr instead of hostAddr.

I have tested the latest commit with the check condition changed to

  if (embeddedTimelineServiceHostAddr != null) {

It runs well locally, and the warn log disappeared.
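
For reference, the corrected method per the discussion above (a minimal sketch restating the fix; LOG, hostAddr, and NetworkUtils are the surrounding members of the existing EmbeddedTimelineService class):

  private void setHostAddr(String embeddedTimelineServiceHostAddr) {
    // check the incoming parameter rather than the still-null hostAddr field
    if (embeddedTimelineServiceHostAddr != null) {
      LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
      this.hostAddr = embeddedTimelineServiceHostAddr;
    } else {
      LOG.warn("Unable to find driver bind address from spark config");
      this.hostAddr = NetworkUtils.getHostname();
    }
  }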

@wangxianghu (Contributor, Author) commented:

@vinothchandar The warn log issue is fixed.

@vinothchandar (Member) commented:

@wangxianghu, duh, of course. I understand now. Thanks for jumping in!

@vinothchandar vinothchandar merged commit 1f7add9 into apache:master Oct 1, 2020
@vinothchandar (Member) commented:

@wangxianghu Just merged! Thanks again for the herculean effort.

Maybe some follow-ups will pop up. Would you be interested in taking them up? If so, I'll mention you along the way.

@wangxianghu (Contributor, Author) commented:

> @wangxianghu Just merged! Thanks again for the herculean effort.
>
> Maybe some follow-ups will pop up. Would you be interested in taking them up? If so, I'll mention you along the way.

Sure, just ping me when needed.

prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Feb 22, 2021
- This change breaks `hudi-client` into `hudi-client-common` and `hudi-spark-client` modules
- Simple usages of Spark via jsc.parallelize() have been redone using EngineContext#map, EngineContext#flatMap, etc.
- Code changes in the PR break classes into `BaseXYZ` parent classes, with no Spark dependencies, living in `hudi-client-common`
- Classes in `hudi-spark-client` are named `SparkXYZ`, extending the parent classes with all the Spark dependencies (see the sketch after this commit message)
- To simplify/clean up, HoodieIndex#fetchRecordLocation has been removed and its usages in tests replaced with alternatives

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
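
A rough illustration of the Base/Spark split described in that commit message (the class and method names below are invented for the sketch; only the pattern — an engine-neutral parent in `hudi-client-common`, a Spark subclass in `hudi-spark-client` — comes from the PR):

  import java.util.List;

  // hudi-client-common: engine-agnostic parent, no Spark imports.
  abstract class BaseExampleExecutor<I, O> {
    public abstract O execute(I input);
  }

  // hudi-spark-client: Spark-specific subclass; in the real module this would
  // distribute work via the JavaSparkContext instead of running serially.
  class SparkExampleExecutor extends BaseExampleExecutor<List<String>, List<String>> {
    @Override
    public List<String> execute(List<String> input) {
      return input; // placeholder: real code would parallelize and collect
    }
  }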
Comment on lines +154 to +157
protected abstract Iterator<List<WriteStatus>> handleInsert(String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception;

protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
A contributor commented:

May I ask whether the difference between the parameters idPfx and fileId in the methods handleInsert and handleUpdate is by design?

I can understand that for the update operation we need to find the previous version of the base file, so we use the file id to locate it.

But in the method BaseSparkCommitActionExecutor.handleUpsertPartition, both handleInsert and handleUpdate get their parameter from binfo.fileIdPrefix. It's really confusing.

  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                              Partitioner partitioner) {
    SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner) partitioner;
    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
    BucketType btype = binfo.bucketType;
    try {
      if (btype.equals(BucketType.INSERT)) {
        return handleInsert(binfo.fileIdPrefix, recordItr);
      } else if (btype.equals(BucketType.UPDATE)) {
        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
      } else {
        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
      }
    } catch (Throwable t) {
      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
      LOG.error(msg, t);
      throw new HoodieUpsertException(msg, t);
    }
  }
