Skip to content

Commit

Permalink
change style
Browse files: browse the repository at this point in the history
  • Loading branch information
wangxiaojing committed Dec 23, 2014
1 parent ff2e618 commit fbe4887
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 58 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -39,7 +39,6 @@ case class BroadcastLeftSemiJoinHash(
override def output = left.output

override def execute() = {

val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
Expand All @@ -59,7 +58,6 @@ case class BroadcastLeftSemiJoinHash(
val broadcastedRelation = sparkContext.broadcast(hashSet)

streamedPlan.execute().mapPartitions { streamIter =>

val joinKeys = streamSideKeyGenerator()
streamIter.filter(current => {
!joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue)
Expand Down
5 changes: 3 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -383,7 +383,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
""".stripMargin),
(null, 10) :: Nil)
}
test("broadcasted left semi join operator selection") {

test("broadcasted left semi join operator selection") {
clearCache()
sql("CACHE TABLE testData")
val tmp = autoBroadcastJoinThreshold
Expand All @@ -403,7 +404,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
case (query, joinClass) => assertJoin(query, joinClass)
}

sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-$tmp""")
setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString)
sql("UNCACHE TABLE testData")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,69 +195,49 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
}

test("auto converts to broadcast left semi join, by size estimate of a relation") {
def mkTest(
before: () => Unit,
after: () => Unit,
query: String,
expectedAnswer: Seq[Any],
ct: ClassTag[_]) = {
before()

var rdd = sql(query)

// Assert src has a size smaller than the threshold.
val sizes = rdd.queryExecution.analyzed.collect {
case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
}
assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold
&& sizes(0) <= autoBroadcastJoinThreshold,
s"query should contain two relations, each of which has size smaller than autoConvertSize")
val leftSemiJoinQuery =
"""SELECT * FROM src a
|left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin
val answer = (86, "val_86") :: Nil

// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
var bhj = rdd.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
var rdd = sql(leftSemiJoinQuery)

checkAnswer(rdd, expectedAnswer) // check correctness of output
// Assert src has a size smaller than the threshold.
val sizes = rdd.queryExecution.analyzed.collect {
case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
}
assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold
&& sizes(0) <= autoBroadcastJoinThreshold,
s"query should contain two relations, each of which has size smaller than autoConvertSize")

// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
var bhj = rdd.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")

TestHive.settings.synchronized {
val tmp = autoBroadcastJoinThreshold
checkAnswer(rdd, answer) // check correctness of output

sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
rdd = sql(query)
bhj = rdd.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
TestHive.settings.synchronized {
val tmp = autoBroadcastJoinThreshold

val shj = rdd.queryExecution.sparkPlan.collect {
case j: LeftSemiJoinHash => j
}
assert(shj.size === 1,
"LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off")
sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
rdd = sql(leftSemiJoinQuery)
bhj = rdd.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")

sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
val shj = rdd.queryExecution.sparkPlan.collect {
case j: LeftSemiJoinHash => j
}
assert(shj.size === 1,
"LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off")

after()
sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
}

/** Tests for MetastoreRelation */
val leftSemiJoinQuery =
"""SELECT * FROM src a
|left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin
val Answer =(86, "val_86") ::Nil

mkTest(
() => (),
() => (),
leftSemiJoinQuery,
Answer,
implicitly[ClassTag[MetastoreRelation]]
)

}
}

0 comments on commit fbe4887

Please sign in to comment.