diff --git a/crates/core/common/src/incrementalizer.rs b/crates/core/common/src/incrementalizer.rs index 2edfd74b6..af2f27563 100644 --- a/crates/core/common/src/incrementalizer.rs +++ b/crates/core/common/src/incrementalizer.rs @@ -61,7 +61,7 @@ impl BlockNumForm { /// ## Special cases /// - If a join as an `on` condition that is an inequality on `_block_num`, e.g. `l._block_num <= r._block_num`, then Δ(L⋈R)=(L[t−1]​⋈ΔR)∪(ΔL⋈ΔR), a term is optimized away. /// This is because the inequality can be relaxed to `start <= r_block_num`. More generally, if the `on` clause can be relaxed by a lower start bound, we can push it down and potentially eliminate a term. This is not yet implemented. -/// - If a join has a `r._block_num = l.block_num` condition, then Δ(L⋈R)=ΔL⋈ΔR. These joins may stack with each other and with linear operators, or on top of output of general joins. This special case is not yet implemented. +/// - If a join has a `l._block_num = r._block_num` condition, then Δ(L⋈R)=ΔL⋈ΔR. These joins may stack with each other and with linear operators, or on top of output of general joins. /// /// ## Further reading /// - The inner join update formula is well-known and commonly implemented in incremental view maintenance systems. For one academic reference see: @@ -123,7 +123,13 @@ impl TreeNodeRewriter for Incrementalizer { match incremental_op_kind(&node, BlockNumForm::Propagated) .map_err(|e| DataFusionError::External(e.into()))? { - IncrementalOpKind::Linear => Ok(Transformed::no(node)), + IncrementalOpKind::Linear | IncrementalOpKind::BlockNumEqJoin => { + // Linear ops and _block_num equality joins just push the current range + // to both children via normal tree recursion. For BlockNumEqJoin this works + // because _block_num equality guarantees temporal alignment: + // Δ(L⋈R) = ΔL⋈ΔR and History(L⋈R) = History(L)⋈History(R). + Ok(Transformed::no(node)) + } IncrementalOpKind::InnerJoin => { let LogicalPlan::Join(join) = node else { unreachable!("IncrementalOpKind::InnerJoin only returned for Join nodes") @@ -279,6 +285,9 @@ pub enum NonIncrementalQueryError { pub enum IncrementalOpKind { Linear, InnerJoin, + /// An inner join with `l._block_num = r._block_num` in its equi-join conditions. + /// Acts like a linear operator: the range filter can be pushed to both children. + BlockNumEqJoin, Table, } @@ -305,12 +314,13 @@ pub fn incremental_op_kind( // Joins Join(join) => match join.join_type { - // TODO: detect and split out `l._block_num = r._block_num` joins - JoinType::Inner => Ok(InnerJoin), - - // Semi-joins are just projections of inner joins - JoinType::LeftSemi => Ok(InnerJoin), - JoinType::RightSemi => Ok(InnerJoin), + JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi => { + if has_block_num_eq_condition(&join.on) { + Ok(BlockNumEqJoin) + } else { + Ok(InnerJoin) + } + } // Outer and anti joins are not incremental JoinType::Left @@ -360,6 +370,16 @@ pub fn incremental_op_kind( } } +/// Returns true if any equi-join condition pair equates `_block_num` columns on both sides. +fn has_block_num_eq_condition(on: &[(Expr, Expr)]) -> bool { + on.iter() + .any(|(l, r)| is_block_num_col(l) && is_block_num_col(r)) +} + +fn is_block_num_col(expr: &Expr) -> bool { + matches!(expr, Expr::Column(c) if c.name == RESERVED_BLOCK_NUM_COLUMN_NAME) +} + fn empty_relation(schema: DFSchemaRef) -> EmptyRelation { EmptyRelation { produce_one_row: false, diff --git a/tests/config/packages/block_num_eq_join/amp.config.ts b/tests/config/packages/block_num_eq_join/amp.config.ts new file mode 100644 index 000000000..718937687 --- /dev/null +++ b/tests/config/packages/block_num_eq_join/amp.config.ts @@ -0,0 +1,23 @@ +import { defineDataset } from "@edgeandnode/amp" + +export default defineDataset(() => ({ + name: "block_num_eq_join", + network: "anvil", + dependencies: { + anvil_rpc: "_/anvil_rpc@0.0.0", + }, + tables: { + // Two tables with distinct per-block values, designed to be joined on block_num. + // left_val and right_val are non-join columns with predictable, different values + // per block, so result assertions can verify the join matched the correct rows. + lefty: { + sql: `SELECT block_num, CAST(block_num AS BIGINT) * 10 + 1 AS left_val FROM anvil_rpc.blocks`, + network: "anvil", + }, + righty: { + sql: `SELECT block_num, CAST(block_num AS BIGINT) * 10 + 2 AS right_val FROM anvil_rpc.blocks`, + network: "anvil", + }, + }, + functions: {}, +})) diff --git a/tests/config/packages/block_num_eq_join/package.json b/tests/config/packages/block_num_eq_join/package.json new file mode 100644 index 000000000..6b365443d --- /dev/null +++ b/tests/config/packages/block_num_eq_join/package.json @@ -0,0 +1,5 @@ +{ + "name": "@amp-datasets/block-num-eq-join", + "private": true, + "type": "module" +} diff --git a/tests/config/packages/block_num_eq_join/tsconfig.json b/tests/config/packages/block_num_eq_join/tsconfig.json new file mode 100644 index 000000000..03bc4d7be --- /dev/null +++ b/tests/config/packages/block_num_eq_join/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../tsconfig.json", + "include": ["amp.config.ts"] +} diff --git a/tests/specs/streaming-join-block-num-eq-anvil.yaml b/tests/specs/streaming-join-block-num-eq-anvil.yaml new file mode 100644 index 000000000..bec6aeb00 --- /dev/null +++ b/tests/specs/streaming-join-block-num-eq-anvil.yaml @@ -0,0 +1,119 @@ +# Streaming block_num = block_num join test using Anvil +# +# Validates correctness of streaming joins where the join condition is +# `l.block_num = r.block_num` — the pattern targeted by the _block_num +# equality incrementalizer optimization (where Δ(L⋈R) = ΔL⋈ΔR). +# +# Two derived tables (lefty, righty) each carry a distinct computed column +# per block (left_val = block_num*10+1, right_val = block_num*10+2). +# A streaming query joins them on block_num. Result assertions verify +# the non-join columns, proving the join matched the correct rows on both +# sides — not just that the right number of rows appeared. +# +# Four incremental batches with varying sizes stress history/delta +# boundaries: as history grows (0→3→5→6→9), any leakage of +# history×delta or delta×history terms would surface as duplicates, +# and any mis-matched rows would show wrong left_val/right_val. + +- anvil: {} + +# ── Batch 1: genesis + 3 mined = blocks 0-3 ────────────────────────── + +- name: mine_initial + anvil_mine: 3 + +- name: dump_anvil_rpc_initial + dataset: _/anvil_rpc@0.0.0 + end: 3 + +- name: register_block_num_eq_join + dataset: block_num_eq_join + tag: "0.0.0" + +- name: dump_derived_initial + dataset: _/block_num_eq_join@0.0.0 + end: 3 + +- name: register_stream + stream: | + SELECT l.block_num, l.left_val, r.right_val + FROM block_num_eq_join.lefty l + JOIN block_num_eq_join.righty r ON l._block_num = r._block_num + SETTINGS stream = true + +- name: take_batch_1 + stream: register_stream + take: 4 + results: | + [ + {"block_num": 0, "left_val": 1, "right_val": 2}, + {"block_num": 1, "left_val": 11, "right_val": 12}, + {"block_num": 2, "left_val": 21, "right_val": 22}, + {"block_num": 3, "left_val": 31, "right_val": 32} + ] + +# ── Batch 2: 2 new blocks (4-5), history = 0-3 ────────────────────── + +- name: mine_batch_2 + anvil_mine: 2 + +- name: dump_anvil_rpc_batch_2 + dataset: _/anvil_rpc@0.0.0 + end: 5 + +- name: dump_derived_batch_2 + dataset: _/block_num_eq_join@0.0.0 + end: 5 + +- name: take_batch_2 + stream: register_stream + take: 2 + results: | + [ + {"block_num": 4, "left_val": 41, "right_val": 42}, + {"block_num": 5, "left_val": 51, "right_val": 52} + ] + +# ── Batch 3: 1 new block (6), history = 0-5 ───────────────────────── + +- name: mine_batch_3 + anvil_mine: 1 + +- name: dump_anvil_rpc_batch_3 + dataset: _/anvil_rpc@0.0.0 + end: 6 + +- name: dump_derived_batch_3 + dataset: _/block_num_eq_join@0.0.0 + end: 6 + +- name: take_batch_3 + stream: register_stream + take: 1 + results: | + [ + {"block_num": 6, "left_val": 61, "right_val": 62} + ] + +# ── Batch 4: 3 new blocks (7-9), history = 0-6 ────────────────────── + +- name: mine_batch_4 + anvil_mine: 3 + +- name: dump_anvil_rpc_batch_4 + dataset: _/anvil_rpc@0.0.0 + end: 9 + +- name: dump_derived_batch_4 + dataset: _/block_num_eq_join@0.0.0 + end: 9 + +- name: take_batch_4 + stream: register_stream + take: 3 + results: | + [ + {"block_num": 7, "left_val": 71, "right_val": 72}, + {"block_num": 8, "left_val": 81, "right_val": 82}, + {"block_num": 9, "left_val": 91, "right_val": 92} + ] diff --git a/tests/src/tests/it_streaming_join.rs b/tests/src/tests/it_streaming_join.rs index d64425d58..6ffb035f6 100644 --- a/tests/src/tests/it_streaming_join.rs +++ b/tests/src/tests/it_streaming_join.rs @@ -69,3 +69,29 @@ async fn streaming_join_with_reorg() { .await .expect("Failed to run streaming join with reorg spec"); } + +#[tokio::test(flavor = "multi_thread")] +async fn streaming_join_block_num_eq() { + logging::init(); + + let test_ctx = TestCtxBuilder::new("streaming_join_block_num_eq") + .with_anvil_ipc() + .with_dataset_manifest("anvil_rpc") + .build() + .await + .expect("Failed to create test environment"); + + let mut client = test_ctx + .new_flight_client() + .await + .expect("Failed to connect FlightClient"); + + run_spec( + "streaming-join-block-num-eq-anvil", + &test_ctx, + &mut client, + None, + ) + .await + .expect("Failed to run streaming join block_num eq spec"); +}