[BEAM-6735] Add noSpilling option to WriteFiles. #7929
Conversation
R: @lukecwik
Force-pushed 6cd50c1 to 954af30
The build is failing because FileIO.Write#defaultNaming has no javadoc. I did not change that method and am unsure what its javadoc should say. There is also another overloaded defaultNaming method and a relativeFileNaming method that may need javadocs as well.
Force-pushed 954af30 to d0f6db0
Kyle, could you rebase this? And would you mind adding the Javadoc? <3 Luke is out on leave, but he'll be back soon and can review...
Ismael may also be able to review if Luke can't.
Force-pushed d0f6db0 to e2b899a
Done!
Force-pushed e2b899a to edb478d
For continuity it makes sense for @chamikaramj to take a look at this. R: @chamikaramj
Force-pushed 7735c4d to 8c00020
I added a NeedsRunner test, testWriteNoSpilling.
Sorry to be impatient, but can I get a review @lukecwik or @chamikaramj?
I took a look. I worked on similar code recently (for py BQ), so I understand the change, and it seems to do what it intends. Would you consider passing an argument that explicitly tells WriteFiles to allow unlimited files per bundle? Also, just so I can understand where you're coming from in proposing this change, what use case does it cover, @kyle-winkelman? Thanks.
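For context on the spilling under discussion: WriteFiles caps how many destination writers a single bundle may hold open, and records that arrive after the cap is reached are spilled to a second output, which is later grouped by shard and written separately. The following is a toy model of that cap-and-spill decision, with a noSpilling escape hatch; the names and the cap value are illustrative, not Beam's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam code) of a per-bundle writer cap with a spill path. */
public class SpillModel {
  private final int maxWritersPerBundle;
  private final boolean noSpilling;
  private final Map<String, List<String>> openWriters = new HashMap<>();
  private final List<String> spilled = new ArrayList<>();

  public SpillModel(int maxWritersPerBundle, boolean noSpilling) {
    this.maxWritersPerBundle = maxWritersPerBundle;
    this.noSpilling = noSpilling;
  }

  /** Route a record to an in-bundle writer, or spill it once the cap is hit. */
  public void process(String destination, String record) {
    List<String> writer = openWriters.get(destination);
    if (writer == null) {
      if (!noSpilling && openWriters.size() >= maxWritersPerBundle) {
        // In real WriteFiles this record would go to a second output,
        // then be grouped by shard and written in a later stage.
        spilled.add(record);
        return;
      }
      writer = new ArrayList<>();
      openWriters.put(destination, writer);
    }
    writer.add(record);
  }

  public int spilledCount() { return spilled.size(); }
  public int writerCount() { return openWriters.size(); }
}
```

With a cap of 2 and three destinations, the third destination's records spill unless noSpilling is set, in which case a third writer is simply opened. Turning spilling off is what lets the write stay a single-output ParDo.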
pabloem left a comment:
Left comment : )
I mainly use the spark-runner in batch mode. The pipeline, as it is now, has WriteUnshardedTempFilesFn as a ParDo.MultiOutput. The way ParDo.MultiOutputs are implemented in the spark-runner is to immediately cache the output and use it twice. Currently this is the only place in my pipeline with caching. I use spark.dynamicAllocation so that I can release idle executors, but because of this caching those executors are not eligible to be released. This means that at the end of my job I am holding onto hundreds of executors while only one of them does the copy from the temp location to the final location.
I added a commit addressing your comment. I kept it separate so it was easy to review, but let me know if I should squash the two commits into one before this gets merged.
Thank you Kyle. That makes sense to me.
And yes, could you squash the changes?
Force-pushed ef2cc0f to 41c9110
Run Java PreCommit
Squashed. Thanks for the review @pabloem.
Construct a simplified pipeline in the event that the number of writers isn't prohibitive. The user must opt into this by using FileIO, TextIO, or WriteFiles withNoSpilling.
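A minimal usage sketch of the opt-in described above. This is illustrative, not from the PR itself: the bucket paths are made up, and it assumes a Beam `Pipeline p` has already been constructed with the usual TextIO imports on the classpath.

```java
// Illustrative sketch: opting out of spilling so the unsharded write stays a
// single-output ParDo (avoiding the spark-runner caching described above).
p.apply(TextIO.read().from("gs://my-bucket/input/*"))
 .apply(TextIO.write()
     .to("gs://my-bucket/output/part")
     .withNoSpilling());
```

Without the withNoSpilling() call, WriteFiles keeps its default cap-and-spill behavior.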