[HUDI-759] Integrate checkpoint provider with delta streamer #1486

Merged (1 commit) on Apr 14, 2020

Member

@garyli1019 garyli1019 commented Apr 5, 2020

What is the purpose of the pull request

Integrate the initial checkpoint provider with delta streamer

Brief change log

  • Add two options to delta streamer to use the initial checkpoint provider

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit test in TestHoodieDeltaStreamer

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@garyli1019
Member Author

@pratyakshsharma @vinothchandar
I agree with Vinoth's idea of having those two options in the delta streamer. Does this implementation make sense to you?
I will add a test case if this approach looks good.
Thanks

codecov-io commented Apr 5, 2020

Codecov Report

Merging #1486 into master will decrease coverage by 0.07%.
The diff coverage is 78.78%.

@@             Coverage Diff              @@
##             master    #1486      +/-   ##
============================================
- Coverage     72.23%   72.15%   -0.08%     
- Complexity      289      294       +5     
============================================
  Files           338      373      +35     
  Lines         15947    16282     +335     
  Branches       1624     1638      +14     
============================================
+ Hits          11519    11748     +229     
- Misses         3700     3798      +98     
- Partials        728      736       +8     
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...in/java/org/apache/hudi/utilities/UtilHelpers.java | 64.70% <50.00%> (-0.71%) | 22.00 <1.00> (+1.00) | ⬇️ |
| ...lities/checkpointing/KafkaConnectHdfsProvider.java | 89.28% <71.42%> (-3.03%) | 14.00 <3.00> (+2.00) | ⬇️ |
| ...ities/checkpointing/InitialCheckPointProvider.java | 83.33% <83.33%> (ø) | 1.00 <1.00> (?) | |
| ...i/utilities/deltastreamer/HoodieDeltaStreamer.java | 77.93% <85.71%> (-1.22%) | 11.00 <4.00> (+1.00) | ⬇️ |
| ...apache/hudi/utilities/deltastreamer/DeltaSync.java | 72.44% <100.00%> (ø) | 37.00 <0.00> (ø) | |
| ...g/apache/hudi/metrics/InMemoryMetricsReporter.java | 40.00% <0.00%> (-40.00%) | 0.00% <0.00%> (ø%) | |
| .../org/apache/hudi/table/HoodieCopyOnWriteTable.java | 61.62% <0.00%> (-27.66%) | 0.00% <0.00%> (ø%) | |
| .../org/apache/hudi/table/HoodieMergeOnReadTable.java | 57.50% <0.00%> (-25.63%) | 0.00% <0.00%> (ø%) | |
| ...hudi/common/fs/inline/InLineFsDataInputStream.java | 38.46% <0.00%> (-15.39%) | 0.00% <0.00%> (ø%) | |
| ...src/main/java/org/apache/hudi/metrics/Metrics.java | 58.33% <0.00%> (-13.89%) | 0.00% <0.00%> (ø%) | |
| ... and 51 more | | | |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 5d717a2...b923a97.

@pratyakshsharma
Contributor

LGTM

@garyli1019 garyli1019 changed the title from "WIP[HUDI-759] Integrate checkpoint privoder with delta streamer" to "[HUDI-759] Integrate checkpoint privoder with delta streamer" on Apr 5, 2020
@garyli1019
Member Author

Test added. Thanks for the review

@garyli1019
Member Author

Add #1493 into this PR.

Member

@vinothchandar vinothchandar left a comment

A few clarifications

if (cfg.initialCheckpointProvider != null && cfg.bootstrapFromPath != null && cfg.checkpoint == null) {
  InitialCheckPointProvider checkPointProvider =
      UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, new Path(cfg.bootstrapFromPath), fs);
  cfg.checkpoint = checkPointProvider.getCheckpoint();
Member

IIUC, setting cfg.checkpoint will force the use of that timestamp instead of what we normally do, which is to read the checkpoint from the last commit?

Should we do this only when creating the dataset for the first time?

Member Author

I think this depends on how we design the migration flow for the user.
What I did myself was use the Spark datasource to do a bulkInsert that converts all the plain parquet files to Hudi format; then, as a second job, I'd like to use delta streamer to read from Kafka. So this initialCheckpointProvider should be used by the first delta streamer job when switching sources.

Member

Yes. Do you think we could make it so that, even if someone runs delta streamer a few times after the initial bootstrap, the initial checkpoint provider is used just once? Otherwise, you need to scramble to stop the delta streamer after the first run, or run it by hand once before scheduling it with Airflow or deploying it in --continuous mode.

Member Author

I believe the initial checkpoint provider should be used just once, when the user wants to switch from one source to another. After that, the delta streamer should be able to get the checkpoint from the previous commit. We can improve this once the bootstrap is ready. At this point, I am not sure how to put everything together if we want one step to handle everything.

Member

Okay, let's revisit once we have bootstrap support. cc @bvaradar as FYI.
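
The "use the initial checkpoint provider just once" idea from this thread could be sketched roughly as follows. This is only an illustration under stated assumptions, not what the PR implements: `CheckpointResolver`, `resolve`, and the `Supplier` standing in for `InitialCheckPointProvider.getCheckpoint()` are hypothetical names, the last commit's checkpoint is passed in as a plain `Optional<String>`, and the precedence order is one possible choice.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Hudi. */
public class CheckpointResolver {

  /**
   * Picks the checkpoint to resume from.
   * Assumed order for this sketch: an explicitly configured checkpoint,
   * then the checkpoint stored in the last commit, then the initial provider.
   */
  public static Optional<String> resolve(String explicitCheckpoint,
                                         Optional<String> lastCommitCheckpoint,
                                         Supplier<String> initialProvider) {
    if (explicitCheckpoint != null) {
      return Optional.of(explicitCheckpoint);
    }
    if (lastCommitCheckpoint.isPresent()) {
      // A previous delta streamer run already recorded a checkpoint,
      // so the initial provider is not consulted again.
      return lastCommitCheckpoint;
    }
    if (initialProvider != null) {
      return Optional.ofNullable(initialProvider.get());
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Made-up checkpoint strings, for demonstration only.
    Supplier<String> provider = () -> "topic1,0:300,1:200";
    // First run after the bulk insert: the last commit carries no checkpoint yet.
    System.out.println(resolve(null, Optional.empty(), provider));
    // Later runs: the checkpoint from the last commit takes precedence.
    System.out.println(resolve(null, Optional.of("topic1,0:450,1:380"), provider));
  }
}
```

With an order like this, a scheduled or continuous job could keep the provider configured without re-applying the initial checkpoint on every run, at the cost of no longer letting the provider override an existing commit checkpoint.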

Member

@vinothchandar vinothchandar left a comment

Left some suggestions.. lmk what you think

@garyli1019
Member Author

Hmm, looks like checkstyle auto-fixed something. Let me see what's going on.

@garyli1019
Member Author

I added the save action and checkstyle as documented; I'm not sure which one triggered all those final and this additions. Is it OK to add those? As a Scala programmer I prefer to use val all the time, and the Google Java Guide does encourage using final, but I am not sure what Hudi's preference is.

@garyli1019
Member Author

Addressed some comments. Summary:

  • Removed the --bootstrap-from option from the delta streamer. Use the hoodie.deltastreamer.checkpoint.provider.path field in the props instead.
  • Use TypedProperties to construct the InitialCheckPointProvider, and init(FileSystem fs) to initialize the class.
  • Kept hiveConf as the variable name even though its type changed from HiveConf to Configuration. Open to discussion if you don't agree.
  • Not able to replace all nulls in this PR: null serves as a flag in the delta streamer workflow, so replacing it might change the behavior of other classes that use the TypedProperties field. That code refactoring will need a separate PR.
  • The style check tool automatically adds final and this to match the stylecheck.xml.
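
The construct-from-props, then `init(FileSystem fs)` shape described in the summary above might look roughly like this. It is a sketch under assumptions, not the code in this PR: `java.util.Properties` stands in for Hudi's typed properties class, `java.nio.file.FileSystem` stands in for the Hadoop `FileSystem`, and `SketchCheckpointProvider`, `LatestFileNameProvider`, and `ProviderSketch` are hypothetical names. Only the property key `hoodie.deltastreamer.checkpoint.provider.path` comes from the comment itself.

```java
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.stream.Stream;

/** Hypothetical stand-in for the provider base class. */
abstract class SketchCheckpointProvider {
  static final String PATH_KEY = "hoodie.deltastreamer.checkpoint.provider.path";

  protected final String pathString;
  protected FileSystem fs;

  SketchCheckpointProvider(Properties props) {
    String configured = props.getProperty(PATH_KEY);
    if (configured == null) {
      throw new IllegalArgumentException("Missing property: " + PATH_KEY);
    }
    this.pathString = configured;
  }

  /** Second-phase initialisation, so construction needs nothing but the props. */
  void init(FileSystem fs) {
    this.fs = fs;
  }

  abstract String getCheckpoint() throws IOException;
}

/** Toy provider (an assumption): the greatest file name under the path is the "checkpoint". */
class LatestFileNameProvider extends SketchCheckpointProvider {
  LatestFileNameProvider(Properties props) {
    super(props);
  }

  @Override
  String getCheckpoint() throws IOException {
    Path dir = fs.getPath(pathString);
    try (Stream<Path> files = Files.list(dir)) {
      return files.map(p -> p.getFileName().toString())
          .max(String::compareTo)
          .orElseThrow(() -> new IOException("No files under " + dir));
    }
  }
}

public class ProviderSketch {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("provider-sketch");
    Files.createFile(dir.resolve("part-0001"));
    Files.createFile(dir.resolve("part-0002"));

    Properties props = new Properties();
    props.setProperty(SketchCheckpointProvider.PATH_KEY, dir.toString());

    SketchCheckpointProvider provider = new LatestFileNameProvider(props);
    provider.init(dir.getFileSystem());
    System.out.println(provider.getCheckpoint());
  }
}
```

Moving the path into the props keeps the delta streamer command line to a single provider option, and every provider implementation reads its own settings from the same properties object.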

@vinothchandar vinothchandar changed the title from "[HUDI-759] Integrate checkpoint privoder with delta streamer" to "[HUDI-759] Integrate checkpoint provider with delta streamer" on Apr 12, 2020
Member

@vinothchandar vinothchandar left a comment

Please revert the non-essential fixes from the PR (final, this, and so on) so it's easier to review. Those need more discussion if we are changing the code style, and should be applied uniformly.

Member

@vinothchandar vinothchandar left a comment

The style check tool automatically adds final and this to match the stylecheck.xml.

Please revert this change..

@garyli1019 garyli1019 force-pushed the HUDI-759 branch 2 times, most recently from 92cf0e7 to 94bb6ff on April 12, 2020 19:25
Member

@vinothchandar vinothchandar left a comment

Thanks for working through this @garyli1019 .. Hopefully just one more cycle.. and we are home.

@garyli1019
Member Author

@vinothchandar Thanks for all the feedback! Very helpful!
Comments addressed. Please take a look.

@Parameter(names = {"--initial-checkpoint-provider"}, description = "Generate check point for delta streamer "
    + "for the first run. This field will override the checkpoint of last commit using the checkpoint field. "
    + "Use this field only when switch source, for example, from DFS source to Kafka Source. Check the class "
    + "org.apache.hudi.utilities.checkpointing for details")
Member

InitialCheckPointProvider: did you intend to write the name of the class here?

Member Author

Do you mean we should add InitialCheckPointProvider here or we should remove this description?

Member Author

Changed the description to match with --schemaprovider-class

Member

@vinothchandar vinothchandar left a comment

LGTM. One minor comment; once you respond/repush, this can merge.

4 participants