[BEAM-2052] Allow dynamic sharding in windowed file sinks#3023
[BEAM-2052] Allow dynamic sharding in windowed file sinks#3023jkff wants to merge 4 commits intoapache:masterfrom
Conversation
FileBasedWriteOperation -> WriteOperation, FileBasedWriter -> Writer
|
retest this please |
|
|
||
| @FinishBundle | ||
| public void finishBundle(FinishBundleContext c) throws Exception { | ||
| FileResult result = writer.close(); |
There was a problem hiding this comment.
if (writer != null) {
}
(empty bundles are legal I believe)
| private FileBasedWriter<T> writer = null; | ||
| private BoundedWindow window = null; | ||
| private class WriteWindowedBundles extends DoFn<T, FileResult> { | ||
| private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters; |
There was a problem hiding this comment.
incorrect I think - BoundedWindow isn't guaranteed to implement a proper hashCode. That's why I used Coder.structuralValue in the original PR
There was a problem hiding this comment.
In particular while this will work for our built-in windows, users can write their own window functions, and we do not insist that they provide hashCode - all we insist is that they provide a Coder. Hence the reason I used Coder.structuredValue to encode the windows.
There was a problem hiding this comment.
BoundedWindow documents that it must implement equals and hashCode (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L30), and there's a lot of HashMaps keyed with BoundedWindow in the repo.
|
1. This is true for the non-windowed case as well. Fixing this might
require some work in the DirectRunner - @tgroh can comment.
2. 1 bundle per key is true of the Dataflow streaming runner, but is in
general runner dependent. This is true of unwindowed FileBasedSinks as
well, this is not new in the windowing support.
I see two regressions in your PR from mine (one of which was also caught by
a test). Added comments.
…On Tue, May 9, 2017 at 4:40 PM, Eugene Kirpichov ***@***.***> wrote:
This is a slightly modified and rearranged version of @reuvenlax
<https://github.com/reuvenlax> 's #2647
<#2647> .
My concerns about it are:
1.
In direct runner, the integration tests of dynamic sharding are
vacuous, because direct runner replaces unspecified sharding with fixed
sharding at https://github.com/apache/beam/blob/master/runners/
direct-java/src/main/java/org/apache/beam/runners/direct/
WriteWithShardingFactory.java
<https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java>
(applied at https://github.com/apache/beam/blob/master/runners/
direct-java/src/main/java/org/apache/beam/runners/direct/
DirectRunner.java#L217
<https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L217>).
However, this is a testing-only concern: other runners don't have this
override, so overall the testing is non-vacuous, this is just hard to test
against direct runner and I suspect that we probably want these tests to be
non-vacuous in direct runner too.
2.
When I removed that override for testing purposes, I noticed that
there's a very large number of files being written - primarily, I guess,
because the bundles are very small. So large a number of files that the
test time for batch with dynamic sharding grows from 21 seconds to 5
minutes. In particular, we write many, many files for each window/pane -
presumably because in streaming runners and in direct runner, there's at
least 1 bundle per key, and we create at least 1 file per bundle in
WriteFiles.Write(Windowed,Unwindowed)Bundles.
Reuven, can you please comment on whether this "at least 1 file per key"
is expected behavior in a streaming runner? I suspect that it's not, but
then I'm not sure how to fix the PR semantically.
CC: @reuvenlax <https://github.com/reuvenlax> @davorbonaci
<https://github.com/davorbonaci> @dhalperi <https://github.com/dhalperi>
------------------------------
You can view, comment on, or merge this pull request online at:
#3023
Commit Summary
- Implement dynamic-sharding for windowed file outputs, and add an
integration test.
- Renames FileBasedSink inner classes
- Simpler code for setting shard numbers on results in FileBasedSink
- Splits WriteBundles into windowed/unwindowed versions
File Changes
- *M* examples/java/src/main/java/org/apache/beam/examples/
WindowedWordCount.java
<https://github.com/apache/beam/pull/3023/files#diff-0> (7)
- *M* examples/java/src/main/java/org/apache/beam/examples/
common/WriteOneFilePerWindow.java
<https://github.com/apache/beam/pull/3023/files#diff-1> (24)
- *M* examples/java/src/test/java/org/apache/beam/examples/
WindowedWordCountIT.java
<https://github.com/apache/beam/pull/3023/files#diff-2> (29)
- *M* runners/core-construction-java/src/test/java/org/apache/
beam/runners/core/construction/PTransformMatchersTest.java
<https://github.com/apache/beam/pull/3023/files#diff-3> (2)
- *M* runners/direct-java/src/test/java/org/apache/beam/runners/direct/
WriteWithShardingFactoryTest.java
<https://github.com/apache/beam/pull/3023/files#diff-4> (2)
- *M* runners/google-cloud-dataflow-java/src/main/java/org/apache/
beam/runners/dataflow/util/DefaultCoderCloudObjectTransla
torRegistrar.java
<https://github.com/apache/beam/pull/3023/files#diff-5> (2)
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
<https://github.com/apache/beam/pull/3023/files#diff-6> (14)
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/
FileBasedSink.java
<https://github.com/apache/beam/pull/3023/files#diff-7> (316)
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/
TFRecordIO.java <https://github.com/apache/beam/pull/3023/files#diff-8>
(16)
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
<https://github.com/apache/beam/pull/3023/files#diff-9> (14)
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/
WriteFiles.java
<https://github.com/apache/beam/pull/3023/files#diff-10> (259)
- *M* sdks/java/core/src/test/java/org/apache/beam/sdk/io/
FileBasedSinkTest.java
<https://github.com/apache/beam/pull/3023/files#diff-11> (162)
- *M* sdks/java/core/src/test/java/org/apache/beam/sdk/io/
SimpleSink.java
<https://github.com/apache/beam/pull/3023/files#diff-12> (4)
- *M* sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/
xml/XmlSink.java
<https://github.com/apache/beam/pull/3023/files#diff-13> (8)
Patch Links:
- https://github.com/apache/beam/pull/3023.patch
- https://github.com/apache/beam/pull/3023.diff
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#3023>, or mute the thread
<https://github.com/notifications/unsubscribe-auth/AUGE1ef1ECr_2_E3BIuSK-oH13NV2a1Lks5r4Pl5gaJpZM4NWAz->
.
|
| testWindowedWordCountPipeline(options); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
I think we should remove this test until we fix the streaming runner to not generate a file per bundle. Until then this is not a suggested use case for streaming, so I think it's ok to remove the test.
|
Run Spark ValidatesRunner |
|
Run Flink ValidatesRunner |
|
Run Dataflow ValidatesRunner |
|
I merged this after manually running WindowedWordCountIT with Flink runner. Looking at the dependency error - it looks pretty clearly unrelated to my PR though, the diff doesn't even mention "findbugs"... |
|
Sorry, I'm an idiot, I actually introduced the error. Will send a fix PR right away. |
|
DO NOT cherrypick without #3059 |
This is a slightly modified and rearranged version of @reuvenlax 's #2647 .
My concerns about it are:
In direct runner, the integration tests of dynamic sharding are vacuous, because direct runner replaces unspecified sharding with fixed sharding at https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java (applied at https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L217). However, this is a testing-only concern: other runners don't have this override, so overall the testing is non-vacuous, this is just hard to test against direct runner and I suspect that we probably want these tests to be non-vacuous in direct runner too.
When I removed that override for testing purposes, I noticed that there's a very large number of files being written - primarily, I guess, because the bundles are very small. So large a number of files that the test time for batch with dynamic sharding grows from 21 seconds to 5 minutes. In particular, we write many, many files for each window/pane - presumably because in streaming runners and in direct runner, there's at least 1 bundle per key, and we create at least 1 file per bundle in WriteFiles.Write(Windowed,Unwindowed)Bundles.
Reuven, can you please comment on whether this "at least 1 file per key" is expected behavior in a streaming runner? I suspect that it's not, but then I'm not sure how to fix the PR semantically.
CC: @reuvenlax @davorbonaci @dhalperi