Reduce the memory copy while building the hashmap
chenghao-intel committed Aug 4, 2014
1 parent 6ba6c3e commit ab1f9e0
Showing 1 changed file with 28 additions and 26 deletions.
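
The gist of the change: buildHashTable previously filled a scala.collection.mutable.Map via getOrElseUpdate and then called toMap, which makes an extra pass over the finished table and copies every entry into a new immutable map; the patch builds a java.util.HashMap instead and returns it directly, probing with a single get and inserting only on a miss. The DUMMY_LIST and EMPTY_LIST constants also move from the companion object onto the operator as @transient lazy vals, so each deserialized instance rebuilds them locally instead of referencing the companion object. A minimal standalone sketch of the before/after hash-table pattern (hypothetical buildOld/buildNew helpers, with String keys standing in for Spark's Row):

import java.util.{HashMap => JavaHashMap}
import scala.collection.mutable.ArrayBuffer

// Before: accumulate into a Scala mutable Map, then call toMap.
// toMap walks the whole table once more and copies every entry.
def buildOld(rows: Iterator[String]): Map[String, ArrayBuffer[String]] = {
  val table = scala.collection.mutable.Map[String, ArrayBuffer[String]]()
  while (rows.hasNext) {
    val row = rows.next()
    val list = table.getOrElseUpdate(row.take(1), new ArrayBuffer[String]())
    list += row
  }
  table.toMap // the extra copy the commit title refers to
}

// After: accumulate into a java.util.HashMap and return it as-is.
def buildNew(rows: Iterator[String]): JavaHashMap[String, ArrayBuffer[String]] = {
  val table = new JavaHashMap[String, ArrayBuffer[String]]()
  while (rows.hasNext) {
    val row = rows.next()
    val key = row.take(1)
    var list = table.get(key) // null signals an absent key
    if (list == null) {
      list = new ArrayBuffer[String]()
      table.put(key, list)
    }
    list += row
  }
  table // returned directly: no conversion, no copy
}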
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala (28 additions, 26 deletions)

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.{HashMap => JavaHashMap}
+
 import scala.collection.mutable.{ArrayBuffer, BitSet}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
@@ -136,14 +138,6 @@ trait HashJoin {
   }
 }
 
-/**
- * Constant Value for Binary Join Node
- */
-object HashOuterJoin {
-  val DUMMY_LIST = Seq[Row](null)
-  val EMPTY_LIST = Seq[Row]()
-}
-
 /**
  * :: DeveloperApi ::
  * Performs a hash based outer join for two child relations by shuffling the data using
@@ -170,6 +164,9 @@ case class HashOuterJoin(
 
   def output = left.output ++ right.output
 
+  @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
+  @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+
   // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
   // iterator for performance purpose.
 
@@ -188,8 +185,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
         // as we don't know whether we need to append it until finish iterating all of the
         // records in right side.
         // If we didn't get any proper row, then append a single row with empty right
@@ -213,8 +210,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
        // as we don't know whether we need to append it until finish iterating all of the
        // records in left side.
        // If we didn't get any proper row, then append a single row with empty left.
@@ -248,10 +245,10 @@ case class HashOuterJoin(
             rightMatchedSet.add(idx)
             joinedRow.copy
           }
-        } ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+        } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
           // 2. For those unmatched records in left, append additional records with empty right.
 
-          // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+          // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
           // as we don't know whether we need to append it until finish iterating all
           // of the records in right side.
           // If we didn't get any proper row, then append a single row with empty right.
@@ -276,18 +273,22 @@ case class HashOuterJoin(
   }
 
   private[this] def buildHashTable(
-      iter: Iterator[Row], keyGenerator: Projection): Map[Row, ArrayBuffer[Row]] = {
-    // TODO: Use Spark's HashMap implementation.
-    val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
+      iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, ArrayBuffer[Row]] = {
+    val hashTable = new JavaHashMap[Row, ArrayBuffer[Row]]()
     while (iter.hasNext) {
       val currentRow = iter.next()
       val rowKey = keyGenerator(currentRow)
 
-      val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new ArrayBuffer[Row]()})
+      var existingMatchList = hashTable.get(rowKey)
+      if (existingMatchList == null) {
+        existingMatchList = new ArrayBuffer[Row]()
+        hashTable.put(rowKey, existingMatchList)
+      }
+
      existingMatchList += currentRow.copy()
    }
-    hashTable.toMap[Row, ArrayBuffer[Row]]
+
+    hashTable
  }
 
   def execute() = {
@@ -298,21 +299,22 @@ case class HashOuterJoin(
       // Build HashMap for current partition in right relation
       val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
 
+      import scala.collection.JavaConversions._
       val boundCondition =
         condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
       joinType match {
         case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
-          leftOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case RightOuter => rightHashTable.keysIterator.flatMap { key =>
-          rightOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
           fullOuterIterator(key,
-            leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+            leftHashTable.getOrElse(key, EMPTY_LIST),
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
       }
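One subtlety worth noting: buildHashTable now returns a java.util.HashMap, yet execute() still calls Scala collection methods on it (keysIterator, getOrElse, keySet). The newly added import scala.collection.JavaConversions._ supplies the implicit Java-to-Scala wrappers that make those calls compile without copying the map. A minimal sketch of the mechanism outside Spark (JavaConversions was current in 2014; it is deprecated from Scala 2.12 onward):

import java.util.{HashMap => JavaHashMap}
import scala.collection.JavaConversions._ // implicit wrappers, no copying

val jmap = new JavaHashMap[String, Int]()
jmap.put("a", 1)

// Each call implicitly wraps jmap as a scala.collection.mutable.Map,
// so the Scala Map API works directly against the Java table.
val miss: Int = jmap.getOrElse("b", 0) // 0
val keys = jmap.keysIterator.toList    // List("a")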

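Finally, the DUMMY_LIST.filter(_ => !matched) idiom that the inline comments call "tricky" is easier to see in isolation: DUMMY_LIST is a one-element Seq(null), so the filter yields one element exactly when no match was found, and the map then manufactures the single null-padded row. A toy illustration (hypothetical padIfUnmatched, with strings standing in for rows):

val DUMMY_LIST = Seq[String](null)

// Emits exactly one padding row iff the probe row never matched.
def padIfUnmatched(matched: Boolean): Seq[String] =
  DUMMY_LIST.filter(_ => !matched).map(_ => "row + nulls")

padIfUnmatched(true)  // Seq()
padIfUnmatched(false) // Seq("row + nulls")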