feat: Support RIGHT/FULL joins in NLJ memory-limited execution #21833
viirya merged 2 commits into apache:main
Conversation
Force-pushed from eb80057 to 57c8cb8
&& !need_produce_right_in_final(self.join_type)
{
    // Condition: disk manager supports temp files (needed for spilling)
    let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() {
I think we need to keep the core invariant here that memory-limited execution produces the same results as the single-pass NLJ for every enabled join type.
Enabling fallback for FULL joins exposes the memory-limited path to multi-partition right inputs, but each output partition builds its own per-chunk JoinLeftData with AtomicUsize::new(1). That means left-unmatched rows are emitted based only on matches seen by that partition's right-side input.
In the single-pass path, collect_left_input(..., right_partition_count) coordinates left-unmatched emission across all right partitions. The fallback path does not appear to do that yet.
For a FULL JOIN with target_partitions > 1, a left row that matches a row in another right partition can still be emitted as unmatched by this partition, which would produce incorrect duplicate/null-padded rows.
Could we either keep the previous exclusion for FULL until the fallback path has coordinated cross-partition left match state, or make the memory-limited left bitmap/probe completion shared across right partitions for each left chunk?
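To make the coordination concern above concrete, here is a minimal, self-contained sketch of cross-partition left-unmatched emission using a shared bitmap plus a probe countdown, in the spirit of what `collect_left_input(..., right_partition_count)` sets up in the single-pass path. The names `SharedLeftState` and `finish_probe` are hypothetical, not DataFusion's actual types:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Shared per-left-chunk state: one match bitmap plus a countdown of
// right-side probe partitions still running (hypothetical names).
struct SharedLeftState {
    matched: Mutex<Vec<bool>>,     // one flag per left row
    probes_remaining: AtomicUsize, // right partitions not yet finished
}

/// Each right partition reports the left rows it matched. Only the
/// *last* partition to finish emits the unmatched rows, so a left row
/// matched by any partition is never emitted as unmatched.
fn finish_probe(state: &SharedLeftState, matches: &[usize]) -> Option<Vec<usize>> {
    let mut bitmap = state.matched.lock().unwrap();
    for &i in matches {
        bitmap[i] = true;
    }
    drop(bitmap);
    if state.probes_remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
        let bitmap = state.matched.lock().unwrap();
        Some((0..bitmap.len()).filter(|&i| !bitmap[i]).collect())
    } else {
        None
    }
}

fn main() {
    let state = Arc::new(SharedLeftState {
        matched: Mutex::new(vec![false; 4]),
        probes_remaining: AtomicUsize::new(2),
    });
    // Two right partitions, each matching a different left row.
    let s1 = Arc::clone(&state);
    let handle = thread::spawn(move || finish_probe(&s1, &[0]));
    let r2 = finish_probe(&state, &[2]);
    let r1 = handle.join().unwrap();
    // Exactly one partition emits, and it sees the union of matches.
    let emitted = r1.or(r2).unwrap();
    assert_eq!(emitted, vec![1, 3]);
    println!("unmatched left rows: {:?}", emitted);
}
```

The per-chunk `AtomicUsize::new(1)` flagged above is exactly what this countdown replaces: with a count of 1 per partition, each partition believes it is the last probe and emits its own (incomplete) view of unmatched rows.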
Good catch — you're right that the fallback path doesn't yet coordinate left-bitmap state across right partitions. I've added a guard that disables fallback for FULL JOIN when right_partition_count > 1, falling back to standard OOM behavior in that case. This preserves correctness for the case you flagged.
A note: I noticed the same pre-existing concern applies to LEFT/LEFT SEMI/LEFT ANTI/LEFT MARK in the multi-partition fallback path, which was introduced in Phase 1 of this work (#21448) — not new to this PR. We should fix it. Since it requires a larger refactor (sharing the per-chunk JoinLeftData and probe-thread counter across partitions), I'll address it in a follow-up PR rather than expanding this one's scope.
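The guard described above could look roughly like the following; `fallback_enabled` and the trimmed-down `JoinType` are illustrative stand-ins, not the actual code added in the PR:

```rust
// Hypothetical sketch of the correctness guard: disable the
// memory-limited fallback for FULL JOIN when the right side has more
// than one partition, preserving the standard OOM behavior there.
#[derive(PartialEq)]
enum JoinType {
    Full,
    Right, // other variants elided
}

fn fallback_enabled(join_type: &JoinType, right_partition_count: usize) -> bool {
    !(*join_type == JoinType::Full && right_partition_count > 1)
}

fn main() {
    assert!(!fallback_enabled(&JoinType::Full, 2)); // guarded case
    assert!(fallback_enabled(&JoinType::Full, 1));
    assert!(fallback_enabled(&JoinType::Right, 4));
}
```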
# we must emit the correct unmatched rows at the end.
query II rowsort
SELECT t1.v1, t2.v2
FROM generate_series(1, 5) AS t1(v1)
This test says it exercises the global right bitmap across multiple left chunks, but generate_series(1, 5) looks too small to trigger the memory-limited fallback under the 150K limit. As a result, it seems to run through the unchanged single-pass path instead.
The larger RIGHT JOIN case above does spill, but all left chunks match the single right row, so it would not catch a bad global OR/accumulation or incorrect final unmatched emission.
Could we add a spilling RIGHT, FULL, RIGHT ANTI, or RIGHT MARK case where different right rows are matched by different left chunks and at least one right row remains unmatched? It would also be good to assert spill_count for that query so we know the fallback path is actually exercised.
Good point — the small generate_series(1, 5) cases ran in single-pass and didn't exercise the global bitmap. I've replaced them with a 100K-left × 200-right test using the predicate (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100. This is non-equi (forces NLJ), forces spill (left side ~800KB > 150K limit), and produces exactly 1 matched pair + 199 unmatched right rows — so each right batch has both bits-on and bits-off entries that must be correctly accumulated across passes. There's a corresponding EXPLAIN ANALYZE assertion confirming spill_count=2. Added the same predicate for FULL JOIN too.
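A quick brute-force check of the arithmetic in that predicate (self-contained sketch, independent of the actual test harness): with left `v1 = 1..=100_000` and right `v2 = 1..=200`, `(v1 + v2) = 2 AND v2 <= 100` can only hold when `v1 = 1, v2 = 1`, leaving 199 right rows unmatched:

```rust
// Enumerate which right rows (v2) find at least one matching left row
// (v1) under the test predicate (v1 + v2) = 2 AND v2 <= 100.
fn matched_right_rows(left_n: u64, right_n: u64) -> Vec<u64> {
    (1..=right_n)
        .filter(|&v2| v2 <= 100 && (1..=left_n).any(|v1| v1 + v2 == 2))
        .collect()
}

fn main() {
    let matched = matched_right_rows(100_000, 200);
    assert_eq!(matched, vec![1]);         // exactly one matched right row
    assert_eq!(200 - matched.len(), 199); // 199 right rows stay unmatched
    println!("matched right rows: {:?}", matched);
}
```

This shape is what makes the test sensitive: every right batch carries both set and unset bits, so a bad OR-merge or a dropped batch shows up directly in the output.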
let bitmap = std::mem::take(&mut self.current_right_batch_matched)
    .expect("right bitmap should be available");
let (values, _nulls) = bitmap.into_parts();
Small readability suggestion: could this bitmap merge/accounting block be extracted into a helper on SpillStateActive, maybe something like merge_current_right_bitmap(idx, values)?
The state machine is already pretty dense, and centralizing the first-seen vs OR-merge behavior would make the global bitmap invariant easier to audit and test.
Done — extracted to SpillStateActive::merge_current_right_bitmap(idx, values), which centralizes the first-seen-vs-OR-merge behavior and the reservation accounting. The state-machine call site is now a 2-line invocation.
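For readers following along, the first-seen-vs-OR-merge behavior can be sketched as below, using plain `u64` words in place of arrow's `BooleanBuffer`; `GlobalRightBitmap` and its method are illustrative, not the actual `SpillStateActive` code:

```rust
// Accumulates right-row match bits across left-chunk passes.
#[derive(Default)]
struct GlobalRightBitmap {
    // One entry per right batch, indexed by batch sequence number.
    per_batch: Vec<Option<Vec<u64>>>,
}

impl GlobalRightBitmap {
    fn merge_current_right_bitmap(&mut self, idx: usize, values: Vec<u64>) {
        if self.per_batch.len() <= idx {
            self.per_batch.resize(idx + 1, None);
        }
        let slot = &mut self.per_batch[idx];
        if let Some(existing) = slot {
            // Later passes: bitwise-OR the new matches into the accumulator.
            for (acc, word) in existing.iter_mut().zip(&values) {
                *acc |= *word;
            }
        } else {
            // First pass over this batch: take ownership of the bitmap.
            *slot = Some(values);
        }
    }
}

fn main() {
    let mut global = GlobalRightBitmap::default();
    global.merge_current_right_bitmap(0, vec![0b0001]); // pass 1: row 0 matched
    global.merge_current_right_bitmap(0, vec![0b0100]); // pass 2: row 2 matched
    assert_eq!(global.per_batch[0], Some(vec![0b0101]));
    println!("{:?}", global.per_batch);
}
```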
Force-pushed from 57c8cb8 to 2232ddd
Previously RIGHT/FULL/RIGHT SEMI/RIGHT ANTI/RIGHT MARK joins were excluded from the memory-limited (multi-pass) fallback path because they need to track which right rows have been matched across all left chunks. They would OOM instead of spilling. Now all join types support the fallback:

- A global right bitmap (Vec<BooleanBuffer>, indexed by right batch sequence number) accumulates matches across all left chunk passes. ReplayableStreamSource guarantees consistent batch boundaries across passes, so batch sequence numbers are stable.
- In memory-limited mode, EmitRightUnmatched merges the current batch's bitmap into the global accumulator (bitwise OR) instead of emitting unmatched rows immediately.
- After the last left chunk, a new state EmitGlobalRightUnmatched replays the right side one more time and uses the accumulated bitmap to emit unmatched right rows correctly.

Single-pass behavior is unchanged: the global bitmap path is only active when is_memory_limited() is true.

Co-authored-by: Claude Code
Force-pushed from 2232ddd to 87f64ba
The optimizer pushes `t2.v2 <= 100` into a projection on the right side, rewriting it as a `join_proj_push_down_*` boolean column. This inserts an extra ProjectionExec and changes the NLJ filter expression. Update the expected EXPLAIN ANALYZE output to match. Co-authored-by: Claude Code
Thank you @kosiew
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Previously RIGHT/FULL/RIGHT SEMI/RIGHT ANTI/RIGHT MARK joins were excluded from the memory-limited (multi-pass) fallback path because they need to track which right rows have been matched across all left chunks. They would OOM instead of spilling.
Now all join types support the fallback:
A global right bitmap (Vec<BooleanBuffer>, indexed by right batch sequence number) accumulates matches across all left chunk passes. ReplayableStreamSource guarantees consistent batch boundaries across passes, so batch sequence numbers are stable.
In memory-limited mode, EmitRightUnmatched merges the current batch's bitmap into the global accumulator (bitwise OR) instead of emitting unmatched rows immediately.
After the last left chunk, a new state EmitGlobalRightUnmatched replays the right side one more time and uses the accumulated bitmap to emit unmatched right rows correctly.
Single-pass behavior is unchanged: the global bitmap path is only active when is_memory_limited() is true.
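The final EmitGlobalRightUnmatched pass described above reduces to "replay the right side and null-pad every row whose bit never got set"; a minimal sketch, with `Vec<bool>` standing in for the accumulated per-batch BooleanBuffer and `unmatched_indices` as a hypothetical helper name:

```rust
// Given the fully accumulated global bitmap, return the row indices of
// one right batch that no left chunk ever matched (to be null-padded).
fn unmatched_indices(global_bitmap: &[Vec<bool>], batch_idx: usize) -> Vec<usize> {
    global_bitmap[batch_idx]
        .iter()
        .enumerate()
        .filter_map(|(row, &matched)| (!matched).then_some(row))
        .collect()
}

fn main() {
    // Two right batches; bits already OR-merged across all left chunks.
    let global = vec![vec![true, false, true], vec![false, false]];
    assert_eq!(unmatched_indices(&global, 0), vec![1]);
    assert_eq!(unmatched_indices(&global, 1), vec![0, 1]);
    println!("batch 0 unmatched: {:?}", unmatched_indices(&global, 0));
}
```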
Co-authored-by: Claude Code
Are these changes tested?
Unit tests and e2e tests
Are there any user-facing changes?
No