[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.#4011
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.#4011lukecwik wants to merge 5 commits intoapache:masterfrom
Conversation
f9dff19 to
f7240d8
Compare
|
R: @tgroh I had a brief discussion with Ken about doing this high level approach since he had been the largest contributor to the PCollectionView interfaces from my memory. Ken, you might want to take a look at the high level interface changes. |
| public static List<Object> materializeValuesFor( | ||
| PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) { | ||
| List<Object> rval = new ArrayList<>(); | ||
| // Currently all view materializations are the same where the data is shared underneath |
There was a problem hiding this comment.
Can this be any more specific than a PTransform, Object[] -> List<Object>? Narrowing the materialization (e.g. must be a Multimap, by name or by checkArgument) is fine.
A comment on what the method exists for would also be appreciated - my understanding of how this interacts with the existing ViewFns is that they're stored as the encoded objects in a singleton map from the void key; and in the future we'll do something smarter; this will be responsible for materializing those as well?
There was a problem hiding this comment.
This is for the future in case we have other materializations then just multimap so we can't give anything more specific then Object.
Your assumption is correct. This is to decouple the SDKs representation of how it chooses to materialize the different views with how the runner is testing them.
Once side inputs are being only fetched over the Fn API then this will not be the way to test things as really the Runner should be dealing just with storage and look up and not need to deal with the ViewFn at all.
| // Generate the temporary-file prefix. | ||
| private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) { | ||
| return ((PCollection<String>) jobIdView.getPCollection()) | ||
| private PCollectionView<String> createTempFilePrefixView( |
There was a problem hiding this comment.
Can this change be separated out? It already LGTM, assuming that we don't ever expect to have separate prefixes per window.
There was a problem hiding this comment.
Like part of another PR or just called out as a commit in this PR?
Currently just split to separate commit.
| // * null keys | ||
| // * null values | ||
| // * duplicate values | ||
| Multimap<Object, Object> multimap = ArrayListMultimap.create(); |
There was a problem hiding this comment.
This can be a ListMultimap to force a bit more of the preconditions. Up to you.
There was a problem hiding this comment.
The List part isn't important. Its just the ability to store null keys and values and duplicate values. For example HashMultimap doesn't support duplicate values and ImmutableMultimap doesn't support null keys/values.
There was a problem hiding this comment.
I wanted to use ListMultimap instead of generic Multimap to signal the non-set nature of the Multimap, as there's unfortunately no BagMultimap, but It's a minor nit given the commenting.
There was a problem hiding this comment.
I don't think the firstNotNull is necessary; Guava Multimaps return empty collections when there are no values associated with a key.
| return sideInput.getViewFn().apply(elements); | ||
| ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); | ||
| Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); | ||
| return viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements)); |
There was a problem hiding this comment.
Can/do we have an outstanding task to update the handler to use a more appropriate storage type, and/or is there a more appropriate state type?
There was a problem hiding this comment.
The issue from swapping away from value state is that not all runners may support more complex state types and value state effectively forces using an in memory representation.
| @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform( | ||
| (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked( | ||
| PCollectionViewWindow.of(view, window)).get(), | ||
| new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() { |
There was a problem hiding this comment.
Instead of storing the input Multimap's entries as an iterable and converting back, can we just store the multimap directly? If that's too large for this change, file a followup jira.
There was a problem hiding this comment.
The plan was to eventually improve these representations (or have runner authors make these changes), see https://issues.apache.org/jira/browse/BEAM-3080 for the DirectRunner/ULR.
This was just to get the change in since all the runners here just do a trivial multimap -> iterable -> multimap for all the types anyways.
| public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> | ||
| implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> { | ||
| public class SideInputInitializer<ViewT> | ||
| implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> { |
There was a problem hiding this comment.
Can this be WindowedValues of KVs, or is that not possible with the module's type signatures?
There was a problem hiding this comment.
I'm specifically not narrowing the type because KV<> is for the multimap materialization. When/if this SideInputInitializer supports a different URN, that KV is unlikely to be true anymore.
There was a problem hiding this comment.
No action required, but these number changes are a real downer.
|
Run Java PreCommit |
| // * null keys | ||
| // * null values | ||
| // * duplicate values | ||
| Multimap<Object, Object> multimap = ArrayListMultimap.create(); |
There was a problem hiding this comment.
I wanted to use ListMultimap instead of generic Multimap to signal the non-set nature of the Multimap, as there's unfortunately no BagMultimap, but It's a minor nit given the commenting.
| // Generate the temporary-file prefix. | ||
| private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) { | ||
| return ((PCollection<String>) jobIdView.getPCollection()) | ||
| private PCollectionView<String> createTempFilePrefixView( |
ba39ffd to
76c4fc0
Compare
|
Run Dataflow ValidatesRunner |
|
Run Apex ValidatesRunner |
|
Run Spark ValidatesRunner |
|
Run Flink ValidatesRunner |
|
Run Gearpump ValidatesRunner |
|
SUCCESS --none-- |
4 similar comments
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
FAILURE --none-- |
2 similar comments
|
FAILURE --none-- |
|
FAILURE --none-- |
…implementation is specialized.
|
Run Dataflow ValidatesRunner |
|
Run Spark ValidatesRunner |
|
Run Apex ValidatesRunner |
|
Run Gearpump ValidatesRunner |
|
Run Flink ValidatesaRunner |
|
SUCCESS --none-- |
|
Run Flink ValidatesRunner |
|
SUCCESS --none-- |
|
FAILURE --none-- |
|
SUCCESS --none-- |
2 similar comments
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
FAILURE --none-- |
1 similar comment
|
FAILURE --none-- |
…as being replaced.
|
SUCCESS --none-- |
|
Run Dataflow ValidatesRunner |
|
Run Apex ValidatesRunner |
|
Run Spark ValidatesRunner |
|
Run Flink ValidatesRunner |
|
Run Gearpump ValidatesRunner |
|
SUCCESS --none-- |
6 similar comments
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
|
SUCCESS --none-- |
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue.mvn clean verifyto make sure basic checks pass. A more thorough check will be performed on your pull request automatically.I want to get this reviewed before I start mucking with the Dataflow worker so I can update that internally after this approach has been agreed upon.
Review notes:
PCollectionView.getCoderInternal()/PCollection.getWindowingStrategyInternal()/... toPCollectionView.getPCollection().getYYY()but was unable to because the PCollection stored within PCollectionView is transient and the majority of runners pull information from serialized PCollectionViews, not from proto round tripped views.?types and forced casts but that would have grown this PR to be so much larger.