[BEAM-4285] Extend side input handlers to handle multiple access patterns.#5814
[BEAM-4285] Extend side input handlers to handle multiple access patterns.#5814jkff merged 2 commits intoapache:masterfrom
Conversation
| Collections.emptyList()); | ||
| } | ||
|
|
||
| private static <T> byte[] encode(T value, Coder<T> coder) { |
There was a problem hiding this comment.
This can be replaced by CoderUtils.encodeToByteArray
| * be used to encode/decode their respective values. | ||
| */ | ||
| <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput( | ||
| <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput( |
There was a problem hiding this comment.
Instead of trying to force two different access patterns into a single interface that sort of works for both by special casing the key, wouldn't it make more sense to have a separate handler interface for iterable side inputs?
The idea is that for side inputs, you can create something which delegates based upon the access pattern so you effectively have three StateRequestHandlers, one for iterable, one for multimap, and one that delegates based upon access pattern.
There was a problem hiding this comment.
I agree with this but I suggest we do it in an immediately-following-up cleanup PR.
There was a problem hiding this comment.
I actually started down this road, but found myself duplicating a huge amount of code due to the lack of a common interface (short of passing the full StateRequest which seemed to break the abstraction). That's not to say it wouldn't be worth exploring more, but it seems every access pattern (geo-spacial, range-based, ...) falls naturally into the pattern of having a key that is interpreted/ignored according to the access pattern's semantics. The other alternative is to let the StateHandlerFactory interface have a handle-returning-method per-known access pattern rather than pass this down to the runners themselves, which also felt odd.
There was a problem hiding this comment.
If runners want to write one object that supports both types, runners should be able to have a single class which has shared state and exposes a trivial wrapper object that implements the desired interface that converts calls to the shared object.
| K key; | ||
| try { | ||
| // TODO: We could skip decoding and just compare encoded values for deterministic keyCoders. | ||
| key = keyCoder.decode(new ByteArrayInputStream(keyBytes)); |
There was a problem hiding this comment.
Here too, could use CoderUtils.
| * be used to encode/decode their respective values. | ||
| */ | ||
| <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput( | ||
| <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput( |
There was a problem hiding this comment.
I agree with this but I suggest we do it in an immediately-following-up cleanup PR.
Python side inputs now work.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username) to look at it.Post-Commit Tests Status (on master branch)