Description
Beam SQL runs in two contexts:
- As a PTransform in a pipeline. A PTransform operates on a PCollection, which is always implicitly windowed, and a PTransform should operate per-window so that it automatically works on both bounded and unbounded data. This only works if the query has no windowing operators, in which case the GROUP BY <non-window stuff> should operate per-window.
- As a top-level shell that starts and ends with SQL. In the relational model there are no implicit windows. Calcite has some extensions for windowing, but they manifest (IMO correctly) as just items in the GROUP BY list. The output of the aggregation is "just rows" again, so it should be globally windowed.
The problem is that this semantic fix means we can no longer join windowed stream subqueries. Because we don't have retractions, we only support GroupByKey-based equijoins over windowed streams, with the default trigger. These joins implicitly also join on windows. For example:
JOIN(left.id = right.id)
  SELECT ... GROUP BY id, TUMBLE(1 hour)
  SELECT ... GROUP BY id, TUMBLE(1 hour)
Semantically, there may be a joined row for 1:00pm on the left and 10:00pm on the right. But by the time the right-hand row for 10:00pm shows up, the left one may have been GC'd. So the join is implicitly, but nondeterministically, joining on the window as well. Before this PR, we left the windowing strategies for left and right in place, and asserted that they matched.
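To make the difference concrete, here is a minimal pure-Python sketch (not Beam or Calcite code) that contrasts a per-window join with a purely relational join on `id`. The helper names `tumble_start`, `aggregate`, `windowed_join`, and `global_join` are hypothetical and exist only for illustration; timestamps are assumed to be seconds, and the aggregate is assumed to be a simple count.

```python
from collections import defaultdict

HOUR = 3600  # assumed window size in seconds, standing in for TUMBLE(1 hour)


def tumble_start(ts, size=HOUR):
    """Start of the fixed (tumbling) window that contains timestamp ts."""
    return ts - (ts % size)


def aggregate(rows, size=HOUR):
    """Emulate SELECT id, COUNT(*) ... GROUP BY id, TUMBLE(1 hour).

    rows is an iterable of (id, timestamp) pairs.
    Returns {(id, window_start): count}.
    """
    counts = defaultdict(int)
    for key, ts in rows:
        counts[(key, tumble_start(ts, size))] += 1
    return dict(counts)


def windowed_join(left, right):
    """Per-window join: rows meet only if id AND window both match."""
    return {k: (left[k], right[k]) for k in left.keys() & right.keys()}


def global_join(left, right):
    """Purely relational join on id alone; the window is just another column."""
    return sorted(
        (lid, lwin, rwin, lcount, rcount)
        for (lid, lwin), lcount in left.items()
        for (rid, rwin), rcount in right.items()
        if lid == rid
    )


# Left side has two events in the 1:00pm window, right side one at 10:00pm.
left = aggregate([("a", 13 * HOUR + 5), ("a", 13 * HOUR + 10)])
right = aggregate([("a", 22 * HOUR + 1)])

per_window = windowed_join(left, right)   # {} because the windows differ
relational = global_join(left, right)     # one row pairing 1:00pm with 10:00pm
```

In this sketch the relational join produces the 1:00pm/10:00pm pair, while the per-window join produces nothing. A streaming runner that garbage-collects expired windows sits somewhere between the two, which is the nondeterminism described above.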
If we always re-window into the global window, there are no windowed streams left, so you just can't do stream joins. The solution is probably to track which field of a stream is the window and allow joins that also explicitly express the equijoin over the window field.
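One way to picture the proposed rule is a small check that runs while planning a join. The following is only a sketch under stated assumptions: `StreamSchema`, its `window_field` attribute, and `window_join_ok` are hypothetical names, not part of Beam SQL or Calcite, and the check models only the window-field condition, not retractions or triggers.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class StreamSchema:
    """Hypothetical schema that remembers which field carries the window."""
    fields: Tuple[str, ...]
    window_field: Optional[str] = None  # None: no window field is tracked


def window_join_ok(left: StreamSchema,
                   right: StreamSchema,
                   equi_pairs: Sequence[Tuple[str, str]]) -> bool:
    """Return True if the equijoin respects the tracked window fields.

    equi_pairs lists the (left_field, right_field) equality conditions.
    """
    for lf, rf in equi_pairs:
        if lf not in left.fields or rf not in right.fields:
            raise ValueError(f"unknown join field: {lf!r} = {rf!r}")
    if left.window_field is None and right.window_field is None:
        # Neither side tracks a window, so this rule imposes no constraint.
        return True
    if left.window_field is None or right.window_field is None:
        # Assumption in this sketch: mixing windowed and unwindowed is rejected.
        return False
    # Both sides are windowed: the join must equate the two window fields.
    return (left.window_field, right.window_field) in set(equi_pairs)


hourly = StreamSchema(fields=("id", "w", "cnt"), window_field="w")

implicit = window_join_ok(hourly, hourly, [("id", "id")])              # False
explicit = window_join_ok(hourly, hourly, [("id", "id"), ("w", "w")])  # True
```

Under this rule the join from the example above would be rejected until the query also states equality on the window field, which turns the implicit window join into an explicit, deterministic one.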
Imported from Jira BEAM-4702. Original Jira may contain additional context.
Reported by: kenn.