[HUDI-679] Make io package Spark free #1460
Conversation
@vinothchandar @yanghua Please take a look when you are free. Thanks
Codecov Report
@@             Coverage Diff              @@
##             master    #1460      +/-   ##
============================================
- Coverage     67.66%   67.58%    -0.09%
  Complexity      261      261
============================================
  Files           342      348        +6
  Lines         16510    16670      +160
  Branches       1684     1693        +9
============================================
+ Hits          11172    11266       +94
- Misses         4599     4665       +66
  Partials        739      739
Continue to review full report at Codecov.
@leesf I left some comments you can consider.
/**
 * Spark Supplier.
 */
public interface SparkSupplier<T> extends Supplier<T>, Serializable {
Considering this interface supplies information about TaskContext, can we rename it to SparkTaskContextDetailSupplier?
sounds reasonable.
 * Spark Supplier.
 */
public interface SparkSupplier<T> extends Supplier<T>, Serializable {
    SparkSupplier<Integer> PARTITION_SUPPLIER = () -> TaskContext.getPartitionId();
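For context, the pattern under review is a Supplier that is also Serializable, so instances (including lambdas) can be captured inside serialized Spark closures. A minimal stand-alone sketch of the same shape, with TaskContext.getPartitionId() replaced by a constant so it runs without Spark on the classpath (all names here are illustrative, not the PR's final code):

```java
import java.io.Serializable;
import java.util.function.Supplier;

public class SerializableSupplierSketch {

    // Mirrors SparkSupplier: a Supplier that is also Serializable so it can
    // travel inside serialized Spark closures.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {
    }

    // Analogous to PARTITION_SUPPLIER; returns a fixed id here instead of
    // calling TaskContext.getPartitionId().
    static final SerializableSupplier<Integer> PARTITION_ID_SUPPLIER = () -> 0;

    public static void main(String[] args) {
        System.out.println(PARTITION_ID_SUPPLIER.get());
        System.out.println(PARTITION_ID_SUPPLIER instanceof Serializable);
    }
}
```

Because the lambda's target type extends Serializable, the compiled lambda itself is serializable, which is the whole point of the extra interface.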
Can we add empty line to split this definition? Additionally, add some comments?
sure
@yanghua Updated this PR to address your comments.
Thanks for addressing those comments. Left some new comments.
/**
 * Supplier to get partition id.
 */
SparkTaskContextDetailSupplier<Integer> PARTITION_SUPPLIER = () -> TaskContext.getPartitionId();
PARTITION_ID_SUPPLIER?
/**
 * Supplier to get stage id.
 */
SparkTaskContextDetailSupplier<Integer> STAGE_SUPPLIER = () -> TaskContext.get().stageId();
STAGE_ID_SUPPLIER?
/**
 * Supplier to get task attempt id.
 */
SparkTaskContextDetailSupplier<Long> ATTEMPT_SUPPLIER = () -> TaskContext.get().taskAttemptId();
ATTEMPT_ID_SUPPLIER?
@@ -51,6 +51,7 @@ public BulkInsertMapFunction(String instantTime, HoodieWriteConfig config, Hoodi
  @Override
  public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
    return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
        fileIDPrefixes.get(partition));
        fileIDPrefixes.get(partition), hoodieTable.getIdSupplier(), hoodieTable.getStageSupplier(),
hoodieTable.getIdSupplier() is not clear here. I suggest we rename these getters to e.g. getPartitionIdSupplier, getStageId and getAttemptId?
@@ -50,15 +51,23 @@
  protected final HoodieTable<T> hoodieTable;
  protected final String idPrefix;
  protected int numFilesWritten;
  protected Supplier<Integer> idSupplier;
ditto
    String instantTime, HoodieTable<T> hoodieTable, String idPrefix) {
    String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
    Supplier<Integer> idSupplier, Supplier<Integer> stageSupplier,
    Supplier<Long> attemptSupplier) {
Actually, I am not sure if we can package these three args into a DTO structure. Just a thought, you can ignore.
Yes, I think it is better.
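The DTO idea could be sketched along these lines: a single serializable holder for the three suppliers, so constructors take one argument instead of three. The class and getter names here are assumptions for illustration, not necessarily what the PR ended up with.

```java
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical DTO bundling the three task-context suppliers.
public class TaskContextSuppliers implements Serializable {

    private final Supplier<Integer> partitionIdSupplier;
    private final Supplier<Integer> stageIdSupplier;
    private final Supplier<Long> attemptIdSupplier;

    public TaskContextSuppliers(Supplier<Integer> partitionIdSupplier,
                                Supplier<Integer> stageIdSupplier,
                                Supplier<Long> attemptIdSupplier) {
        this.partitionIdSupplier = partitionIdSupplier;
        this.stageIdSupplier = stageIdSupplier;
        this.attemptIdSupplier = attemptIdSupplier;
    }

    public Supplier<Integer> getPartitionIdSupplier() {
        return partitionIdSupplier;
    }

    public Supplier<Integer> getStageIdSupplier() {
        return stageIdSupplier;
    }

    public Supplier<Long> getAttemptIdSupplier() {
        return attemptIdSupplier;
    }
}
```

Call sites like CopyOnWriteLazyInsertIterable would then accept a single TaskContextSuppliers argument; in Spark code the three fields would be populated from TaskContext, while non-Spark code stays free of the Spark dependency.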
@@ -55,26 +55,32 @@
  protected final String partitionPath;
  protected final String fileId;
  protected final String writeToken;
  protected final Supplier<Integer> idSupplier;
ditto, IMO, id here is not clear. partition id is better.
@yanghua Updated this PR to package three suppliers into
Would love to eliminate the Suppliers abstraction :) .. we can just have an interface that serves up three values and pass that in.
/**
 * Spark Supplier.
 */
public interface SparkTaskContextDetailSupplier<T> extends Supplier<T>, Serializable {
Should the interface be something generic like WriteTaskContextSupplier, which is extended by SparkTaskContextSupplier?
/**
 * Spark Supplier.
 */
public interface SparkTaskContextDetailSupplier<T> extends Supplier<T>, Serializable {
I am not sure this abstraction is at the right level.. Should this have to be a Supplier.. I think we can just have three methods that return Supplier<Integer> and Supplier<Long>, and pass just one argument through the code path, i.e. the SparkTaskContextSupplier instance..
Just now seeing the latest changes from @leesf. Yes, it seems this is a better abstraction.
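A rough sketch of the shape being discussed: one serializable object that exposes the three suppliers as methods and is passed as a single argument through the write path. The Spark-backed calls are shown only in comments so the snippet compiles without Spark; the names are assumptions, not the merged code.

```java
import java.io.Serializable;
import java.util.function.Supplier;

// Single abstraction that serves up the three task-context values.
public interface TaskContextSupplierSketch extends Serializable {

    Supplier<Integer> getPartitionIdSupplier();

    Supplier<Integer> getStageIdSupplier();

    Supplier<Long> getAttemptIdSupplier();
}

// A Spark implementation would delegate to TaskContext.getPartitionId(),
// TaskContext.get().stageId() and TaskContext.get().taskAttemptId();
// this local stand-in returns fixed values for illustration.
class LocalTaskContextSupplier implements TaskContextSupplierSketch {
    @Override
    public Supplier<Integer> getPartitionIdSupplier() {
        return () -> 0;
    }

    @Override
    public Supplier<Integer> getStageIdSupplier() {
        return () -> 1;
    }

    @Override
    public Supplier<Long> getAttemptIdSupplier() {
        return () -> 2L;
    }
}
```

With this, io-package classes depend only on the interface, and only the Spark implementation touches TaskContext, which is the point of making the io package Spark free.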
/**
 * A bundle of Suppliers.
 */
public class Suppliers implements Serializable {
then this can go away.. I feel this is additional abstraction, that we may not need..
@@ -35,8 +36,9 @@
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {

  public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
      String instantTime, HoodieTable<T> hoodieTable, String idPfx) {
    super(sortedRecordItr, config, instantTime, hoodieTable, idPfx);
      String instantTime, HoodieTable<T> hoodieTable, String idPfx,
side point: we should fix method arg formatting consistently between intellij and checkstyle. Keep seeing these sort of whitespace changes in PRs.
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
    !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.suppliers = suppliers;
this.writeToken = makeSparkWriteToken();
rename to just makeWriteToken()?
@yanghua @vinothchandar Thanks for your review, just updated the PR to address your comments. PTAL.
LGTM
* [HUDI-679] Make io package Spark free
What is the purpose of the pull request
Make io package Spark free.
Brief change log
Verify this pull request
This pull request is a trivial rework / code cleanup without any test coverage.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR