Skip to content
36 changes: 35 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub(super) enum SortMergeJoinState {
Polling,
/// Joining polled data and making output
JoinOutput,
/// Emit ready data if have any and then go back to [`Self::Init`] state
EmitReadyThenInit,
/// No more output
Exhausted,
}
Expand Down Expand Up @@ -598,13 +600,45 @@ impl Stream for SortMergeJoinStream {
self.current_ordering = self.compare_streamed_buffered()?;
self.state = SortMergeJoinState::JoinOutput;
}
SortMergeJoinState::EmitReadyThenInit => {
// If have data to emit, emit it and if no more, change to next

// Verify metadata alignment before checking if we have batches to output
self.joined_record_batches
.filter_metadata
.debug_assert_metadata_aligned();

// For filtered joins, skip output and let Init state handle it
if needs_deferred_filtering(&self.filter, self.join_type) {
self.state = SortMergeJoinState::Init;
continue;
}

// For non-filtered joins, only output if we have a completed batch
// (opportunistic output when target batch size is reached)
if self
.joined_record_batches
.joined_batches
.has_completed_batch()
{
let record_batch = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a point in not doing a let Some() on next_completed_batch()?
afaict it's a simple pop and has_completed_batch() just checks that the queue isn't empty

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next refactor I have already

.joined_record_batches
.joined_batches
.next_completed_batch()
.expect("has_completed_batch was true");
(&record_batch)
.record_output(&self.join_metrics.baseline_metrics());
return Poll::Ready(Some(Ok(record_batch)));
}
self.state = SortMergeJoinState::Init;
}
SortMergeJoinState::JoinOutput => {
self.join_partial()?;

if self.num_unfrozen_pairs() < self.batch_size {
if self.buffered_data.scanning_finished() {
self.buffered_data.scanning_reset();
self.state = SortMergeJoinState::Init;
self.state = SortMergeJoinState::EmitReadyThenInit;
}
} else {
self.freeze_all()?;
Expand Down
Loading