-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CALCITE-3546] Improve EnumerableDefaults nested loop join
Provide a new implementation of nested loop join that, unlike the existing one, does not require to build the complete result as a list before returning it. Instead, it iterates through the outer and inner enumerables and returns the results step by step.
- Loading branch information
Showing
1 changed file
with
135 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1803,11 +1803,24 @@ public static <TSource, TInner, TResult> Enumerable<TResult> nestedLoopJoin( | |
final Predicate2<TSource, TInner> predicate, | ||
Function2<TSource, TInner, TResult> resultSelector, | ||
final JoinType joinType) { | ||
// Building the result as a list is easy but hogs memory. We should iterate. | ||
if (!joinType.generatesNullsOnLeft()) { | ||
return nestedLoopJoinOptimized(outer, inner, predicate, resultSelector, joinType); | ||
} | ||
return nestedLoopJoinAsList(outer, inner, predicate, resultSelector, joinType); | ||
} | ||
|
||
/** | ||
* Implementation of nested loop join that builds the complete result as a list | ||
* and then returns it. This is an easy-to-implement solution, but hogs memory. | ||
*/ | ||
private static <TSource, TInner, TResult> Enumerable<TResult> nestedLoopJoinAsList( | ||
final Enumerable<TSource> outer, final Enumerable<TInner> inner, | ||
final Predicate2<TSource, TInner> predicate, | ||
Function2<TSource, TInner, TResult> resultSelector, | ||
final JoinType joinType) { | ||
final boolean generateNullsOnLeft = joinType.generatesNullsOnLeft(); | ||
final boolean generateNullsOnRight = joinType.generatesNullsOnRight(); | ||
final List<TResult> result = new ArrayList<>(); | ||
final Enumerator<TSource> lefts = outer.enumerator(); | ||
final List<TInner> rightList = inner.toList(); | ||
final Set<TInner> rightUnmatched; | ||
if (generateNullsOnLeft) { | ||
|
@@ -1816,40 +1829,134 @@ public static <TSource, TInner, TResult> Enumerable<TResult> nestedLoopJoin( | |
} else { | ||
rightUnmatched = null; | ||
} | ||
while (lefts.moveNext()) { | ||
int leftMatchCount = 0; | ||
final TSource left = lefts.current(); | ||
final Enumerator<TInner> rights = Linq4j.iterableEnumerator(rightList); | ||
while (rights.moveNext()) { | ||
TInner right = rights.current(); | ||
if (predicate.apply(left, right)) { | ||
++leftMatchCount; | ||
if (joinType == JoinType.ANTI) { | ||
break; | ||
} else { | ||
if (rightUnmatched != null) { | ||
rightUnmatched.remove(right); | ||
} | ||
result.add(resultSelector.apply(left, right)); | ||
if (joinType == JoinType.SEMI) { | ||
try (Enumerator<TSource> lefts = outer.enumerator()) { | ||
while (lefts.moveNext()) { | ||
int leftMatchCount = 0; | ||
final TSource left = lefts.current(); | ||
for (TInner right : rightList) { | ||
if (predicate.apply(left, right)) { | ||
++leftMatchCount; | ||
if (joinType == JoinType.ANTI) { | ||
break; | ||
} else { | ||
if (rightUnmatched != null) { | ||
rightUnmatched.remove(right); | ||
} | ||
result.add(resultSelector.apply(left, right)); | ||
if (joinType == JoinType.SEMI) { | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
if (leftMatchCount == 0 && (generateNullsOnRight || joinType == JoinType.ANTI)) { | ||
result.add(resultSelector.apply(left, null)); | ||
} | ||
} | ||
if (leftMatchCount == 0 && (generateNullsOnRight || joinType == JoinType.ANTI)) { | ||
result.add(resultSelector.apply(left, null)); | ||
if (rightUnmatched != null) { | ||
for (TInner right : rightUnmatched) { | ||
result.add(resultSelector.apply(null, right)); | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
rubenada
Author
Contributor
|
||
} | ||
} | ||
return Linq4j.asEnumerable(result); | ||
} | ||
if (rightUnmatched != null) { | ||
final Enumerator<TInner> rights = | ||
Linq4j.iterableEnumerator(rightUnmatched); | ||
while (rights.moveNext()) { | ||
TInner right = rights.current(); | ||
result.add(resultSelector.apply(null, right)); | ||
} | ||
} | ||
|
||
/** | ||
* Implementation of nested loop join that, unlike {@link #nestedLoopJoinAsList}, does not | ||
* require to build the complete result as a list before returning it. Instead, it iterates | ||
* through the outer enumerable and inner enumerable and returns the results step by step. | ||
* It does not support RIGHT / FULL join. | ||
*/ | ||
private static <TSource, TInner, TResult> Enumerable<TResult> nestedLoopJoinOptimized( | ||
final Enumerable<TSource> outer, final Enumerable<TInner> inner, | ||
final Predicate2<TSource, TInner> predicate, | ||
Function2<TSource, TInner, TResult> resultSelector, | ||
final JoinType joinType) { | ||
if (joinType == JoinType.RIGHT || joinType == JoinType.FULL) { | ||
throw new IllegalArgumentException("JoinType " + joinType + " is unsupported"); | ||
} | ||
return Linq4j.asEnumerable(result); | ||
|
||
return new AbstractEnumerable<TResult>() { | ||
public Enumerator<TResult> enumerator() { | ||
return new Enumerator<TResult>() { | ||
private Enumerator<TSource> outerEnumerator = outer.enumerator(); | ||
private Enumerator<TInner> innerEnumerator = null; | ||
private boolean outerMatch = false; // whether the outerValue has matched an innerValue | ||
private TSource outerValue; | ||
private TInner innerValue; | ||
private int state = 0; // 0 moving outer, 1 moving inner | ||
|
||
@Override public TResult current() { | ||
return resultSelector.apply(outerValue, innerValue); | ||
} | ||
|
||
@Override public boolean moveNext() { | ||
while (true) { | ||
switch (state) { | ||
case 0: | ||
// move outer | ||
if (!outerEnumerator.moveNext()) { | ||
return false; | ||
} | ||
outerValue = outerEnumerator.current(); | ||
closeInner(); | ||
innerEnumerator = inner.enumerator(); | ||
outerMatch = false; | ||
state = 1; | ||
continue; | ||
case 1: | ||
// move inner | ||
if (innerEnumerator.moveNext()) { | ||
innerValue = innerEnumerator.current(); | ||
if (predicate.apply(outerValue, innerValue)) { | ||
outerMatch = true; | ||
switch (joinType) { | ||
case ANTI: // try next outer row | ||
state = 0; | ||
continue; | ||
case SEMI: // return result, and try next outer row | ||
state = 0; | ||
return true; | ||
case INNER: | ||
case LEFT: // INNER and LEFT just return result | ||
return true; | ||
} | ||
} // else (predicate returned false) continue: move inner | ||
} else { // innerEnumerator is over | ||
state = 0; | ||
innerValue = null; | ||
if (!outerMatch | ||
&& (joinType == JoinType.LEFT || joinType == JoinType.ANTI)) { | ||
// No match detected: outerValue is a result for LEFT / ANTI join | ||
return true; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override public void reset() { | ||
state = 0; | ||
outerMatch = false; | ||
outerEnumerator.reset(); | ||
closeInner(); | ||
} | ||
|
||
@Override public void close() { | ||
outerEnumerator.close(); | ||
closeInner(); | ||
} | ||
|
||
private void closeInner() { | ||
if (innerEnumerator != null) { | ||
innerEnumerator.close(); | ||
innerEnumerator = null; | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
|
||
/** Joins two inputs that are sorted on the key. */ | ||
|
When was the inner enumerable resource released ?