[SPARK-XXXXX][SQL] Add streaming heap operator for NearestByJoin #56101

Draft
yadavay-amzn wants to merge 1 commit into apache:master from yadavay-amzn:proto/nearestby-streaming-heap

@yadavay-amzn
Contributor

What changes were proposed in this pull request?

Add StreamingNearestByJoinExec, a physical operator for NearestByJoin that avoids materializing the full N×M cross product. Instead of rewriting the join to a cross join plus aggregate, the operator broadcasts the right side and, for each left row, scans the broadcast rows while keeping only the best k candidates in a bounded priority queue.
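The per-row loop described above can be sketched in a few lines. This is a minimal, language-neutral illustration in Python rather than the operator's actual code: `nearest_by_join`, the `distance` callback, and the index-based tie-breaking are assumptions made for the example, and the in-memory `right` list stands in for the broadcast right side.

```python
import heapq
from typing import Callable, Iterator, List, Sequence, Tuple, TypeVar

L = TypeVar("L")
R = TypeVar("R")


def nearest_by_join(
    left: Sequence[L],
    right: Sequence[R],
    distance: Callable[[L, R], float],
    k: int,
) -> Iterator[Tuple[L, List[Tuple[float, R]]]]:
    """Yield (left_row, [(distance, right_row), ...]) with the k nearest
    right rows per left row, closest first. Hypothetical helper, not Spark API."""
    if k < 1:
        raise ValueError("k must be at least 1")
    for left_row in left:
        # heapq is a min-heap, so distances are negated: heap[0] is always the
        # farthest candidate currently kept. The heap never exceeds k entries.
        heap: list = []
        for idx, right_row in enumerate(right):
            d = distance(left_row, right_row)
            # -idx makes ties deterministic (earlier right rows win) and keeps
            # tuple comparison from ever reaching the row object itself.
            entry = (-d, -idx, right_row)
            if len(heap) < k:
                heapq.heappush(heap, entry)
            elif entry > heap[0]:
                # Closer than the current farthest candidate: swap it in.
                heapq.heapreplace(heap, entry)
        nearest = [(-neg_d, row) for neg_d, _, row in sorted(heap, reverse=True)]
        yield left_row, nearest


# Usage: two nearest right values for each left value on a number line.
for row, matches in nearest_by_join(
    [0.0, 10.0], [1.0, 9.0, 4.0, 11.0], lambda l, r: abs(l - r), k=2
):
    print(row, matches)
```

Because the heap never grows past k entries, the working state per left row is O(k), whereas the cross-product rewrite produces M candidate pairs for that same row before aggregating.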

Why are the changes needed?

The current RewriteNearestByJoin implementation materializes all N×M candidate pairs, shuffles them, and aggregates. In the memory benchmark at 30K×30K (k=5), this takes about 400s and uses about 1.7GB, while the streaming heap completes in 31s using about 208MB: roughly 13x faster with 8.3x less memory.

| Scale | Current (cross-product) | Streaming Heap | Speedup | Memory |
|---|---|---|---|---|
| 10K×10K | 4.2s | 0.38s | 11x | 7x less |
| 30K×30K | 13.1s | 1.01s | 13x | 8.3x less |
| 50K×50K | 48.7s | 4.06s | 12x | ~8x less |
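To put the N×M cost in perspective, the following back-of-the-envelope calculation compares the number of candidate pairs the rewrite materializes with the number of rows the join finally emits. It assumes k=5 (the value used in the memory benchmark) and that every left row has at least k candidates; the variable names are illustrative only.

```python
n_left = 30_000   # N: rows on the left side
m_right = 30_000  # M: rows on the right side
k = 5             # neighbours kept per left row (assumed, from the memory benchmark)

# The cross-product rewrite produces one candidate pair per (left, right) combination.
candidate_pairs = n_left * m_right

# Either plan ultimately emits at most k rows per left row.
output_rows = n_left * k

# How many candidate pairs are generated for every row that survives.
pairs_per_output_row = candidate_pairs // output_rows

print(f"candidate pairs:      {candidate_pairs:,}")
print(f"output rows:          {output_rows:,}")
print(f"pairs per output row: {pairs_per_output_row:,}")
```

Under these assumptions the rewrite generates 900 million intermediate pairs to produce 150 thousand output rows, while a bounded heap holds only k candidates for the left row currently being processed.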

Does this PR introduce any user-facing change?

No. The feature is opt-in via spark.sql.join.nearestBy.streamingHeap.enabled (default false).

How was this patch tested?

  • Correctness test verifying identical results to the rewrite (20×15 dataset, k=3)
  • Memory benchmark at 30K×30K showing 13x speedup and 8.3x memory reduction
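The shape of the correctness test can be illustrated with a small standalone check. This is a sketch under stated assumptions, not the test in this PR: it uses random 2D points with Euclidean distance, takes 20 left rows and 15 right rows with k=3, breaks ties by right-row index, and the function names are hypothetical.

```python
import heapq
import math
import random


def cross_product_top_k(left, right, k):
    # Reference path: materialize every (left, right) pair, then rank per left row.
    pairs = [
        (i, j, math.dist(l, r))
        for i, l in enumerate(left)
        for j, r in enumerate(right)
    ]
    result = {}
    for i in range(len(left)):
        ranked = sorted((p for p in pairs if p[0] == i), key=lambda p: (p[2], p[1]))
        result[i] = [j for _, j, _ in ranked[:k]]
    return result


def streaming_top_k(left, right, k):
    # Streaming path: one pass over the right side per left row. The generator
    # feeds candidates one at a time, so no list of all pairs is ever built.
    result = {}
    for i, l in enumerate(left):
        candidates = ((math.dist(l, r), j) for j, r in enumerate(right))
        result[i] = [j for _, j in heapq.nsmallest(k, candidates)]
    return result


rng = random.Random(42)  # fixed seed so the check is reproducible
left = [(rng.random(), rng.random()) for _ in range(20)]
right = [(rng.random(), rng.random()) for _ in range(15)]

expected = cross_product_top_k(left, right, 3)
actual = streaming_top_k(left, right, 3)
assert actual == expected, "streaming result differs from cross-product result"
```

Both paths order candidates by (distance, right index), so the comparison stays deterministic even if two right rows are equidistant from a left row.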

Was this patch authored or co-authored using generative AI tooling?

Yes.


Note: This is a draft/prototype for discussion. Design doc: https://quip-amazon.com/IeZPAZPA9PF4

Implements StreamingNearestByJoinExec that uses a broadcast right side
+ k-sized heap per left row, avoiding the N*M cross-product materialization.

Memory benchmark results (30K x 30K, k=5):
- Streaming Heap: 31s, ~208 MB memory delta
- Cross-product:  404s, ~1733 MB memory delta
- Memory ratio:   8.3x less memory for streaming heap
- Time ratio:     12.9x faster

At constrained heap sizes (<=1GB), cross-product OOMs while
streaming heap completes with ~200MB.
