diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index e6735675125bd..c5c794f5a8c68 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -639,6 +639,7 @@ impl HashJoinStream { filter, JoinSide::Left, None, + self.join_type, )? } else { (left_indices, right_indices) @@ -707,6 +708,7 @@ impl HashJoinStream { &right_indices, &self.column_indices, join_side, + self.join_type, )?; self.output_buffer.push_batch(batch)?; @@ -770,6 +772,7 @@ impl HashJoinStream { &right_side, &self.column_indices, JoinSide::Left, + self.join_type, )?; self.output_buffer.push_batch(batch)?; } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..a75a9893e9f1a 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results( &probe_indices, column_indices, build_hash_joiner.build_side, + join_type, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } else { @@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, None, + join_type, )? } else { (build_indices, probe_indices) @@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch( &probe_indices, column_indices, build_hash_joiner.build_side, + join_type, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..53b4c4f80236e 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map( (left_indices, right_indices) } +#[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, @@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices( filter: &JoinFilter, build_side: JoinSide, max_intermediate_size: Option, + join_type: JoinType, ) -> Result<(UInt64Array, UInt32Array)> { if build_indices.is_empty() && probe_indices.is_empty() { return Ok((build_indices, probe_indices)); @@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices( &probe_indices.slice(i, len), filter.column_indices(), build_side, + join_type, )?; let filter_result = filter .expression() @@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices( &probe_indices, filter.column_indices(), build_side, + join_type, )?; filter @@ -979,6 +983,7 @@ pub(crate) fn apply_join_filter_to_indices( /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. +#[expect(clippy::too_many_arguments)] pub(crate) fn build_batch_from_indices( schema: &Schema, build_input_buffer: &RecordBatch, @@ -987,11 +992,19 @@ pub(crate) fn build_batch_from_indices( probe_indices: &UInt32Array, column_indices: &[ColumnIndex], build_side: JoinSide, + join_type: JoinType, ) -> Result { if schema.fields().is_empty() { + // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` + // the build_indices were untouched so only probe_indices hold the actual + // row count. + let row_count = match join_type { + JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), + _ => build_indices.len(), + }; let options = RecordBatchOptions::new() .with_match_field_names(true) - .with_row_count(Some(build_indices.len())); + .with_row_count(Some(row_count)); return Ok(RecordBatch::try_new_with_options( Arc::new(schema.clone()), diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 42441fe787dbe..1155bc4f3b2bf 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small; statement count 0 DROP TABLE issue_20437_large; + +# Test count(*) with right semi/anti joins returns correct row counts +# issue: https://github.com/apache/datafusion/issues/20669 + +statement ok +CREATE TABLE t1 (k INT, v INT); + +statement ok +CREATE TABLE t2 (k INT, v INT); + +statement ok +INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i); + +statement ok +INSERT INTO t2 VALUES (1, 1); + +query I +WITH t AS ( + SELECT * + FROM t1 + LEFT ANTI JOIN t2 ON t1.k = t2.k +) +SELECT count(*) +FROM t; +---- +99 + +query I +WITH t AS ( + SELECT * + FROM t1 + LEFT SEMI JOIN t2 ON t1.k = t2.k +) +SELECT count(*) +FROM t; +---- +1 + +statement count 0 +DROP TABLE t1; + +statement count 0 +DROP TABLE t2;