Consider a CoGroupByKey over two (streaming) collections:
l : PCollection<KV<K, L>>
r : PCollection<KV<K, R>>
Each processElement call sees a key K together with an Iterable<L> and an Iterable<R>.
If the underlying trigger only allows a single PaneInfo.Timing.ON_TIME pane then it is trivial to calculate the traditional cross-product, including any of the inner/outer join combinations should Iterable<L> or Iterable<R> be empty.
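As a rough illustration of the single-pane case, the sketch below joins one key's two iterables in one step. It deliberately avoids Beam types: the class `SinglePaneJoin`, its `JoinType` enum and the `Pair` record are hypothetical names, and a missing side in an outer join is represented by `null`, which is an assumption rather than Beam convention.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: joins one key's values when the window fires exactly once (ON_TIME). */
public final class SinglePaneJoin {
  public enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

  /** One joined row; a missing side (outer join) is represented by null. */
  public record Pair<A, B>(A left, B right) {}

  public static <L, R> List<Pair<L, R>> join(Iterable<L> lefts, Iterable<R> rights, JoinType type) {
    // Materialize both sides; with a single pane these are all the elements the window will ever see.
    List<L> ls = new ArrayList<>();
    lefts.forEach(ls::add);
    List<R> rs = new ArrayList<>();
    rights.forEach(rs::add);

    List<Pair<L, R>> out = new ArrayList<>();
    if (!ls.isEmpty() && !rs.isEmpty()) {
      // Both sides present: the plain cross-product, identical for every join type.
      for (L l : ls) {
        for (R r : rs) {
          out.add(new Pair<>(l, r));
        }
      }
    } else if (rs.isEmpty() && (type == JoinType.LEFT_OUTER || type == JoinType.FULL_OUTER)) {
      // Right side empty: pad left rows for left/full outer joins.
      for (L l : ls) {
        out.add(new Pair<>(l, null));
      }
    } else if (ls.isEmpty() && (type == JoinType.RIGHT_OUTER || type == JoinType.FULL_OUTER)) {
      // Left side empty: pad right rows for right/full outer joins.
      for (R r : rs) {
        out.add(new Pair<>(null, r));
      }
    }
    // Otherwise (inner join with an empty side, or both sides empty) nothing is emitted.
    return out;
  }
}
```

Because the pane is final, an empty side can be trusted to stay empty, which is what makes the outer-join padding safe here.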
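However, if the underlying trigger supports speculative (i.e. PaneInfo.Timing.EARLY) or late (i.e. PaneInfo.Timing.LATE) panes, the corresponding output panes are awkward to compute. A window's elements then arrive across several panes, so the output for each new pane has to be expressed in terms of what was already seen and what is new: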
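$$
\begin{aligned}
& (\mathit{left\_already\_seen} \cup \mathit{new\_left}) \times (\mathit{right\_already\_seen} \cup \mathit{new\_right}) \\
&= (\mathit{left\_already\_seen} \times \mathit{right\_already\_seen}) \\
&\cup (\mathit{new\_left} \times \mathit{right\_already\_seen}) \\
&\cup (\mathit{left\_already\_seen} \times \mathit{new\_right}) \\
&\cup (\mathit{new\_left} \times \mathit{new\_right})
\end{aligned}
$$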
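The first term (left_already_seen X right_already_seen) was already emitted in an earlier pane, so a new pane only needs the last three terms. Computing them currently requires maintaining the barrier between 'already seen' and 'new' elements, for both left and right, in per-window state. That suppresses some optimizations.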
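A minimal sketch of that per-window bookkeeping, assuming each pane should emit only pairs not emitted before. Plain Java lists stand in for per-window state (in a Beam pipeline this would be something like a `BagState` per side). The class `IncrementalCrossProduct`, its `onPane` method and the `Pair` record are hypothetical, and outer-join padding and retractions are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical per-key, per-window helper for incremental (EARLY/LATE) panes.
 * It keeps the 'already seen' / 'new' barrier explicitly for both sides.
 */
public final class IncrementalCrossProduct<L, R> {
  /** One joined pair emitted in some pane. */
  public record Pair<A, B>(A left, B right) {}

  // Stand-ins for per-window state: every element folded into an earlier pane.
  private final List<L> leftAlreadySeen = new ArrayList<>();
  private final List<R> rightAlreadySeen = new ArrayList<>();

  /**
   * Called once per pane with only the elements that arrived since the previous pane.
   * Returns the delta: the last three terms of the decomposition.
   */
  public List<Pair<L, R>> onPane(List<L> newLeft, List<R> newRight) {
    List<Pair<L, R>> delta = new ArrayList<>();
    // new_left X right_already_seen
    for (L l : newLeft) {
      for (R r : rightAlreadySeen) {
        delta.add(new Pair<>(l, r));
      }
    }
    // left_already_seen X new_right
    for (L l : leftAlreadySeen) {
      for (R r : newRight) {
        delta.add(new Pair<>(l, r));
      }
    }
    // new_left X new_right
    for (L l : newLeft) {
      for (R r : newRight) {
        delta.add(new Pair<>(l, r));
      }
    }
    // Move the barrier: everything in this pane counts as 'already seen' from now on.
    leftAlreadySeen.addAll(newLeft);
    rightAlreadySeen.addAll(newRight);
    return delta;
  }
}
```

For example, panes carrying (["a"], [1]), then (["b"], []), then ([], [2]) emit 1, 1 and 2 pairs respectively, together covering all 2 × 2 pairs exactly once. The sketch also makes the cost visible: both `AlreadySeen` lists must be retained for the lifetime of the window.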
This issue tracks finding a cleaner way to express this combinator.
Imported from Jira BEAM-197. Original Jira may contain additional context.
Reported by: mshields822.