Skip to content

Commit

Permalink
Revert previous changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Feb 25, 2016
1 parent 687e948 commit 88f5020
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ case class BroadcastHashJoin(
*/
private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, _) = genStreamSideJoinKey(ctx, input)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val matched = ctx.freshName("matched")
val buildVars = genBuildSideVars(ctx, matched)
val resultVars = buildSide match {
Expand Down Expand Up @@ -219,7 +219,7 @@ case class BroadcastHashJoin(
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashedRelation
|UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
|UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
|if ($matched != null) {
| ${buildVars.map(_.code).mkString("\n")}
| $outputCode
Expand All @@ -235,7 +235,7 @@ case class BroadcastHashJoin(
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashRelation
|$bufferType $matches = ($bufferType)$relationTerm.get(${keyEv.value});
|$bufferType $matches = $anyNull ? null : ($bufferType)$relationTerm.get(${keyEv.value});
|if ($matches != null) {
| int $size = $matches.size();
| for (int $i = 0; $i < $size; $i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ trait HashJoin {
while (currentHashMatches == null && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
val key = joinKeys(currentStreamedRow)
// We do filtering null keys by inserting a Filter before inner join,
// So don't need to check nullability of keys again.
currentHashMatches = hashedRelation.get(key)
if (currentHashMatches != null) {
currentMatchPosition = 0
if (!key.anyNull) {
currentHashMatches = hashedRelation.get(key)
if (currentHashMatches != null) {
currentMatchPosition = 0
}
}
}
if (currentHashMatches == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,10 @@ private[joins] class SortMergeJoinScanner(
* results.
*/
final def findNextInnerJoinRows(): Boolean = {
advancedStreamed()
while (advancedStreamed() && streamedRowKey.anyNull) {
// Advance the streamed side of the join until we find the next row whose join key contains
// no nulls or we hit the end of the streamed iterator.
}
if (streamedRow == null) {
// We have consumed the entire streamed iterator, so there can be no more matches.
matchJoinKey = null
Expand All @@ -440,9 +443,14 @@ private[joins] class SortMergeJoinScanner(
// Advance both the streamed and buffered iterators to find the next pair of matching rows.
var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
do {
comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
else if (comp < 0) advancedStreamed()
if (streamedRowKey.anyNull) {
advancedStreamed()
} else {
assert(!bufferedRowKey.anyNull)
comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
else if (comp < 0) advancedStreamed()
}
} while (streamedRow != null && bufferedRow != null && comp != 0)
if (streamedRow == null || bufferedRow == null) {
// We have either hit the end of one of the iterators, so there can be no more matches.
Expand Down

0 comments on commit 88f5020

Please sign in to comment.