[BEAM-2624] Allow access to created filenames from WriteFiles #3573
reuvenlax wants to merge 9 commits into apache:master
Adding @jkff as @kennknowles is on vacation right now.
    Map<TupleTag<?>, PValue> outputs, WriteFilesResult newOutput) {
  // We must connect the new output from WriteFilesResult to the outputs provided by the original
  // transform.
  Map.Entry<TupleTag<?>, PValue> original = Iterables.getOnlyElement(outputs.entrySet());
I think there exists a helper that does all this for you; take a look at other implementations of mapOutputs?
import org.apache.beam.sdk.values.TupleTag;

/** The result of a {@link WriteFiles} transform. */
public class WriteFilesResult implements POutput {
- It should probably be destination-dependent, i.e. PCollection<KV<DestinationT, String>>
- Not a fan of the POutput boilerplate: why do we need a WriteFilesResult in general, why not just return a PCollection? Is that because you want to make it forward-compatible?
This is what we did in BigQueryIO for exactly that reason - it is forward compatible, and I can imagine wanting to return additional metadata in the future, as well as other things (e.g. dead-letter type accessors like in BigQueryIO).
Added a second getter to return KV<DestinationT, String> (simple use cases of sinks do not set a custom destination type, so I want there to be a simple accessor for them).
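The trade-off discussed above (a dedicated result object with both a per-destination getter and a simpler filenames-only getter) can be illustrated with a minimal plain-Java sketch. This is not Beam's `WriteFilesResult`: the class name `WriteResultSketch` and both getter names are hypothetical, and an in-memory `List` stands in for a `PCollection`. It only shows why a wrapper type leaves room for more accessors later without changing the return type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a write result; NOT Beam's WriteFilesResult. */
final class WriteResultSketch<DestinationT> {
  // Each entry pairs a destination with one filename written for it.
  private final List<Map.Entry<DestinationT, String>> perDestination;

  WriteResultSketch(List<Map.Entry<DestinationT, String>> perDestination) {
    // Defensive copy so callers cannot mutate the result afterwards.
    this.perDestination = Collections.unmodifiableList(new ArrayList<>(perDestination));
  }

  /** Destination-aware accessor (hypothetical name). */
  List<Map.Entry<DestinationT, String>> getPerDestinationFilenames() {
    return perDestination;
  }

  /** Simple accessor for callers that do not use a custom destination type. */
  List<String> getFilenames() {
    List<String> names = new ArrayList<>();
    for (Map.Entry<DestinationT, String> entry : perDestination) {
      names.add(entry.getValue());
    }
    return names;
  }
}
```

Because callers hold a `WriteResultSketch` rather than a bare collection, a later accessor (for example, one exposing failed writes) could be added as another getter without breaking existing code.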
public Void apply(Iterable<String> values) {
  try {
    String pattern = baseFilename.toString() + "*";
    Iterable<String> matches =
- This will look better as a simple loop
- You can put the creation of expected values into runTestWrite, and use regular PAssert.that(collection).contains(those values).
runTestWrite doesn't necessarily know the expected values a priori (e.g. if dynamic sharding is used). That's why we match the files written to FS.
Changed to a loop.
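The approach described in this reply (list what actually landed on the filesystem under `baseFilename + "*"`, then compare it to the reported filenames with a plain loop) might look roughly like the following. This is a sketch using only `java.nio.file`, not the test code from the PR: the class `WrittenFilesCheck` and both method names are hypothetical, and it assumes a local directory and a base filename without glob metacharacters.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper; a local-filesystem sketch, not the PR's test code. */
final class WrittenFilesCheck {
  /** Lists files in dir whose names start with baseFilename, sorted by path. */
  static List<String> matchWrittenFiles(Path dir, String baseFilename) throws IOException {
    List<String> found = new ArrayList<>();
    // The glob is matched against each entry's file name within dir.
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, baseFilename + "*")) {
      for (Path path : stream) {
        found.add(path.toString());
      }
    }
    Collections.sort(found);
    return found;
  }

  /** Fails unless the reported filenames equal the files found on disk. */
  static void assertSameFiles(Iterable<String> reported, List<String> onDisk) {
    List<String> sortedReported = new ArrayList<>();
    for (String name : reported) {
      sortedReported.add(name);
    }
    Collections.sort(sortedReported);
    List<String> sortedOnDisk = new ArrayList<>(onDisk);
    Collections.sort(sortedOnDisk);
    if (!sortedReported.equals(sortedOnDisk)) {
      throw new AssertionError("reported " + sortedReported + " but found " + sortedOnDisk);
    }
  }
}
```

Deriving the expected set from the filesystem avoids having to predict shard names up front, which is the concern raised about dynamic sharding.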
I just remembered this PR was pending. It changes an API introduced in 2.2.0, so we need to ensure this goes in before 2.2.0 is cut.
I would like to defer this to @jkff if possible; my impression is that review is well underway. R: -@kennknowles |
8f6e407 to 5c838f4
We use the new TypedWrite transforms to provide the new return type. Since these transforms have not yet been part of a Beam release (they are slated for 2.2), we can change their return types without worrying about backwards compatibility.
R: @kennknowles
R: @jkff