Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
Ordering::Equal => {
let l_key_idx = cur1.cur_idx();
let r_key_idx = cur2.cur_idx();
// Current equal key. We will find all the rows
// with the same key in both streams.
let current_key = cur1.key(l_key_idx);

self.indices.push(cur1.cur_idx());
self.exists.push(true);
Expand All @@ -117,15 +120,15 @@
let mut r_equal = true;
while l_equal && r_equal {
if l_equal {
l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx);
l_equal = !cur1.finished() && cur1.cur_key() == current_key;
if l_equal {
self.indices.push(cur1.cur_idx());
self.exists.push(true);
cur_forward!(cur1);
}
}
if r_equal {
r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx);
r_equal = !cur2.finished() && cur2.cur_key() == current_key;
if r_equal {
cur_forward!(cur2);
}
Expand All @@ -134,23 +137,23 @@

if l_equal {
// stream left side
while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) {
while !cur1.finished() && cur1.cur_key() == current_key {
self.indices.push(cur1.cur_idx());
self.exists.push(true);
cur_forward!(cur1);
if self.should_flush() || cur1.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 145 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
cur1.clean_out_dated_batches();

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 146 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
}
}
}

if r_equal {
// stream right side
while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
while !cur2.finished() && cur2.cur_key() == current_key {
cur_forward!(cur2);
if self.should_flush() || cur2.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 156 in native-engine/datafusion-ext-plans/src/joins/smj/existence_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
cur2.clean_out_dated_batches();
}
}
Expand Down
12 changes: 8 additions & 4 deletions native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@
equal_rindices.push(cur2.cur_idx());
let l_key_idx = cur1.cur_idx();
let r_key_idx = cur2.cur_idx();
// Current equal key. We will find all the rows
// with the same key in both streams.
let current_key = cur1.key(l_key_idx);

cur_forward!(cur1);
cur_forward!(cur2);

Expand All @@ -136,15 +140,15 @@
let mut r_equal = true;
while l_equal && r_equal {
if l_equal {
l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx);
l_equal = !cur1.finished() && cur1.cur_key() == current_key;
if l_equal {
has_multi_equal = true;
equal_lindices.push(cur1.cur_idx());
cur_forward!(cur1);
}
}
if r_equal {
r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx);
r_equal = !cur2.finished() && cur2.cur_key() == current_key;
if r_equal {
has_multi_equal = true;
equal_rindices.push(cur2.cur_idx());
Expand All @@ -167,14 +171,14 @@

if r_equal {
// stream right side
while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
while !cur2.finished() && cur2.cur_key() == current_key {
for &lidx in &equal_lindices {
self.lindices.push(lidx);
self.rindices.push(cur2.cur_idx());
}
cur_forward!(cur2);
if self.should_flush() || cur2.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 181 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
cur2.clean_out_dated_batches();
}
}
Expand All @@ -182,15 +186,15 @@

if l_equal {
// stream left side
while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) {
while !cur1.finished() && cur1.cur_key() == current_key {
for &ridx in &equal_rindices {
self.lindices.push(cur1.cur_idx());
self.rindices.push(ridx);
}
cur_forward!(cur1);
if self.should_flush() || cur1.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 196 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
cur1.clean_out_dated_batches();

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.3 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK17 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.2 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.5 JDK21 Scala-2.13 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Uniffle uniffle-0.10 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.4 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.6 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.1 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test Celeborn celeborn-0.5 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable

Check failure on line 197 in native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs

View workflow job for this annotation

GitHub Actions / Test spark-3.0 / Build Auron JAR

cannot borrow `*cur1` as mutable because it is also borrowed as immutable
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions native-engine/datafusion-ext-plans/src/joins/smj/semi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
Ordering::Equal => {
let l_key_idx = cur1.cur_idx();
let r_key_idx = cur2.cur_idx();
// Current equal key. We will find all the rows
// with the same key.
let current_key = cur1.key(l_key_idx);

if P.join_side == L && P.semi {
self.indices.push(l_key_idx);
Expand All @@ -160,7 +163,7 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
let mut r_equal = true;
while l_equal && r_equal {
if l_equal {
l_equal = !cur1.finished() && cur1.cur_key() == cur1.key(l_key_idx);
l_equal = !cur1.finished() && cur1.cur_key() == current_key;
if l_equal {
if P.join_side == L && P.semi {
self.indices.push(cur1.cur_idx());
Expand All @@ -169,7 +172,7 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
}
}
if r_equal {
r_equal = !cur2.finished() && cur2.cur_key() == cur2.key(r_key_idx);
r_equal = !cur2.finished() && cur2.cur_key() == current_key;
if r_equal {
if P.join_side == R && P.semi {
self.indices.push(cur2.cur_idx());
Expand All @@ -181,7 +184,7 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {

if l_equal {
// stream left side
while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) {
while !cur1.finished() && cur1.cur_key() == current_key {
if P.join_side == L && P.semi {
self.indices.push(cur1.cur_idx());
}
Expand All @@ -195,7 +198,7 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {

if r_equal {
// stream right side
while !cur2.finished() && cur2.cur_key() == cur1.key(l_key_idx) {
while !cur2.finished() && cur2.cur_key() == current_key {
if P.join_side == R && P.semi {
self.indices.push(cur2.cur_idx());
}
Expand Down
Loading