Skip to content

Commit

Permalink
[SQL] [SPARK-2826] Reduce the memory copy while building the hashmap …
Browse files Browse the repository at this point in the history
…for HashOuterJoin

This is a follow up for #1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in #1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap

(cherry picked from commit 5d54d71)
Signed-off-by: Michael Armbrust <michael@databricks.com>
  • Loading branch information
chenghao-intel authored and marmbrus committed Aug 12, 2014
1 parent 779d1eb commit f66f260
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution

import java.util.{HashMap => JavaHashMap}

import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
Expand Down Expand Up @@ -136,14 +138,6 @@ trait HashJoin {
}
}

/**
* Constant Value for Binary Join Node
*/
object HashOuterJoin {
val DUMMY_LIST = Seq[Row](null)
val EMPTY_LIST = Seq[Row]()
}

/**
* :: DeveloperApi ::
* Performs a hash based outer join for two child relations by shuffling the data using
Expand Down Expand Up @@ -181,6 +175,9 @@ case class HashOuterJoin(
}
}

@transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
@transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]

// TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
// iterator for performance purpose.

Expand All @@ -199,8 +196,8 @@ case class HashOuterJoin(
joinedRow.copy
} else {
Nil
}) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
// HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
}) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
// DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all of the
// records in right side.
// If we didn't get any proper row, then append a single row with empty right
Expand All @@ -224,8 +221,8 @@ case class HashOuterJoin(
joinedRow.copy
} else {
Nil
}) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
// HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
}) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
// DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all of the
// records in left side.
// If we didn't get any proper row, then append a single row with empty left.
Expand Down Expand Up @@ -259,10 +256,10 @@ case class HashOuterJoin(
rightMatchedSet.add(idx)
joinedRow.copy
}
} ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
} ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
// 2. For those unmatched records in left, append additional records with empty right.

// HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all
// of the records in right side.
// If we didn't get any proper row, then append a single row with empty right.
Expand All @@ -287,18 +284,22 @@ case class HashOuterJoin(
}

private[this] def buildHashTable(
iter: Iterator[Row], keyGenerator: Projection): Map[Row, ArrayBuffer[Row]] = {
// TODO: Use Spark's HashMap implementation.
val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, ArrayBuffer[Row]] = {
val hashTable = new JavaHashMap[Row, ArrayBuffer[Row]]()
while (iter.hasNext) {
val currentRow = iter.next()
val rowKey = keyGenerator(currentRow)

val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new ArrayBuffer[Row]()})
var existingMatchList = hashTable.get(rowKey)
if (existingMatchList == null) {
existingMatchList = new ArrayBuffer[Row]()
hashTable.put(rowKey, existingMatchList)
}

existingMatchList += currentRow.copy()
}
hashTable.toMap[Row, ArrayBuffer[Row]]

hashTable
}

def execute() = {
Expand All @@ -309,21 +310,22 @@ case class HashOuterJoin(
// Build HashMap for current partition in right relation
val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))

import scala.collection.JavaConversions._
val boundCondition =
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
joinType match {
case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
leftOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST))
}
case RightOuter => rightHashTable.keysIterator.flatMap { key =>
rightOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST))
}
case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
leftHashTable.getOrElse(key, EMPTY_LIST),
rightHashTable.getOrElse(key, EMPTY_LIST))
}
case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
}
Expand Down

0 comments on commit f66f260

Please sign in to comment.