[HUDI-770] Organize upsert/insert API implementation under a single package #1495
Codecov Report
@@            Coverage Diff             @@
##           master    #1495     +/-    ##
============================================
- Coverage   72.25%   72.17%   -0.08%
  Complexity    289      289
============================================
  Files         338      365     +27
  Lines       15946    16228    +282
  Branches     1624     1632      +8
============================================
+ Hits        11521    11712    +191
- Misses       3697     3783     +86
- Partials      728      733      +5
Few suggestions!
} else {
  return table.getInsertPartitioner(profile, jsc);
HoodieTable<T> hoodieTable) {
  CommitActionResult result = hoodieTable.ingest(jsc, preppedRecords, instantTime, getOperationType());
IMO we should just mirror the same APIs (upsert, insert) on the table. ingest is confusing, since it also implies that we are reading from some source, which we are not.
Done
  return true;
}

void doPostCommitAndEmitCommitMetrics(String instantTime, HoodieCommitMetadata metadata,
Break this into two methods? One to do the post commit and one to emit the metrics?
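A minimal sketch of the split being suggested, assuming one method per responsibility. The class `CommitFinisher`, its method names, and the `durationMs` parameter are hypothetical placeholders for illustration, not the actual Hudi code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: class, method, and parameter names are illustrative,
// not the actual Hudi code.
class CommitFinisher {
  // Records each step so the order of calls is visible.
  final List<String> steps = new ArrayList<>();

  // Does only the post-commit work for the given instant.
  void postCommit(String instantTime) {
    steps.add("postCommit:" + instantTime);
  }

  // Only emits metrics; in this sketch it just records the duration.
  void emitCommitMetrics(String instantTime, long durationMs) {
    steps.add("emitMetrics:" + instantTime + ":" + durationMs + "ms");
  }
}

class CommitFinisherDemo {
  public static void main(String[] args) {
    CommitFinisher finisher = new CommitFinisher();
    finisher.postCommit("001");
    finisher.emitCommitMetrics("001", 42L);
    System.out.println(finisher.steps);
  }
}
```

With the two steps separated, a caller that only needs one of them can invoke it on its own.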
Done
} else {
  return table.getInsertPartitioner(profile, jsc);
HoodieTable<T> hoodieTable) {
  CommitActionResult result = hoodieTable.ingest(jsc, preppedRecords, instantTime, getOperationType());
And can we return HoodieCommitMetadata instead? I guess the issue is that we have to pass this to the caller (deltastreamer or datasource) to decide whether to commit or not?
Yes, supporting externally triggered commits is the reason why we have CommitActionResult.
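To illustrate why a result object is handed back instead of final commit metadata, here is a minimal sketch under stated assumptions: `WriteResult` and `SketchWriter` are hypothetical stand-ins (plain strings replace write statuses and commit metadata), not the actual Hudi classes. The write returns an uncommitted result, and either the writer (auto commit) or the caller completes the commit:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the write result; not the actual Hudi class.
final class WriteResult {
  final List<String> writeStatuses;
  final Optional<String> commitMetadata; // empty until the commit happens

  WriteResult(List<String> writeStatuses, Optional<String> commitMetadata) {
    this.writeStatuses = writeStatuses;
    this.commitMetadata = commitMetadata;
  }

  boolean isCommitted() {
    return commitMetadata.isPresent();
  }
}

// Hypothetical writer that either commits itself or leaves it to the caller.
final class SketchWriter {
  private final boolean autoCommit;

  SketchWriter(boolean autoCommit) {
    this.autoCommit = autoCommit;
  }

  WriteResult upsert(List<String> records, String instantTime) {
    List<String> statuses = new ArrayList<>();
    for (String record : records) {
      statuses.add("written:" + record);
    }
    WriteResult pending = new WriteResult(statuses, Optional.empty());
    return autoCommit ? commit(pending, instantTime) : pending;
  }

  // Called by the writer itself (auto commit) or by an external caller.
  WriteResult commit(WriteResult pending, String instantTime) {
    return new WriteResult(pending.writeStatuses, Optional.of("commit@" + instantTime));
  }
}

class SketchWriterDemo {
  public static void main(String[] args) {
    SketchWriter writer = new SketchWriter(false);
    WriteResult pending = writer.upsert(Arrays.asList("a", "b"), "001");
    System.out.println(pending.isCommitted());
    // The caller inspects the statuses and decides whether to commit.
    WriteResult done = writer.commit(pending, "001");
    System.out.println(done.isCommitted());
  }
}
```

In this sketch the commit metadata only exists after the commit, which is why the write call cannot return it directly when the caller triggers the commit.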
import scala.Tuple2;

public abstract class AbstractBaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
Rename to just BaseCommitActionExecutor?
Done
public class CommitActionResult {

  private JavaRDD<WriteStatus> writeStatuses;
  private Duration indexUpdateDuration;
If tagging the index is outside this action, then shouldn't updating be moved out as well?
This is fixed as part of creating separate executors for each operation.
WriteOperationType(String value) {
WriteOperationType(String value, boolean isUpsert) {
Isn't this really debt? Can we get rid of this boolean and just replace it with a helper method that takes in the operation type and returns true/false?
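A minimal sketch of that suggestion, assuming a simplified stand-in enum. `OpType`, its constants, and the choice of which constants count as upserts are hypothetical and do not reproduce the actual `WriteOperationType`. The constructor keeps only the string value, and a static helper answers the question:

```java
// Hypothetical, simplified stand-in for WriteOperationType; not the actual Hudi enum.
enum OpType {
  INSERT("insert"),
  UPSERT("upsert"),
  UPSERT_PREPPED("upsert_prepped"),
  BULK_INSERT("bulk_insert");

  private final String value;

  // No per-constant boolean flag is needed any more.
  OpType(String value) {
    this.value = value;
  }

  String value() {
    return value;
  }

  // Static helper: takes the operation type and returns true/false.
  // Which constants count as upserts is an assumption of this sketch.
  static boolean isUpsert(OpType type) {
    return type == UPSERT || type == UPSERT_PREPPED;
  }
}

class OpTypeDemo {
  public static void main(String[] args) {
    System.out.println(OpType.isUpsert(OpType.UPSERT));
    System.out.println(OpType.isUpsert(OpType.BULK_INSERT));
  }
}
```

Keeping the rule in one helper means adding a new constant does not require threading another constructor argument through every existing constant.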
Done
@@ -69,4 +71,8 @@ public static WriteOperationType fromValue(String value) {
      throw new HoodieException("Invalid value of Type.");
    }
  }

  public boolean isUpsert() {
Like this method: it can just be a static method that returns true/false.
Done
/**
 * Contains metadata, write-statuses and latency times corresponding to a commit/delta-commit action.
 */
public class CommitActionResult {
Ideally we would be able to return HoodieCommitMetadata, but since we can't, maybe still rename this to HoodieWriteMetadata to stay consistent with the other objects we are returning now (RollbackMetadata, RestoreMetadata, etc.)?
Done
5359fd0 to ce8cc69
@vinothchandar: Addressed review comments. Please take a look when you get a chance.
Left some comments on naming. Overall, I felt we could organize for more reuse, but this already improves things substantially. So please land after you go over the comments and CI is happy.
hudi-client/src/main/java/org/apache/hudi/table/action/commit/DedupeWriteHelper.java
...-client/src/main/java/org/apache/hudi/table/action/commit/MergeOnReadBulkInsertExecutor.java
...lient/src/main/java/org/apache/hudi/table/action/commit/MergeOnReadCommitActionExecutor.java
hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeOnReadDeleteExecutor.java
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@vinothchandar: Addressed comments. Will merge once the CI passes.
[HUDI-770] Organize upsert/insert API implementation under a single package
There is inherent complexity with supporting auto commit vs non-auto commit. Once that is resolved, we will be able to remove much of the code in HoodieWriteClient.