Support Joining windowed and non-windowed datasets #165

Open
jacksonrnewhouse opened this issue Jun 8, 2023 · 2 comments
Labels
enhancement (New feature or request), sql (Related to the DataFusion SQL integration)

Comments

@jacksonrnewhouse
Contributor

We'd like to support joins between a windowed aggregate and a non-windowed stream. This would allow us to run Nexmark Query 7 (highest bid). The Arroyo version of the query is below. Ideally we'd also inspect the WHERE clause so that we don't keep data around after the window has passed, as the only data from B1 we care about is that within active windows.

WITH
auction AS (
    SELECT auction.category AS category,
        auction.datetime AS datetime,
        auction.expires AS expires,
        auction.id AS id
    FROM nexmark WHERE auction IS NOT NULL),
bid AS (
    SELECT bid.auction AS auction,
        bid.bidder AS bidder,
        bid.extra AS extra,
        bid.datetime AS datetime,
        bid.price AS price
    FROM nexmark WHERE bid IS NOT NULL)

SELECT B.auction, B.price, B.bidder, B.dateTime, B.extra
FROM bid B
JOIN (
  SELECT MAX(B1.price) AS maxprice, tumble(INTERVAL '10' SECOND) AS window
  FROM bid B1
  GROUP BY 2
) B1
ON B.price = B1.maxprice
WHERE B.dateTime BETWEEN B1.window.start_time AND B1.window.end_time;
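
To make the intended semantics concrete, here is a minimal Python sketch of what the query above is expected to compute, together with the state-expiration idea from the description. It is not Arroyo code: the names `Bid`, `window_start`, and `WindowedMaxJoin` are hypothetical, event times are assumed to be integer seconds, and windows are assumed to be half-open `[start, start + 10)` and aligned to zero (the query's `BETWEEN` is inclusive at both ends, so boundary behavior may differ).

```python
from collections import defaultdict
from dataclasses import dataclass

WINDOW_SECONDS = 10  # mirrors tumble(INTERVAL '10' SECOND)


@dataclass(frozen=True)
class Bid:
    auction: int
    price: int
    bidder: int
    datetime: int  # event time in seconds (assumption for this sketch)


def window_start(ts: int) -> int:
    """Start of the tumbling window that contains ts."""
    return ts - ts % WINDOW_SECONDS


class WindowedMaxJoin:
    """Buffers bids per window; emits the max-price bids when a window closes."""

    def __init__(self):
        # window start -> bids whose event time falls in that window
        self.buffered = defaultdict(list)

    def on_bid(self, bid: Bid) -> None:
        self.buffered[window_start(bid.datetime)].append(bid)

    def on_watermark(self, watermark: int) -> list:
        """Emit results for windows ending at or before the watermark."""
        out = []
        closed = sorted(s for s in self.buffered if s + WINDOW_SECONDS <= watermark)
        for start in closed:
            bids = self.buffered.pop(start)  # state for this window is dropped here
            max_price = max(b.price for b in bids)
            out.extend(b for b in bids if b.price == max_price)
        return out


join = WindowedMaxJoin()
for b in [Bid(1, 100, 7, 1), Bid(2, 250, 8, 4), Bid(3, 250, 9, 9), Bid(4, 50, 7, 12)]:
    join.on_bid(b)
print(join.on_watermark(10))  # bids for auctions 2 and 3 (both at the window max)
```

Because a bid can only match the maximum of its own window, nothing needs to be retained for a window once the watermark has passed its end, which is the optimization the WHERE clause would allow.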
@jacksonrnewhouse jacksonrnewhouse added enhancement New feature or request sql Related to the DataFusion SQL integration labels Jun 8, 2023
@edmondop
Contributor

This is the DAG produced by the query:

```
2023-08-31T22:56:30.082233Z  INFO arroyo_controller::compiler: digraph {
    0 [ label = "nexmark_0:Nexmark<10 eps>" ]
    1 [ label = "watermark_1:Watermark" ]
    2 [ label = "fused_9:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    3 [ label = "tumbling_window_two_phase_aggregator_3:TumblingWindowAggregator<TumblingWindow(10s)>" ]
    4 [ label = "fused_4:expression<sql_fused<filter,value_project,value_project,key_project>:OptionalRecord>" ]
    5 [ label = "join_pair_merge_2:expression<api_fused<join_merge,sql_fused<filter,value_project,value_project>>:OptionalRecord>" ]
    6 [ label = "sink_web_6:WebSink" ]
    7 [ label = "join_with_expiration_7:JoinWithExpiration<left_expire: 86400s, right_expire: 86400s, join_type: Inner>" ]
    8 [ label = "fused_8:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    0 -> 1 [ label = "() → arroyo_types :: nexmark :: Event" ]
    2 -> 7 [ label = "generated_struct_3484707784589493356 -left→ generated_struct_15490570238723732322" ]
    4 -> 7 [ label = "generated_struct_3484707784589493356 -right→ generated_struct_16745226037119149261" ]
    1 -> 8 [ label = "() → arroyo_types :: nexmark :: Event" ]
    7 -> 5 [ label = "generated_struct_3484707784589493356 → (generated_struct_15490570238723732322 , generated_struct_16745226037119149261)" ]
    5 -> 6 [ label = "() → generated_struct_6614185072020766535" ]
    8 -> 3 [ label = "generated_struct_17942395924573474124 ⤨ generated_struct_17349934388688571038" ]
    1 -> 2 [ label = "() → arroyo_types :: nexmark :: Event" ]
    3 -> 4 [ label = "generated_struct_17942395924573474124 → generated_struct_18028448699472152953" ]
```

No output is produced
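
As an aid to reading the digraph in this comment, the following Python sketch lists the operator paths that feed the join node. The edge list and node names are copied from the log (edge labels omitted, node labels shortened to the part before the colon); the helpers `parse_edges` and `paths_to` are hypothetical and use only the standard `re` module. It only describes the topology and does not attempt to explain why no output appears.

```python
import re

# Edges copied from the digraph (edge labels omitted).
DOT_EDGES = """
0 -> 1
2 -> 7
4 -> 7
1 -> 8
7 -> 5
5 -> 6
8 -> 3
1 -> 2
3 -> 4
"""

# Node labels shortened to the operator name before the colon.
NAMES = {
    0: "nexmark_0", 1: "watermark_1", 2: "fused_9",
    3: "tumbling_window_two_phase_aggregator_3", 4: "fused_4",
    5: "join_pair_merge_2", 6: "sink_web_6",
    7: "join_with_expiration_7", 8: "fused_8",
}


def parse_edges(text):
    """Extract (source, destination) node ids from 'a -> b' lines."""
    return [(int(a), int(b)) for a, b in re.findall(r"(\d+)\s*->\s*(\d+)", text)]


def paths_to(target, edges):
    """All source-to-target paths, found by walking edges backwards."""
    preds = {}
    for src, dst in edges:
        preds.setdefault(dst, []).append(src)

    def walk(node):
        if node not in preds:  # a node with no incoming edge is a source
            return [[node]]
        return [p + [node] for parent in preds[node] for p in walk(parent)]

    return walk(target)


for path in paths_to(7, parse_edges(DOT_EDGES)):
    print(" -> ".join(NAMES[n] for n in path))
```

Run against these edges, this prints two paths into `join_with_expiration_7`: the left input comes straight from the watermarked source through `fused_9`, while the right input passes through `fused_8`, the tumbling window aggregator, and `fused_4`.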

@edmondop
Contributor

Screenshot 2023-08-31 at 3 58 33 PM
