Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean left semi join hash #1049

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,51 +169,25 @@ case class LeftSemiJoinHash(
def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashTable = new java.util.HashSet[Row]()
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null

// Create a Hash set of buildKeys
while (buildIter.hasNext) {
currentRow = buildIter.next()
val rowKey = buildSideKeyGenerator(currentRow)
if(!rowKey.anyNull) {
val keyExists = hashTable.contains(rowKey)
val keyExists = hashSet.contains(rowKey)
if (!keyExists) {
hashTable.add(rowKey)
hashSet.add(rowKey)
}
}
}

new Iterator[Row] {
private[this] var currentStreamedRow: Row = _
private[this] var currentHashMatched: Boolean = false

private[this] val joinKeys = streamSideKeyGenerator()

override final def hasNext: Boolean =
streamIter.hasNext && fetchNext()

override final def next() = {
currentStreamedRow
}

/**
* Searches the streamed iterator for the next row that has at least one match in hashtable.
*
* @return true if the search is successful, and false the streamed iterator runs out of
* tuples.
*/
private final def fetchNext(): Boolean = {
currentHashMatched = false
while (!currentHashMatched && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
if (!joinKeys(currentStreamedRow).anyNull) {
currentHashMatched = hashTable.contains(joinKeys.currentValue)
}
}
currentHashMatched
}
}
val joinKeys = streamSideKeyGenerator()
streamIter.filter(current => {
!joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
})
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
0 val_0
0 val_0
0 val_0
2 val_2
4 val_4
5 val_5
5 val_5
5 val_5
8 val_8
9 val_9
10 val_10
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
0 val_0
0 val_0
0 val_0
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
val_0
val_0
val_0
val_10
val_2
val_4
val_5
val_5
val_5
val_8
val_9
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
0 val_0
0 val_0
0 val_0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
val_10
val_8
val_9
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
0
0
0
0
0
0
2
4
4
5
5
5
8
8
9
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0 val_0
0 val_0
0 val_0
8 val_8
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
4 val_4 4 val_2
8 val_8 8 val_4
10 val_10 10 val_5
10 val_10 10 val_5
10 val_10 10 val_5
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
0 val_0
0 val_0
0 val_0
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
16 val_8
18 val_9
20 val_10
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0 val_0
0 val_0
0 val_0
0 val_0
0 val_0
0 val_0
2 val_2
4 val_4
5 val_5
5 val_5
5 val_5
8 val_8
9 val_9
10 val_10
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
16
18
20
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
NULL
NULL
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
2
4
4
5
5
5
8
8
9
10
10
10
10
10
10
10
10
10
10
10
10
Loading