From 4e65ac97f0d7746a9d2769e0b02725745f264027 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 18:55:14 -0700 Subject: [PATCH 01/10] Begin to refactor OuterJoinSuite. --- .../sql/execution/joins/OuterJoinSuite.scala | 160 +++++++++++++----- 1 file changed, 113 insertions(+), 47 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 2c27da596bc4f..25975c11b4084 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -17,14 +17,42 @@ package org.apache.spark.sql.execution.joins -import org.apache.spark.sql.Row +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.{Expression, LessThan} -import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} class OuterJoinSuite extends SparkPlanTest { + private def testOuterJoin( + testName: String, + leftRows: DataFrame, + rightRows: DataFrame, + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + condition: Option[Expression], + expectedAnswer: Seq[Product]): Unit = { + // Precondition: leftRows and rightRows should be sorted according to the join keys. + + test(s"$testName using ShuffledHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, condition, left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + + if (joinType != FullOuter) { + test(s"$testName using BroadcastHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, condition, left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + } + } + val left = Seq( (1, 2.0), (2, 1.0), @@ -41,49 +69,87 @@ class OuterJoinSuite extends SparkPlanTest { val rightKeys: List[Expression] = 'c :: Nil val condition = Some(LessThan('b, 'd)) - test("shuffled hash outer join") { - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - ShuffledHashOuterJoin(leftKeys, rightKeys, LeftOuter, condition, left, right), - Seq( - (1, 2.0, null, null), - (2, 1.0, 2, 3.0), - (3, 3.0, null, null) - ).map(Row.fromTuple)) - - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - ShuffledHashOuterJoin(leftKeys, rightKeys, RightOuter, condition, left, right), - Seq( - (2, 1.0, 2, 3.0), - (null, null, 3, 2.0), - (null, null, 4, 1.0) - ).map(Row.fromTuple)) - - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - ShuffledHashOuterJoin(leftKeys, rightKeys, FullOuter, condition, left, right), - Seq( - (1, 2.0, null, null), - (2, 1.0, 2, 3.0), - (3, 3.0, null, null), - (null, null, 3, 2.0), - (null, null, 4, 1.0) - ).map(Row.fromTuple)) - } + // --- Basic outer joins ------------------------------------------------------------------------ - test("broadcast hash outer join") { - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - BroadcastHashOuterJoin(leftKeys, rightKeys, LeftOuter, condition, left, right), - Seq( - (1, 2.0, 
null, null), - (2, 1.0, 2, 3.0), - (3, 3.0, null, null) - ).map(Row.fromTuple)) - - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - BroadcastHashOuterJoin(leftKeys, rightKeys, RightOuter, condition, left, right), - Seq( - (2, 1.0, 2, 3.0), - (null, null, 3, 2.0), - (null, null, 4, 1.0) - ).map(Row.fromTuple)) - } + testOuterJoin( + "basic left outer join", + left, + right, + leftKeys, + rightKeys, + LeftOuter, + condition, + Seq( + (1, 2.0, null, null), + (2, 1.0, 2, 3.0), + (3, 3.0, null, null) + ) + ) + + testOuterJoin( + "basic right outer join", + left, + right, + leftKeys, + rightKeys, + RightOuter, + condition, + Seq( + (2, 1.0, 2, 3.0), + (null, null, 3, 2.0), + (null, null, 4, 1.0) + ) + ) + + testOuterJoin( + "basic full outer join", + left, + right, + leftKeys, + rightKeys, + FullOuter, + condition, + Seq( + (1, 2.0, null, null), + (2, 1.0, 2, 3.0), + (3, 3.0, null, null), + (null, null, 3, 2.0), + (null, null, 4, 1.0) + ) + ) + + // --- Both inputs empty ------------------------------------------------------------------------ + + testOuterJoin( + "left outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + leftKeys, + rightKeys, + LeftOuter, + condition, + Seq.empty + ) + + testOuterJoin( + "right outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + leftKeys, + rightKeys, + RightOuter, + condition, + Seq.empty + ) + + testOuterJoin( + "full outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + leftKeys, + rightKeys, + FullOuter, + condition, + Seq.empty + ) } From 1119be035e8ffb13d6c134acb7e620c9f6984bf0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 19:39:55 -0700 Subject: [PATCH 02/10] Begin to add more tests for inner joins. --- .../sql/execution/joins/InnerJoinSuite.scala | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala new file mode 100644 index 0000000000000..445091887ee6e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.{execution, Row, DataFrame} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.{Filter, joins, SparkPlan, SparkPlanTest} + + +class InnerJoinSuite extends SparkPlanTest { + + private val testData2 = Seq( + (1, 1), + (2, 1), + (2, 2), + (3, 1), + (3, 2) + ).toDF("a", "b") + + private def testInnerJoin( + testName: String, + leftRows: DataFrame, + rightRows: DataFrame, + condition: Expression, + expectedAnswer: Seq[Product]): Unit = { + // Precondition: leftRows and rightRows should be sorted according to the join keys. + + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + + def makeBroadcastHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { + val broadcastHashJoin = + execution.joins.BroadcastHashJoin(leftKeys, rightKeys, side, left, right) + boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) + } + + def makeShuffledHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { + val shuffledHashJoin = + execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, left, right) + boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) + } + + def makeSortMergeJoin(left: SparkPlan, right: SparkPlan) = { + val sortMergeJoin = + execution.joins.SortMergeJoin(leftKeys, rightKeys, left, right) + boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) + } + + test(s"$testName using BroadcastHashJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeBroadcastHashJoin(left, right, joins.BuildLeft), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastHashJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeBroadcastHashJoin(left, right, joins.BuildRight), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using ShuffledHashJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeShuffledHashJoin(left, right, joins.BuildLeft), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using ShuffledHashJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeShuffledHashJoin(left, right, joins.BuildRight), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using SortMergeJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeSortMergeJoin(left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } + + test(s"$testName using BroadcastNestedLoopJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, Inner, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastNestedLoopJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, 
joins.BuildRight, Inner, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } + + { + val left = testData2.where("a = 1") + val right = testData2.where("a = 2") + testInnerJoin( + "no matches", + left, + right, + (left.col("a") === right.col("a")).expr, + Seq.empty + ) + } + +} From 9039f5571899c858dffcbf1bdf8370f28a8c4c88 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 19:56:48 -0700 Subject: [PATCH 03/10] Also test BroadcastNestedLoopJoin in OuterJoinSuite --- .../sql/execution/joins/InnerJoinSuite.scala | 1 - .../sql/execution/joins/OuterJoinSuite.scala | 68 +++++++++++-------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 445091887ee6e..609ab7e18b59f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -41,7 +41,6 @@ class InnerJoinSuite extends SparkPlanTest { rightRows: DataFrame, condition: Expression, expectedAnswer: Seq[Product]): Unit = { - // Precondition: leftRows and rightRows should be sorted according to the join keys. val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) ExtractEquiJoinKeys.unapply(join).foreach { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 25975c11b4084..979d5dce188e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution.joins +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter} -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.execution.{joins, SparkPlan, SparkPlanTest} class OuterJoinSuite extends SparkPlanTest { @@ -29,27 +31,43 @@ class OuterJoinSuite extends SparkPlanTest { testName: String, leftRows: DataFrame, rightRows: DataFrame, - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], joinType: JoinType, - condition: Option[Expression], + condition: Expression, expectedAnswer: Seq[Product]): Unit = { // Precondition: leftRows and rightRows should be sorted according to the join keys. 
- test(s"$testName using ShuffledHashOuterJoin") { + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + test(s"$testName using ShuffledHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + + if (joinType != FullOuter) { + test(s"$testName using BroadcastHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + } + } + + test(s"$testName using BroadcastNestedLoopJoin (build=left)") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, condition, left, right), + joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, joinType, Some(condition)), expectedAnswer.map(Row.fromTuple), - sortAnswers = false) + sortAnswers = true) } - if (joinType != FullOuter) { - test(s"$testName using BroadcastHashOuterJoin") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, condition, left, right), - expectedAnswer.map(Row.fromTuple), - sortAnswers = false) - } + test(s"$testName using BroadcastNestedLoopJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, joinType, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) } } @@ -65,9 +83,11 @@ class OuterJoinSuite extends SparkPlanTest { (4, 1.0) ).toDF("c", "d") - val leftKeys: List[Expression] = 'a :: Nil - val rightKeys: List[Expression] = 'c :: Nil - val condition = Some(LessThan('b, 'd)) + val condition = { + And( + (left.col("a") === right.col("c")).expr, + LessThan(left.col("b").expr, right.col("d").expr)) + } // --- Basic outer joins ------------------------------------------------------------------------ @@ -75,8 +95,6 @@ class OuterJoinSuite extends SparkPlanTest { "basic left outer join", left, right, - leftKeys, - rightKeys, LeftOuter, condition, Seq( @@ -90,8 +108,6 @@ class OuterJoinSuite extends SparkPlanTest { "basic right outer join", left, right, - leftKeys, - rightKeys, RightOuter, condition, Seq( @@ -105,8 +121,6 @@ class OuterJoinSuite extends SparkPlanTest { "basic full outer join", left, right, - leftKeys, - rightKeys, FullOuter, condition, Seq( @@ -124,8 +138,6 @@ class OuterJoinSuite extends SparkPlanTest { "left outer join with both inputs empty", left.filter("false"), right.filter("false"), - leftKeys, - rightKeys, LeftOuter, condition, Seq.empty @@ -135,8 +147,6 @@ class OuterJoinSuite extends SparkPlanTest { "right outer join with both inputs empty", left.filter("false"), right.filter("false"), - leftKeys, - rightKeys, RightOuter, condition, Seq.empty @@ -146,8 +156,6 @@ class OuterJoinSuite extends SparkPlanTest { "full outer join with both inputs empty", left.filter("false"), right.filter("false"), - leftKeys, - rightKeys, FullOuter, condition, Seq.empty From 6d9a0e461e2efeb32025bf3ce23b742ab99bffdf Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 20:16:35 -0700 Subject: [PATCH 04/10] Also refactor semi join 
tests --- .../sql/execution/joins/OuterJoinSuite.scala | 1 - .../sql/execution/joins/SemiJoinSuite.scala | 87 +++++++++++-------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 979d5dce188e4..325209fd99611 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{joins, SparkPlan, SparkPlanTest} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala index 927e85a7db3dc..675a30773db18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala @@ -17,13 +17,48 @@ package org.apache.spark.sql.execution.joins -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.{LessThan, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.{And, LessThan, Expression} import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +class SemiJoinSuite extends SparkPlanTest { + + private def testLeftSemiJoin( + testName: String, + leftRows: DataFrame, + rightRows: DataFrame, + condition: Expression, + expectedAnswer: Seq[Product]): Unit = { + + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + test(s"$testName using LeftSemiJoinHash") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + LeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastLeftSemiJoinHash") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastLeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } + + test(s"$testName using LeftSemiJoinBNL") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + LeftSemiJoinBNL(left, right, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } -class SemiJoinSuite extends SparkPlanTest{ val left = Seq( (1, 2.0), (1, 2.0), @@ -39,36 +74,20 @@ class SemiJoinSuite extends SparkPlanTest{ (4, 1.0) ).toDF("c", "d") - val leftKeys: List[Expression] = 'a :: Nil - val rightKeys: List[Expression] = 'c :: Nil - val condition = Some(LessThan('b, 'd)) - - test("left semi join hash") { - 
checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - LeftSemiJoinHash(leftKeys, rightKeys, left, right, condition), - Seq( - (2, 1.0), - (2, 1.0) - ).map(Row.fromTuple)) - } - - test("left semi join BNL") { - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - LeftSemiJoinBNL(left, right, condition), - Seq( - (1, 2.0), - (1, 2.0), - (2, 1.0), - (2, 1.0) - ).map(Row.fromTuple)) + val condition = { + And( + (left.col("a") === right.col("c")).expr, + LessThan(left.col("b").expr, right.col("d").expr)) } - test("broadcast left semi join hash") { - checkAnswer2(left, right, (left: SparkPlan, right: SparkPlan) => - BroadcastLeftSemiJoinHash(leftKeys, rightKeys, left, right, condition), - Seq( - (2, 1.0), - (2, 1.0) - ).map(Row.fromTuple)) - } + testLeftSemiJoin( + "basic test", + left, + right, + condition, + Seq( + (2, 1.0), + (2, 1.0) + ) + ) } From 4bf6407f32f5be5220e387aea16c8a7974b498a0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 20:22:33 -0700 Subject: [PATCH 05/10] BroadcastNestedLoopJoin cannot be used for inner joins --- .../execution/joins/BroadcastNestedLoopJoin.scala | 5 +++-- .../sql/execution/joins/InnerJoinSuite.scala | 15 --------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 23aebf4b068b4..017a44b9ca863 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -65,8 +65,9 @@ case class BroadcastNestedLoopJoin( left.output.map(_.withNullability(true)) ++ right.output case FullOuter => left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) - case _ => - left.output ++ right.output + case x => + throw new IllegalArgumentException( + s"BroadcastNestedLoopJoin should not take $x as the JoinType") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 609ab7e18b59f..79a4981148bbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.{execution, Row, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.{Filter, joins, SparkPlan, SparkPlanTest} - class InnerJoinSuite extends SparkPlanTest { private val testData2 = Seq( @@ -99,20 +98,6 @@ class InnerJoinSuite extends SparkPlanTest { sortAnswers = true) } } - - test(s"$testName using BroadcastNestedLoopJoin (build=left)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, Inner, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using BroadcastNestedLoopJoin (build=right)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, Inner, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } } { From 61edd2355794dc26e3f1421273962adfa2c76ba2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 
22:22:29 -0700 Subject: [PATCH 06/10] Move more inner join tests to new framework --- .../sql/execution/joins/InnerJoinSuite.scala | 67 ++++++++++++++++--- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 79a4981148bbb..f0c627fb4a613 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -26,14 +26,6 @@ import org.apache.spark.sql.execution.{Filter, joins, SparkPlan, SparkPlanTest} class InnerJoinSuite extends SparkPlanTest { - private val testData2 = Seq( - (1, 1), - (2, 1), - (2, 2), - (3, 1), - (3, 2) - ).toDF("a", "b") - private def testInnerJoin( testName: String, leftRows: DataFrame, @@ -100,11 +92,68 @@ class InnerJoinSuite extends SparkPlanTest { } } + { + val upperCaseData = Seq( + (1, "A"), + (2, "B"), + (3, "C"), + (4, "D"), + (5, "E"), + (6, "F") + ).toDF("N", "L") + + val lowerCaseData = Seq( + (1, "a"), + (2, "b"), + (3, "c"), + (4, "d") + ).toDF("n", "l") + + testInnerJoin( + "inner join, one match per row", + upperCaseData, + lowerCaseData, + (upperCaseData.col("N") === lowerCaseData.col("n")).expr, + Seq( + (1, "A", 1, "a"), + (2, "B", 2, "b"), + (3, "C", 3, "c"), + (4, "D", 4, "d") + ) + ) + } + + private val testData2 = Seq( + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 1), + (3, 2) + ).toDF("a", "b") + + { + val left = testData2.where("a = 1") + val right = testData2.where("a = 1") + testInnerJoin( + "inner join, multiple matches", + left, + right, + (left.col("a") === right.col("a")).expr, + Seq( + (1, 1, 1, 1), + (1, 1, 1, 2), + (1, 2, 1, 1), + (1, 2, 1, 2) + ) + ) + } + { val left = testData2.where("a = 1") val right = testData2.where("a = 2") testInnerJoin( - "no matches", + "inner join, no matches", left, right, (left.col("a") === right.col("a")).expr, From c17406059a894e099a91f0adb6fce853efeef7af Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Aug 2015 22:36:17 -0700 Subject: [PATCH 07/10] Use EnsureRequirements on the non-broadcast joins. 
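
ShuffledHashJoin, SortMergeJoin, ShuffledHashOuterJoin and
LeftSemiJoinHash all declare requirements on how their children are
distributed (and, for sort-merge, ordered), but nothing enforces those
requirements when a test builds the operator by hand. Feeding the
hand-built operator through EnsureRequirements, the same rule the
planner applies, inserts the Exchange (and Sort) operators needed to
make the plan executable. A minimal sketch of the pattern, assuming the
leftKeys/rightKeys/left/right values from the surrounding test helpers
(the val names here are illustrative, not part of this diff):

    // Build the operator directly, then let EnsureRequirements add any
    // Exchange/Sort operators its child requirements call for.
    val shuffledHashJoin =
      execution.joins.ShuffledHashJoin(leftKeys, rightKeys, joins.BuildLeft, left, right)
    val executablePlan = EnsureRequirements(left.sqlContext).apply(shuffledHashJoin)

The broadcast joins are left unwrapped because they do their own
broadcasting rather than relying on a shuffle.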
--- .../spark/sql/execution/joins/InnerJoinSuite.scala | 10 ++++++---- .../spark/sql/execution/joins/OuterJoinSuite.scala | 7 +++---- .../spark/sql/execution/joins/SemiJoinSuite.scala | 5 +++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index f0c627fb4a613..a96a9094ee733 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.{execution, Row, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.{Filter, joins, SparkPlan, SparkPlanTest} +import org.apache.spark.sql.execution._ class InnerJoinSuite extends SparkPlanTest { @@ -32,7 +32,6 @@ class InnerJoinSuite extends SparkPlanTest { rightRows: DataFrame, condition: Expression, expectedAnswer: Seq[Product]): Unit = { - val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) ExtractEquiJoinKeys.unapply(join).foreach { case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => @@ -46,13 +45,16 @@ class InnerJoinSuite extends SparkPlanTest { def makeShuffledHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { val shuffledHashJoin = execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, left, right) - boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) + val filteredJoin = + boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) + EnsureRequirements(filteredJoin.sqlContext).apply(filteredJoin) } def makeSortMergeJoin(left: SparkPlan, right: SparkPlan) = { val sortMergeJoin = execution.joins.SortMergeJoin(leftKeys, rightKeys, left, right) - boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) + val filteredJoin = boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) + EnsureRequirements(filteredJoin.sqlContext).apply(filteredJoin) } test(s"$testName using BroadcastHashJoin (build=left)") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 325209fd99611..c6b4fb35e658a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.execution.{joins, SparkPlan, SparkPlanTest} +import org.apache.spark.sql.execution.{EnsureRequirements, joins, SparkPlan, SparkPlanTest} class OuterJoinSuite extends SparkPlanTest { @@ -33,14 +33,13 @@ class OuterJoinSuite extends SparkPlanTest { joinType: JoinType, condition: Expression, expectedAnswer: Seq[Product]): Unit = { - // Precondition: leftRows and rightRows should be sorted according to the join keys. 
- val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) ExtractEquiJoinKeys.unapply(join).foreach { case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => test(s"$testName using ShuffledHashOuterJoin") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + EnsureRequirements(left.sqlContext).apply( + ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), expectedAnswer.map(Row.fromTuple), sortAnswers = false) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala index 675a30773db18..93704184d2be0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{And, LessThan, Expression} -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} +import org.apache.spark.sql.execution.{EnsureRequirements, SparkPlan, SparkPlanTest} class SemiJoinSuite extends SparkPlanTest { @@ -38,7 +38,8 @@ class SemiJoinSuite extends SparkPlanTest { case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => test(s"$testName using LeftSemiJoinHash") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - LeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition), + EnsureRequirements(left.sqlContext).apply( + LeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } From ec655854f11b0d33f9e542174e8ca402b57f16ac Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Aug 2015 14:21:48 -0700 Subject: [PATCH 08/10] Test with entirely-null join keys. 
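
Rows whose join keys are all null can never match in an equi-join, but
an outer join must still emit them, padded with nulls on the other
side. The inputs switch from Seq(...).toDF to createDataFrame with an
explicit schema because a Scala (Int, Double) tuple cannot hold nulls;
the null-keyed rows have to be spelled out as Row objects against a
StructType. A sketch of the construction (mirroring the diff below;
sqlContext is the suite's SQLContext):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

    val left = sqlContext.createDataFrame(
      sqlContext.sparkContext.parallelize(Seq(
        Row(1, 2.0),
        Row(null, null)  // not expressible as an (Int, Double) tuple
      )),
      new StructType().add("a", IntegerType).add("b", DoubleType))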
--- .../sql/execution/joins/OuterJoinSuite.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index c6b4fb35e658a..30358608602f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ @@ -69,17 +70,19 @@ class OuterJoinSuite extends SparkPlanTest { } } - val left = Seq( - (1, 2.0), - (2, 1.0), - (3, 3.0) - ).toDF("a", "b") + val left = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(1, 2.0), + Row(2, 1.0), + Row(3, 3.0), + Row(null, null) + )), new StructType().add("a", IntegerType).add("b", DoubleType)) - val right = Seq( - (2, 3.0), - (3, 2.0), - (4, 1.0) - ).toDF("c", "d") + val right = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(2, 3.0), + Row(3, 2.0), + Row(4, 1.0), + Row(null, null) + )), new StructType().add("c", IntegerType).add("d", DoubleType)) val condition = { And( @@ -98,7 +101,8 @@ class OuterJoinSuite extends SparkPlanTest { Seq( (1, 2.0, null, null), (2, 1.0, 2, 3.0), - (3, 3.0, null, null) + (3, 3.0, null, null), + (null, null, null, null) ) ) @@ -111,7 +115,8 @@ class OuterJoinSuite extends SparkPlanTest { Seq( (2, 1.0, 2, 3.0), (null, null, 3, 2.0), - (null, null, 4, 1.0) + (null, null, 4, 1.0), + (null, null, null, null) ) ) @@ -126,7 +131,9 @@ class OuterJoinSuite extends SparkPlanTest { (2, 1.0, 2, 3.0), (3, 3.0, null, null), (null, null, 3, 2.0), - (null, null, 4, 1.0) + (null, null, 4, 1.0), + (null, null, null, null), + (null, null, null, null) ) ) From 2a9165ef5fadee26b56ea5a6fe606269e0416ca0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Aug 2015 14:23:38 -0700 Subject: [PATCH 09/10] Fix Windows-style line endings. --- .../sql/execution/joins/OuterJoinSuite.scala | 336 +++++++++--------- 1 file changed, 168 insertions(+), 168 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 30358608602f8..04666fe8c143d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -1,168 +1,168 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.joins - -import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys -import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType} -import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.execution.{EnsureRequirements, joins, SparkPlan, SparkPlanTest} - -class OuterJoinSuite extends SparkPlanTest { - - private def testOuterJoin( - testName: String, - leftRows: DataFrame, - rightRows: DataFrame, - joinType: JoinType, - condition: Expression, - expectedAnswer: Seq[Product]): Unit = { - val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) - ExtractEquiJoinKeys.unapply(join).foreach { - case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => - test(s"$testName using ShuffledHashOuterJoin") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - EnsureRequirements(left.sqlContext).apply( - ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = false) - } - - if (joinType != FullOuter) { - test(s"$testName using BroadcastHashOuterJoin") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), - expectedAnswer.map(Row.fromTuple), - sortAnswers = false) - } - } - } - - test(s"$testName using BroadcastNestedLoopJoin (build=left)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, joinType, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using BroadcastNestedLoopJoin (build=right)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, joinType, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - } - - val left = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( - Row(1, 2.0), - Row(2, 1.0), - Row(3, 3.0), - Row(null, null) - )), new StructType().add("a", IntegerType).add("b", DoubleType)) - - val right = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( - Row(2, 3.0), - Row(3, 2.0), - Row(4, 1.0), - Row(null, null) - )), new StructType().add("c", IntegerType).add("d", DoubleType)) - - val condition = { - And( - (left.col("a") === right.col("c")).expr, - LessThan(left.col("b").expr, right.col("d").expr)) - } - - // --- Basic outer joins ------------------------------------------------------------------------ - - testOuterJoin( - "basic left outer join", - left, - right, - LeftOuter, - condition, - Seq( - (1, 2.0, null, null), - (2, 1.0, 2, 3.0), - (3, 3.0, null, null), - (null, null, null, null) - ) - ) - - testOuterJoin( - "basic right outer join", - left, - right, - RightOuter, - condition, - Seq( - (2, 1.0, 
2, 3.0), - (null, null, 3, 2.0), - (null, null, 4, 1.0), - (null, null, null, null) - ) - ) - - testOuterJoin( - "basic full outer join", - left, - right, - FullOuter, - condition, - Seq( - (1, 2.0, null, null), - (2, 1.0, 2, 3.0), - (3, 3.0, null, null), - (null, null, 3, 2.0), - (null, null, 4, 1.0), - (null, null, null, null), - (null, null, null, null) - ) - ) - - // --- Both inputs empty ------------------------------------------------------------------------ - - testOuterJoin( - "left outer join with both inputs empty", - left.filter("false"), - right.filter("false"), - LeftOuter, - condition, - Seq.empty - ) - - testOuterJoin( - "right outer join with both inputs empty", - left.filter("false"), - right.filter("false"), - RightOuter, - condition, - Seq.empty - ) - - testOuterJoin( - "full outer join with both inputs empty", - left.filter("false"), - right.filter("false"), - FullOuter, - condition, - Seq.empty - ) -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType} +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.execution.{EnsureRequirements, joins, SparkPlan, SparkPlanTest} + +class OuterJoinSuite extends SparkPlanTest { + + private def testOuterJoin( + testName: String, + leftRows: DataFrame, + rightRows: DataFrame, + joinType: JoinType, + condition: Expression, + expectedAnswer: Seq[Product]): Unit = { + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + test(s"$testName using ShuffledHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + EnsureRequirements(left.sqlContext).apply( + ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + + if (joinType != FullOuter) { + test(s"$testName using BroadcastHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = false) + } + } + } + + test(s"$testName using BroadcastNestedLoopJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, joinType, Some(condition)), + 
expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastNestedLoopJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, joinType, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } + + val left = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(1, 2.0), + Row(2, 1.0), + Row(3, 3.0), + Row(null, null) + )), new StructType().add("a", IntegerType).add("b", DoubleType)) + + val right = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(2, 3.0), + Row(3, 2.0), + Row(4, 1.0), + Row(null, null) + )), new StructType().add("c", IntegerType).add("d", DoubleType)) + + val condition = { + And( + (left.col("a") === right.col("c")).expr, + LessThan(left.col("b").expr, right.col("d").expr)) + } + + // --- Basic outer joins ------------------------------------------------------------------------ + + testOuterJoin( + "basic left outer join", + left, + right, + LeftOuter, + condition, + Seq( + (1, 2.0, null, null), + (2, 1.0, 2, 3.0), + (3, 3.0, null, null), + (null, null, null, null) + ) + ) + + testOuterJoin( + "basic right outer join", + left, + right, + RightOuter, + condition, + Seq( + (2, 1.0, 2, 3.0), + (null, null, 3, 2.0), + (null, null, 4, 1.0), + (null, null, null, null) + ) + ) + + testOuterJoin( + "basic full outer join", + left, + right, + FullOuter, + condition, + Seq( + (1, 2.0, null, null), + (2, 1.0, 2, 3.0), + (3, 3.0, null, null), + (null, null, 3, 2.0), + (null, null, 4, 1.0), + (null, null, null, null), + (null, null, null, null) + ) + ) + + // --- Both inputs empty ------------------------------------------------------------------------ + + testOuterJoin( + "left outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + LeftOuter, + condition, + Seq.empty + ) + + testOuterJoin( + "right outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + RightOuter, + condition, + Seq.empty + ) + + testOuterJoin( + "full outer join with both inputs empty", + left.filter("false"), + right.filter("false"), + FullOuter, + condition, + Seq.empty + ) +} From c700df8c95aacf793c932ccca2ea4465f5621289 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 10 Aug 2015 14:48:03 -0700 Subject: [PATCH 10/10] Test with a single partition in all operator join unit tests. 
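
Running the join operators with spark.sql.shuffle.partitions set to 1
exercises the single-partition shuffle path and keeps the shuffled
joins' output stable enough to compare reliably, so the suites now mix
in SQLTestUtils for withSQLConf (and SQLTestUtils#sqlContext is
tightened to protected). A sketch of how withSQLConf behaves, assuming
a suite that mixes in SQLTestUtils:

    // withSQLConf sets the given keys for the duration of the block and
    // restores their previous values afterwards.
    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
      assert(sqlContext.getConf(SQLConf.SHUFFLE_PARTITIONS.key) == "1")
    }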
--- .../sql/execution/joins/InnerJoinSuite.scala | 158 +++++++++--------- .../sql/execution/joins/OuterJoinSuite.scala | 65 +++---- .../sql/execution/joins/SemiJoinSuite.scala | 85 +++++----- .../apache/spark/sql/test/SQLTestUtils.scala | 2 +- 4 files changed, 164 insertions(+), 146 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index a96a9094ee733..b9d5f0ef8fa46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.{execution, Row, DataFrame} +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.{SQLConf, execution, Row, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution._ -class InnerJoinSuite extends SparkPlanTest { +class InnerJoinSuite extends SparkPlanTest with SQLTestUtils { private def testInnerJoin( testName: String, @@ -32,84 +34,88 @@ class InnerJoinSuite extends SparkPlanTest { rightRows: DataFrame, condition: Expression, expectedAnswer: Seq[Product]): Unit = { - val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) - ExtractEquiJoinKeys.unapply(join).foreach { - case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => - - def makeBroadcastHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { - val broadcastHashJoin = - execution.joins.BroadcastHashJoin(leftKeys, rightKeys, side, left, right) - boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) - } - - def makeShuffledHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { - val shuffledHashJoin = - execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, left, right) - val filteredJoin = - boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) - EnsureRequirements(filteredJoin.sqlContext).apply(filteredJoin) - } - - def makeSortMergeJoin(left: SparkPlan, right: SparkPlan) = { - val sortMergeJoin = - execution.joins.SortMergeJoin(leftKeys, rightKeys, left, right) - val filteredJoin = boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) - EnsureRequirements(filteredJoin.sqlContext).apply(filteredJoin) - } - - test(s"$testName using BroadcastHashJoin (build=left)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - makeBroadcastHashJoin(left, right, joins.BuildLeft), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using BroadcastHashJoin (build=right)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - makeBroadcastHashJoin(left, right, joins.BuildRight), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using ShuffledHashJoin (build=left)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - makeShuffledHashJoin(left, right, joins.BuildLeft), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using ShuffledHashJoin 
(build=right)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - makeShuffledHashJoin(left, right, joins.BuildRight), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - - test(s"$testName using SortMergeJoin") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - makeSortMergeJoin(left, right), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + + def makeBroadcastHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { + val broadcastHashJoin = + execution.joins.BroadcastHashJoin(leftKeys, rightKeys, side, left, right) + boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) + } + + def makeShuffledHashJoin(left: SparkPlan, right: SparkPlan, side: BuildSide) = { + val shuffledHashJoin = + execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, left, right) + val filteredJoin = + boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) + EnsureRequirements(sqlContext).apply(filteredJoin) + } + + def makeSortMergeJoin(left: SparkPlan, right: SparkPlan) = { + val sortMergeJoin = + execution.joins.SortMergeJoin(leftKeys, rightKeys, left, right) + val filteredJoin = boundCondition.map(Filter(_, sortMergeJoin)).getOrElse(sortMergeJoin) + EnsureRequirements(sqlContext).apply(filteredJoin) + } + + test(s"$testName using BroadcastHashJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeBroadcastHashJoin(left, right, joins.BuildLeft), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastHashJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeBroadcastHashJoin(left, right, joins.BuildRight), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using ShuffledHashJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeShuffledHashJoin(left, right, joins.BuildLeft), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using ShuffledHashJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeShuffledHashJoin(left, right, joins.BuildRight), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using SortMergeJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + makeSortMergeJoin(left, right), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } } } { - val upperCaseData = Seq( - (1, "A"), - (2, "B"), - (3, "C"), - (4, "D"), - (5, "E"), - (6, "F") - ).toDF("N", "L") - - val lowerCaseData = Seq( - (1, "a"), - (2, "b"), - (3, "c"), - (4, "d") - ).toDF("n", "l") + val upperCaseData = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(1, "A"), + Row(2, "B"), + Row(3, "C"), + Row(4, "D"), + Row(5, "E"), + Row(6, "F"), + Row(null, "G") + )), new StructType().add("N", IntegerType).add("L", StringType)) + + val lowerCaseData = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(1, "a"), + Row(2, "b"), + Row(3, "c"), + Row(4, "d"), + Row(null, "e") + )), new StructType().add("n", 
IntegerType).add("l", StringType)) testInnerJoin( "inner join, one match per row", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 04666fe8c143d..ac5f212f48073 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType} -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{SQLConf, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{EnsureRequirements, joins, SparkPlan, SparkPlanTest} -class OuterJoinSuite extends SparkPlanTest { +class OuterJoinSuite extends SparkPlanTest with SQLTestUtils { private def testOuterJoin( testName: String, @@ -34,39 +35,41 @@ class OuterJoinSuite extends SparkPlanTest { joinType: JoinType, condition: Expression, expectedAnswer: Seq[Product]): Unit = { - val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) - ExtractEquiJoinKeys.unapply(join).foreach { - case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => - test(s"$testName using ShuffledHashOuterJoin") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - EnsureRequirements(left.sqlContext).apply( - ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = false) - } - - if (joinType != FullOuter) { - test(s"$testName using BroadcastHashOuterJoin") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (_, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + test(s"$testName using ShuffledHashOuterJoin") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + EnsureRequirements(sqlContext).apply( + ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), expectedAnswer.map(Row.fromTuple), - sortAnswers = false) + sortAnswers = true) } - } - } - - test(s"$testName using BroadcastNestedLoopJoin (build=left)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, joinType, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } - test(s"$testName using BroadcastNestedLoopJoin (build=right)") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, joinType, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) + if (joinType != FullOuter) { + test(s"$testName using BroadcastHashOuterJoin") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right), + expectedAnswer.map(Row.fromTuple), + 
sortAnswers = true) + } + } + } + + test(s"$testName using BroadcastNestedLoopJoin (build=left)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildLeft, joinType, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + + test(s"$testName using BroadcastNestedLoopJoin (build=right)") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + joins.BroadcastNestedLoopJoin(left, right, joins.BuildRight, joinType, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala index 93704184d2be0..9a8a667d365a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} +import org.apache.spark.sql.{SQLConf, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{And, LessThan, Expression} import org.apache.spark.sql.execution.{EnsureRequirements, SparkPlan, SparkPlanTest} -class SemiJoinSuite extends SparkPlanTest { +class SemiJoinSuite extends SparkPlanTest with SQLTestUtils { private def testLeftSemiJoin( testName: String, @@ -32,48 +34,55 @@ class SemiJoinSuite extends SparkPlanTest { rightRows: DataFrame, condition: Expression, expectedAnswer: Seq[Product]): Unit = { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) + ExtractEquiJoinKeys.unapply(join).foreach { + case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => + test(s"$testName using LeftSemiJoinHash") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + EnsureRequirements(left.sqlContext).apply( + LeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } - val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition)) - ExtractEquiJoinKeys.unapply(join).foreach { - case (joinType, leftKeys, rightKeys, boundCondition, leftChild, rightChild) => - test(s"$testName using LeftSemiJoinHash") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - EnsureRequirements(left.sqlContext).apply( - LeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) - } + test(s"$testName using BroadcastLeftSemiJoinHash") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + BroadcastLeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } + } - test(s"$testName using BroadcastLeftSemiJoinHash") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastLeftSemiJoinHash(leftKeys, rightKeys, left, right, boundCondition), - expectedAnswer.map(Row.fromTuple), - 
sortAnswers = true) - } - } - - test(s"$testName using LeftSemiJoinBNL") { - checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - LeftSemiJoinBNL(left, right, Some(condition)), - expectedAnswer.map(Row.fromTuple), - sortAnswers = true) + test(s"$testName using LeftSemiJoinBNL") { + checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => + LeftSemiJoinBNL(left, right, Some(condition)), + expectedAnswer.map(Row.fromTuple), + sortAnswers = true) + } } } - val left = Seq( - (1, 2.0), - (1, 2.0), - (2, 1.0), - (2, 1.0), - (3, 3.0) - ).toDF("a", "b") + val left = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(1, 2.0), + Row(1, 2.0), + Row(2, 1.0), + Row(2, 1.0), + Row(3, 3.0), + Row(null, null), + Row(null, 5.0), + Row(6, null) + )), new StructType().add("a", IntegerType).add("b", DoubleType)) - val right = Seq( - (2, 3.0), - (2, 3.0), - (3, 2.0), - (4, 1.0) - ).toDF("c", "d") + val right = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(Seq( + Row(2, 3.0), + Row(2, 3.0), + Row(3, 2.0), + Row(4, 1.0), + Row(null, null), + Row(null, 5.0), + Row(6, null) + )), new StructType().add("c", IntegerType).add("d", DoubleType)) val condition = { And( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 4c11acdab9ec0..1066695589778 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.util.Utils trait SQLTestUtils { this: SparkFunSuite => - def sqlContext: SQLContext + protected def sqlContext: SQLContext protected def configuration = sqlContext.sparkContext.hadoopConfiguration