diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs index 86331f5ca..1f83c58d0 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs @@ -106,6 +106,9 @@ impl Joiner for ExistenceJoiner { Ordering::Equal => { let l_key_idx = cur1.cur_idx(); let r_key_idx = cur2.cur_idx(); + // Current equal key. We will find all the rows + // with the same key in both streams. + let current_key = cur1.key(l_key_idx); self.indices.push(cur1.cur_idx()); self.exists.push(true); @@ -117,7 +120,7 @@ impl Joiner for ExistenceJoiner { let mut r_equal = true; while l_equal && r_equal { if l_equal { - l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx); + l_equal = !cur1.finished() && cur1.cur_key() == current_key; if l_equal { self.indices.push(cur1.cur_idx()); self.exists.push(true); @@ -125,7 +128,7 @@ impl Joiner for ExistenceJoiner { } } if r_equal { - r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx); + r_equal = !cur2.finished() && cur2.cur_key() == current_key; if r_equal { cur_forward!(cur2); } @@ -134,7 +137,7 @@ impl Joiner for ExistenceJoiner { if l_equal { // stream left side - while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) { + while !cur1.finished() && cur1.cur_key() == current_key { self.indices.push(cur1.cur_idx()); self.exists.push(true); cur_forward!(cur1); @@ -147,7 +150,7 @@ impl Joiner for ExistenceJoiner { if r_equal { // stream right side - while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) { + while !cur2.finished() && cur2.cur_key() == current_key { cur_forward!(cur2); if self.should_flush() || cur2.num_buffered_batches() > 1 { self.as_mut().flush(cur1, cur2).await?; diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 3f2e06773..c744e5277 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -127,6 +127,10 @@ impl Joiner for FullJoiner Joiner for FullJoiner Joiner for FullJoiner Joiner for FullJoiner Joiner for FullJoiner Joiner for SemiJoiner

{ Ordering::Equal => { let l_key_idx = cur1.cur_idx(); let r_key_idx = cur2.cur_idx(); + // Current equal key. We will find all the rows + // with the same key. + let current_key = cur1.key(l_key_idx); if P.join_side == L && P.semi { self.indices.push(l_key_idx); @@ -160,7 +163,7 @@ impl Joiner for SemiJoiner

{ let mut r_equal = true; while l_equal && r_equal { if l_equal { - l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx); + l_equal = !cur1.finished() && cur1.cur_key() == current_key; if l_equal { if P.join_side == L && P.semi { self.indices.push(cur1.cur_idx()); @@ -169,7 +172,7 @@ impl Joiner for SemiJoiner

{ } } if r_equal { - r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx); + r_equal = !cur2.finished() && cur2.cur_key() == current_key; if r_equal { if P.join_side == R && P.semi { self.indices.push(cur2.cur_idx()); @@ -181,7 +184,7 @@ impl Joiner for SemiJoiner

{ if l_equal { // stream left side - while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) { + while !cur1.finished() && cur1.cur_key() == current_key { if P.join_side == L && P.semi { self.indices.push(cur1.cur_idx()); } @@ -195,7 +198,7 @@ impl Joiner for SemiJoiner

{ if r_equal { // stream right side - while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) { + while !cur2.finished() && cur2.cur_key() == current_key { if P.join_side == R && P.semi { self.indices.push(cur2.cur_idx()); }