[BEAM-2302] Add spilling code to WriteFiles. #3161
Conversation
jkff
left a comment
Thanks! Didn't look into the tests in too much detail yet.
| new ViewOverrideFactory())) /* Uses pardos and GBKs */ | ||
| ImmutableList.Builder<PTransformOverride> builder = | ||
| ImmutableList.<PTransformOverride>builder(); | ||
| if (!options.isUnitTest()) { |
I would prefer the other alternative, where the transform itself has an option whether or not a runner should be allowed to set fixed sharding on it (or more like, perhaps introduce a distinction between "no sharding" and "runner-chosen sharding", and use "no sharding" in the test). Having different behavior for test and non-test code is dangerous.
This approach was suggested by @tgroh, who felt that the transform should never be overridden in unit tests.
Specifically, test executions for the sdk and runners-core modules (e.g. the validates-runner execution) don't need to modify any behavior of the original transforms, and I'd like to avoid having the transform specify how it is permitted to be overridden.
I would like to pull any test-related options out of DirectOptions and mark them as @Internal and @Hidden, however.
@tgroh where should the options go?
Either in a new DirectTestOptions or in TestPipelineOptions
If you add DirectTestOptions I believe you will have to register them in the DirectPipelineRegistrar as well
Done
| * for each shard have been collected into a single iterable. | ||
| */ | ||
| private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> { | ||
| boolean setWindowedShardNumber; |
Rename the variable to something clearer?
made an enum
| Writer<T> writer = writeOperation.createWriter(); | ||
| if (windowedWrites) { | ||
| writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey()); | ||
| int shardNumber = setWindowedShardNumber ? c.element().getKey() : UNKNOWN_SHARDNUM; |
And add a comment here
It's more clear with an enum now, and a detailed comment was added elsewhere
| "WriteBundles", | ||
| ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles())); | ||
| if (windowedWrites) { | ||
| TupleTag<FileResult> writtenRecordsTag = new TupleTag<FileResult>() {}; |
Note: these anonymous TupleTags capture the enclosing class. Might want to use the TupleTag(String) constructor instead.
done
| writeTuple | ||
| .get(unwrittedRecordsTag) | ||
| .apply("GroupUnwritten", GroupByKey.<Integer, T>create()) | ||
| .apply("WriteUnwritten", ParDo.of(new WriteShardedBundles(false))) |
Instead of a boolean, maybe better to use an enum with eloquently named values
done
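For readers following the boolean-versus-enum point, here is a minimal sketch of what the change could look like. The enum name, its values, the helper class, and the value -1 for UNKNOWN_SHARDNUM are all assumptions made for illustration; the enum actually added in this PR is not shown in the thread.

```java
// Hypothetical names throughout; not the enum from the PR.
enum ShardAssignment {
  // Use the key of the grouped element as the shard number.
  USE_KEY_AS_SHARD,
  // Leave the shard number as UNKNOWN_SHARDNUM when writing.
  LEAVE_UNASSIGNED
}

class ShardNumbers {
  // Assumed sentinel value; the real constant's value is not shown in the thread.
  static final int UNKNOWN_SHARDNUM = -1;

  // Mirrors the ternary in the diff, but the call site now names its intent.
  static int shardNumberFor(ShardAssignment assignment, int key) {
    return assignment == ShardAssignment.USE_KEY_AS_SHARD ? key : UNKNOWN_SHARDNUM;
  }

  public static void main(String[] args) {
    System.out.println(shardNumberFor(ShardAssignment.USE_KEY_AS_SHARD, 7));
    System.out.println(shardNumberFor(ShardAssignment.LEAVE_UNASSIGNED, 7));
  }
}
```

A call such as `new WriteShardedBundles(ShardAssignment.LEAVE_UNASSIGNED)` would then read without needing a trailing comment, unlike `new WriteShardedBundles(false)`.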
| private SimpleSink makeSimpleSink(boolean windowedWrites) { | ||
| FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple"); | ||
| return new SimpleSink(getBaseOutputDirectory(), filenamePolicy); | ||
| // return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple"); |
Remove commented-out code
done
| public void testEmptyWrite() throws IOException { | ||
| runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename()); | ||
| runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(), | ||
| Optional.<Integer>absent(), false); |
false /* windowedWrites */
I forget not everyone uses IntelliJ :)
| } | ||
| private SimpleSink makeSimpleSink() { | ||
| return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple"); | ||
| private SimpleSink makeSimpleSink(boolean windowedWrites) { |
This is not using windowedWrites.
Fixed
| } | ||
|
|
||
| SimpleSink sink = makeSimpleSink(); | ||
| SimpleSink sink = makeSimpleSink(windowedWrites); |
I think it will be easier to read all the tests if, instead of passing the parameters such as numConfiguredShards, windowedWrites etc., you make runShardedWrite take a WriteFiles instance directly.
It's easier to read WriteFiles.to(makeSimpleSink(true) /* windowedWrites */).withMaxNumWritersPerBundle(42) than "..., true, Optional.of(42)".
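To illustrate the readability argument, here is a rough sketch using stand-in classes. `WriteConfig` and `WriteTestHarness` are hypothetical and are not Beam's WriteFiles or the PR's test helper; they only show how a test helper that accepts a configured object moves the parameter names to the call site.

```java
import java.util.Optional;

// Hypothetical stand-in for a configurable write transform; not Beam's WriteFiles.
final class WriteConfig {
  final boolean windowedWrites;
  final Optional<Integer> maxNumWritersPerBundle;

  private WriteConfig(boolean windowedWrites, Optional<Integer> maxNumWritersPerBundle) {
    this.windowedWrites = windowedWrites;
    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
  }

  static WriteConfig to(boolean windowedWrites) {
    return new WriteConfig(windowedWrites, Optional.empty());
  }

  // Returns a new immutable copy with the limit set.
  WriteConfig withMaxNumWritersPerBundle(int max) {
    return new WriteConfig(windowedWrites, Optional.of(max));
  }
}

// Hypothetical test helper: it takes the configured object instead of loose parameters.
class WriteTestHarness {
  static String describe(WriteConfig write) {
    return "windowed=" + write.windowedWrites
        + ", maxWriters=" + write.maxNumWritersPerBundle.map(String::valueOf).orElse("unset");
  }

  public static void main(String[] args) {
    // Each value is labeled by the method that sets it.
    System.out.println(describe(
        WriteConfig.to(true /* windowedWrites */).withMaxNumWritersPerBundle(42)));
  }
}
```

Compared with a positional call ending in `..., true, Optional.of(42)`, there is no way to swap two arguments of the same type unnoticed.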
|
retest this please. |
|
retest this please. |
jkff
left a comment
Thanks!
| PTransformMatchers.classEqualTo(CreatePCollectionView.class), | ||
| new ViewOverrideFactory())) /* Uses pardos and GBKs */ | ||
| TestPipelineOptions testOptions = options.as(TestPipelineOptions.class); | ||
| ImmutableList.Builder<PTransformOverride> builder = |
Reindent
Done
| */ | ||
| public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) { | ||
| return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites); | ||
| return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites, |
might be time to convert this class to AutoValue (but feel free to ignore)
I think best left for another PR
| windowedWriters.put(key, writer); | ||
| LOG.debug("Done opening writer"); | ||
| } else { | ||
| c.output(unwrittedRecordsTag, KV.of( |
nitpick: might make sense to put this (attaching a sharding key) into its own DoFn, and here output simply to a TupleTag<T>. That DoFn can be implemented more efficiently, without generating a random number for every value (you can generate a single number in startBundle or even setup(), and then keep incrementing it for every element - this has the same statistical properties).
Also feel free to ignore.
Think making it a separate DoFn is overkill, but the suggestion of caching the random number is a good one (and what we do elsewhere)
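The cached-random-number idea can be sketched roughly as follows. `ShardKeyAssigner` is a hypothetical helper, not code from this PR or from Beam; it assumes the caller invokes `startBundle()` once per bundle and `nextKey()` once per spilled element.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Beam: hands out shard keys for spilled records.
class ShardKeyAssigner {
  private final int numShards;
  private int next;

  ShardKeyAssigner(int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    this.numShards = numShards;
  }

  // One random draw per bundle picks the starting shard.
  void startBundle() {
    next = ThreadLocalRandom.current().nextInt(numShards);
  }

  // Per element: return the current shard and advance round-robin, with no further random calls.
  int nextKey() {
    int key = next;
    next = (next + 1) % numShards;
    return key;
  }

  public static void main(String[] args) {
    ShardKeyAssigner assigner = new ShardKeyAssigner(4);
    assigner.startBundle();
    for (int i = 0; i < 8; i++) {
      System.out.println(assigner.nextKey());
    }
  }
}
```

Because the starting point is random and the keys then cycle through every shard, elements within a bundle spread evenly across shards while the random generator is consulted only once per bundle.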
| ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles())); | ||
| if (windowedWrites) { | ||
| TupleTag<FileResult> writtenRecordsTag = | ||
| new TupleTag<FileResult>("writtenRecordsTag") {}; |
These anonymous TupleTag classes capture the enclosing transform so they'll blow up the serialized graph unnecessarily. I think the code will work even if you remove {} - if it won't, you'll probably just need to specify some coders, because all the {} does is capture the type information.
Done
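As background on what the trailing `{}` does, the sketch below uses a hypothetical `Tag<T>` class (not Beam's TupleTag) to show the type-capture half of the comment: an anonymous subclass records its type argument in its generic superclass, while a plain instance does not. Because the anonymous class is declared inside an instance method of `TagDemo`, it is an inner class of `TagDemo`, which is the situation the review comment warns about.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a typed tag; this is not Beam's TupleTag.
class Tag<T> {
  final String id;

  Tag(String id) {
    this.id = id;
  }

  // Returns the captured type argument, or null when the tag was created
  // without an anonymous subclass and the type argument was erased.
  Type capturedType() {
    Type superType = getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
    return null;
  }
}

class TagDemo {
  // The {} creates an anonymous subclass of Tag<String> nested inside TagDemo.
  Tag<String> anonymousTag() {
    return new Tag<String>("writtenRecordsTag") {};
  }

  // No subclass is created here, so no type argument is recorded.
  Tag<String> plainTag() {
    return new Tag<String>("writtenRecordsTag");
  }

  public static void main(String[] args) {
    TagDemo demo = new TagDemo();
    System.out.println(demo.anonymousTag().capturedType());
    System.out.println(demo.plainTag().capturedType());
  }
}
```

With the plain form, type information has to come from somewhere else, which matches the remark that removing `{}` may require specifying coders explicitly.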
| @Internal | ||
| @Hidden | ||
| @org.apache.beam.sdk.options.Description( | ||
| "Indicates whether this is an automatically-run unit test.") |
...as opposed to? (what else is TestPipelineOptions used for, if not unit tests?)
"automatically run"
| IDENTITY_MAP, | ||
| getBaseOutputFilename(), | ||
| Optional.of(20)); | ||
| Optional.of(20), |
I'm still not super happy about readability here due to how many parameters there are, and how easy they are to confuse. What do you think about my suggestion to pass WriteFiles directly, so that the call site contains a builder call and it's obvious what parameters are configured with what values?
Done
|
Hey @reuvenlax any updates on this PR? |
Force-pushed from 597f8a8 to bce2c12.
|
@jkff comments are addressed. |
|
Run Dataflow ValidatesRunner |
|
The dataflow test failure seems unrelated to this change. |
|
Run Dataflow ValidatesRunner |
|
Again fails in CombineTest.testSimpleCombineWithContext on Dataflow Runner. I see no way this could be related. |
|
Run Dataflow ValidatesRunner |
This is similar to the fix for BEAM-2154.
R: @jkff