From fbe4887c10939907d8e9d55e07e4f70d38d43d3d Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 1 Dec 2014 20:57:00 -0800 Subject: [PATCH] change style --- .../joins/BroadcastLeftSemiJoinHash.scala | 2 - .../org/apache/spark/sql/JoinSuite.scala | 5 +- .../spark/sql/hive/StatisticsSuite.scala | 88 +++++++------------ 3 files changed, 37 insertions(+), 58 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index 9d88c8d71a14f..2ab064fd0151e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -39,7 +39,6 @@ case class BroadcastLeftSemiJoinHash( override def output = left.output override def execute() = { - val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null @@ -59,7 +58,6 @@ case class BroadcastLeftSemiJoinHash( val broadcastedRelation = sparkContext.broadcast(hashSet) streamedPlan.execute().mapPartitions { streamIter => - val joinKeys = streamSideKeyGenerator() streamIter.filter(current => { !joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 89f397150d92b..2efe7dd35573d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -383,7 +383,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { """.stripMargin), (null, 10) :: Nil) } - test("broadcasted left semi join operator selection") { + + test("broadcasted left semi join operator selection") { clearCache() sql("CACHE TABLE testData") val tmp = autoBroadcastJoinThreshold @@ -403,7 +404,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case (query, joinClass) => assertJoin(query, joinClass) } - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-$tmp""") + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString) sql("UNCACHE TABLE testData") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b337dd1fdd850..49560982665de 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -195,69 +195,49 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } test("auto converts to broadcast left semi join, by size estimate of a relation") { - def mkTest( - before: () => Unit, - after: () => Unit, - query: String, - expectedAnswer: Seq[Any], - ct: ClassTag[_]) = { - before() - - var rdd = sql(query) - - // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { - case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes - } - assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold - && sizes(0) <= autoBroadcastJoinThreshold, - s"query should contain two relations, each of which has size smaller than autoConvertSize") + val leftSemiJoinQuery = + """SELECT * FROM src a + |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin + val answer = (86, "val_86") :: Nil - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { - case j: BroadcastLeftSemiJoinHash => j - } - assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + var rdd = sql(leftSemiJoinQuery) - checkAnswer(rdd, expectedAnswer) // check correctness of output + // Assert src has a size smaller than the threshold. + val sizes = rdd.queryExecution.analyzed.collect { + case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes + } + assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold + && sizes(0) <= autoBroadcastJoinThreshold, + s"query should contain two relations, each of which has size smaller than autoConvertSize") + + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. + var bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.size === 1, + s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") - TestHive.settings.synchronized { - val tmp = autoBroadcastJoinThreshold + checkAnswer(rdd, answer) // check correctness of output - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") - rdd = sql(query) - bhj = rdd.queryExecution.sparkPlan.collect { - case j: BroadcastLeftSemiJoinHash => j - } - assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") + TestHive.settings.synchronized { + val tmp = autoBroadcastJoinThreshold - val shj = rdd.queryExecution.sparkPlan.collect { - case j: LeftSemiJoinHash => j - } - assert(shj.size === 1, - "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + rdd = sql(leftSemiJoinQuery) + bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") + val shj = rdd.queryExecution.sparkPlan.collect { + case j: LeftSemiJoinHash => j } + assert(shj.size === 1, + "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") - after() + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") } - /** Tests for MetastoreRelation */ - val leftSemiJoinQuery = - """SELECT * FROM src a - |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin - val Answer =(86, "val_86") ::Nil - - mkTest( - () => (), - () => (), - leftSemiJoinQuery, - Answer, - implicitly[ClassTag[MetastoreRelation]] - ) - } }