From 9c579bd7fee88b966ef221828526fea47703bd79 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 4 Sep 2018 14:43:14 +0800 Subject: [PATCH 01/24] Fix Python UDF accessing attibutes from both side of join in join conditions --- python/pyspark/sql/tests.py | 9 +++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 18 +++++++++++++++--- .../optimizer/FilterPushdownSuite.scala | 2 +- .../python/BatchEvalPythonExecSuite.scala | 14 ++++++++++++++ 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b829baeca4775..54996d2a30b49 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -550,6 +550,15 @@ def test_udf_in_filter_on_top_of_join(self): right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.crossJoin(right).filter(f("a", "b")) + + def test_udf_in_join_condition(self): + # regression test for SPARK-25314 + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1)]) + right = self.spark.createDataFrame([Row(b=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.crossJoin(right).filter(f("a", "b")) + self.assertEqual(df.collect(), [Row(a=1, b=1)]) self.assertEqual(df.collect(), [Row(a=1, b=1)]) def test_udf_without_arguments(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7c461895c5e52..d5b5a8d168b1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1234,9 +1234,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = + commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = + if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { + Filter(others.reduceLeft(And), join) + } else { + join + } case RightOuter => // push down the left side only join filter for left side sub query val newLeft = leftJoinConditions. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 82a10254d846d..14498a984122a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1153,7 +1153,7 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), - condition = Some("x.a".attr === Rand(10))) + joinType = Cross).where("x.a".attr === Rand(10)) // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 289cc667a1c66..32567a99ce007 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.api.python.{PythonEvalType, PythonFunction} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec} +import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.BooleanType @@ -31,6 +32,8 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { import testImplicits.newProductEncoder import testImplicits.localSeqToDatasetHolder + val dummyPythonUDF = new MyDummyPythonUDF + override def beforeAll(): Unit = { super.beforeAll() spark.udf.registerPython("dummyPythonUDF", new MyDummyPythonUDF) @@ -100,6 +103,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { } assert(qualifiedPlanNodes.size == 1) } + + test("Python UDF refers to the attributes from more than one child in join condition") { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = Seq(("Hello", 4)).toDF("c", "d") + val joinDF = df.join(df2, + dummyPythonUDF(col("a"), col("b")) === dummyPythonUDF(col("d"), col("c"))) + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) + } } // This Python UDF is dummy and just for testing. Unable to execute. 
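For reference, a minimal PySpark reproduction of the SPARK-25314 scenario this series targets, adapted from the regression test added in this patch (the data and column names are illustrative, not part of the patch):

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    spark = SparkSession.builder.getOrCreate()
    left = spark.createDataFrame([Row(a=1)])
    right = spark.createDataFrame([Row(b=1)])
    # The predicate is a Python UDF that needs attributes from both join
    # sides, so it cannot be evaluated inside the join operator itself.
    is_eq = udf(lambda a, b: a == b, BooleanType())
    # Without this fix, planning such a condition fails with an invalid
    # PythonUDF error ("requires attributes from more than one child").
    df = left.join(right, is_eq("a", "b"))
    print(df.collect())  # expected: [Row(a=1, b=1)]

The optimizer change above pulls the unevaluable predicate out of the join condition, plans the join itself as a cross join, and re-applies the predicate as a Filter on top of it.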
From 9ea1cf684a4bd704db9e7fd303982c9bcf34ca7c Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 4 Sep 2018 15:49:56 +0800 Subject: [PATCH 02/24] Address comments --- python/pyspark/sql/tests.py | 4 ++-- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 7 ++++++- .../sql/execution/python/BatchEvalPythonExecSuite.scala | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 54996d2a30b49..d4b8509cd6cc3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -550,6 +550,7 @@ def test_udf_in_filter_on_top_of_join(self): right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.crossJoin(right).filter(f("a", "b")) + self.assertEqual(df.collect(), [Row(a=1, b=1)]) def test_udf_in_join_condition(self): # regression test for SPARK-25314 @@ -557,8 +558,7 @@ def test_udf_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1)]) right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.crossJoin(right).filter(f("a", "b")) - self.assertEqual(df.collect(), [Row(a=1, b=1)]) + df = left.join(right, f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) def test_udf_without_arguments(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d5b5a8d168b1a..2aed1fb1295b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1241,7 +1241,12 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { // the new join conditions, if all conditions is unevaluable, we should // change the join type to CrossJoin. val newJoinType = - if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType + if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join.") + Cross + } else joinType val join = Join(newLeft, newRight, newJoinType, newJoinCond) if (others.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 32567a99ce007..e9ab3fafeb51f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -104,7 +104,8 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { assert(qualifiedPlanNodes.size == 1) } - test("Python UDF refers to the attributes from more than one child in join condition") { + test("SPARK-25314: Python UDF refers to the attributes from more than one child" + + " in join condition") { val df = Seq(("Hello", 4)).toDF("a", "b") val df2 = Seq(("Hello", 4)).toDF("c", "d") val joinDF = df.join(df2, From b6b0aa6afb03eacf2f295ebeeaaa67732c1058c4 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 4 Sep 2018 22:14:53 +0800 Subject: [PATCH 03/24] Add crossJoinEnabled checking logic. 
--- .../sql/catalyst/optimizer/Optimizer.scala | 28 +++++++++++++------ .../python/BatchEvalPythonExecSuite.scala | 25 ++++++++++++----- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2aed1fb1295b6..7c2408bb8b9f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1237,16 +1237,28 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (newJoinConditions, others) = commonJoinCondition.partition(canEvaluateWithinJoin) val newJoinCond = newJoinConditions.reduceLeftOption(And) - // if condition expression is unevaluable, it will be removed from - // the new join conditions, if all conditions is unevaluable, we should - // change the join type to CrossJoin. val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { - logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + - s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + - s"turned to cross join.") - Cross - } else joinType + if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") + Cross + } else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable + // information. 
+ throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") + } + } else { + joinType + } val join = Join(newLeft, newRight, newJoinType, newJoinCond) if (others.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index e9ab3fafeb51f..83753ed0186ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.python.{PythonEvalType, PythonFunction} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec} @@ -106,14 +107,24 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { test("SPARK-25314: Python UDF refers to the attributes from more than one child" + " in join condition") { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = Seq(("Hello", 4)).toDF("c", "d") - val joinDF = df.join(df2, - dummyPythonUDF(col("a"), col("b")) === dummyPythonUDF(col("d"), col("c"))) - val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { - case b: BatchEvalPythonExec => b + def dummyPythonUDFTest(): Unit = { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = Seq(("Hello", 4)).toDF("c", "d") + val joinDF = df.join(df2, + dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))) + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) } - assert(qualifiedPlanNodes.size == 1) + // Test without spark.sql.crossJoin.enabled set + val errMsg = intercept[AnalysisException] { + dummyPythonUDFTest() + } + assert(errMsg.getMessage.startsWith("Detected the whole commonJoinCondition:")) + // Test with spark.sql.crossJoin.enabled=true + spark.conf.set("spark.sql.crossJoin.enabled", "true") + dummyPythonUDFTest() } } From b626fa79fb1f7492865e3f87ca50c6d26c1af288 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 4 Sep 2018 23:46:28 +0800 Subject: [PATCH 04/24] Address comments --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 10 +++++----- .../execution/python/BatchEvalPythonExecSuite.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7c2408bb8b9f1..7eed36aa08c8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1252,16 +1252,16 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { // [[CheckCartesianProducts]], we throw firstly here for better readable // information. 
throw new AnalysisException("Detected the whole commonJoinCondition:" + - "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + - " join to cross join by setting the configuration variable" + - " spark.sql.crossJoin.enabled = true.") + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") } } else { joinType } val join = Join(newLeft, newRight, newJoinType, newJoinCond) - if (others.nonEmpty) { + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { Filter(others.reduceLeft(And), join) } else { join @@ -1347,7 +1347,7 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { |Join condition is missing or trivial. |Either: use the CROSS JOIN syntax to allow cartesian products between these |relations, or: enable implicit cartesian products by setting the configuration - |variable spark.sql.crossJoin.enabled=true""" + |variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true""" .stripMargin) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 83753ed0186ec..58af3639427ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -105,8 +105,8 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { assert(qualifiedPlanNodes.size == 1) } - test("SPARK-25314: Python UDF refers to the attributes from more than one child" + - " in join condition") { + test("SPARK-25314: Python UDF refers to the attributes from more than one child " + + "in join condition") { def dummyPythonUDFTest(): Unit = { val df = Seq(("Hello", 4)).toDF("a", "b") val df2 = Seq(("Hello", 4)).toDF("c", "d") From 53dd0287881ad89020edc5f0e2fa22b6334453a4 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 5 Sep 2018 21:03:22 +0800 Subject: [PATCH 05/24] Fix the left semi join and more tests --- python/pyspark/sql/tests.py | 37 +++++++++- .../sql/catalyst/optimizer/Optimizer.scala | 74 +++++++++++++------ .../optimizer/FilterPushdownSuite.scala | 28 ++++++- .../python/BatchEvalPythonExecSuite.scala | 16 ++++ 4 files changed, 128 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d4b8509cd6cc3..07b5a060390dd 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -559,7 +559,42 @@ def test_udf_in_join_condition(self): right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, f("a", "b")) - self.assertEqual(df.collect(), [Row(a=1, b=1)]) + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, b=1)]) + + def test_udf_in_left_semi_join_condition(self): + # regression test for SPARK-25314 + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, f("a", "b"), "leftsemi") + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + + def test_udf_and_filter_in_join_condition(self): + # regression test for 
SPARK-25314 + # test the complex scenario with both udf(non-deterministic) + # and normal filter(deterministic) + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) + + def test_udf_and_filter_in_left_semi_join_condition(self): + # regression test for SPARK-25314 + # test the complex scenario with both udf(non-deterministic) + # and normal filter(deterministic) + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7eed36aa08c8d..9746530a2d21c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1175,6 +1175,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { + if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") + Cross + } else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable + // information. + throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // push the where condition down into join filter case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) => @@ -1228,7 +1248,29 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case _: InnerLike | LeftSemi => + case LeftSemi => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightJoinConditions. 
+ reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val (newJoinConditions, others) = + commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { + tryToGetCrossType(commonJoinCondition, j) + } else { + joinType + } + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { + Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { + join + } + case _: InnerLike => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) @@ -1237,31 +1279,15 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (newJoinConditions, others) = commonJoinCondition.partition(canEvaluateWithinJoin) val newJoinCond = newJoinConditions.reduceLeftOption(And) - val newJoinType = - if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { - if (SQLConf.get.crossJoinEnabled) { - // if condition expression is unevaluable, it will be removed from - // the new join conditions, if all conditions is unevaluable, we should - // change the join type to CrossJoin. - logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + - "plan is unevaluable, it will be ignored and the join plan will be " + - s"turned to cross join. This plan shows below:\n $j") - Cross - } else { - // if the crossJoinEnabled is false, an AnalysisException will throw by - // [[CheckCartesianProducts]], we throw firstly here for better readable - // information. - throw new AnalysisException("Detected the whole commonJoinCondition:" + - s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + - "join to cross join by setting the configuration variable " + - s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") - } - } else { - joinType - } + // only need to add cross join when whole commonJoinCondition are unevaluable + val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + tryToGetCrossType(commonJoinCondition, j) + } else { + joinType + } val join = Join(newLeft, newRight, newJoinType, newJoinCond) - if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { + if (others.nonEmpty) { Filter(others.reduceLeft(And), join) } else { join diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 14498a984122a..39e5ffb520a15 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.IntegerType import org.apache.spark.unsafe.types.CalendarInterval @@ -1157,8 +1158,31 @@ class FilterPushdownSuite extends PlanTest { // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. 
- comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) + withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, + checkAnalysis = false) + } + } + + test("join condition pushdown: deterministic and non-deterministic in left semi join") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + // Verify that all conditions except the watermark touching condition are pushed down + // by the optimizer and others are not. + val originalQuery = x.join(y, LeftSemi, Some("x.a".attr === 5 && "y.a".attr === 5 && + "x.a".attr === Rand(10) && "y.b".attr === 5)) + val correctAnswer = + x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), + joinType = Cross).where("x.a".attr === Rand(10)) + .select("x.a".attr, "x.b".attr, "x.c".attr) + + // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. + // TODO support nondeterministic expressions in join condition. + withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, + checkAnalysis = false) + } } test("watermark pushdown: no pushdown on watermark attribute #1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 58af3639427ac..5b992f478216a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -126,6 +126,22 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { spark.conf.set("spark.sql.crossJoin.enabled", "true") dummyPythonUDFTest() } + + test("SPARK-25314") { + def dummyPythonUDFTest(): Unit = { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = Seq(("Hello", 4)).toDF("c", "d") + val joinDF = df.join(df2, + dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")), "left_semi") + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) + } + // Test with spark.sql.crossJoin.enabled=true + spark.conf.set("spark.sql.crossJoin.enabled", "true") + dummyPythonUDFTest() + } } // This Python UDF is dummy and just for testing. Unable to execute. 
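A usage sketch of the left-semi case handled in this patch, mirroring the new Python tests (illustrative data; the configuration key is the one used by the tests above):

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    spark = SparkSession.builder.getOrCreate()
    left = spark.createDataFrame([Row(a=1, a1=1), Row(a=2, a1=2)])
    right = spark.createDataFrame([Row(b=1, b1=1)])
    is_eq = udf(lambda a, b: a == b, BooleanType())

    # The UDF predicate is evaluated as a filter above a cross join, so the
    # implicit cartesian product has to be allowed explicitly.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    df = left.join(right, is_eq("a", "b"), "leftsemi")
    print(df.collect())  # expected: [Row(a=1, a1=1)]

For the left-semi case the rewritten plan also adds a Project over the left child's output, so only the left relation's columns survive, as in a normal left semi join.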
From 4ca7fd1e7813dd64026c1318af6f769800f24d1a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 5 Sep 2018 21:17:15 +0800 Subject: [PATCH 06/24] fix python style --- python/pyspark/sql/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 07b5a060390dd..b425198b19f75 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -592,7 +592,7 @@ def test_udf_and_filter_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) From 1109eb35a375a35181d7104607ae4a90b180e45f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Sep 2018 15:54:55 +0800 Subject: [PATCH 07/24] Address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9746530a2d21c..f18db9cac1ed2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1186,8 +1186,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { Cross } else { // if the crossJoinEnabled is false, an AnalysisException will throw by - // [[CheckCartesianProducts]], we throw firstly here for better readable - // information. + // CheckCartesianProducts, we throw firstly here for better readable information. throw new AnalysisException("Detected the whole commonJoinCondition:" + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + "join to cross join by setting the configuration variable " + @@ -1195,6 +1194,28 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } } + /** + * Generate new left and right child of join by pushing down the side only join filter, + * split commonJoinCondition based on the expression can be evaluated within join or not. + * + * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) + */ + private def getNewChildAndSplitCondForJoin( + join: Join, + leftJoinConditions: Seq[Expression], + rightJoinConditions: Seq[Expression], + commonJoinCondition: Seq[Expression]) = { + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, join.left)).getOrElse(join.left) + val newRight = rightJoinConditions. 
+ reduceLeftOption(And).map(Filter(_, join.right)).getOrElse(join.right) + val (newJoinConditions, others) = + commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + (newLeft, newRight, newJoinCond, others) + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // push the where condition down into join filter case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) => @@ -1249,14 +1270,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { joinType match { case LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val (newJoinConditions, others) = - commonJoinCondition.partition(canEvaluateWithinJoin) - val newJoinCond = newJoinConditions.reduceLeftOption(And) + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( + j, leftJoinConditions, rightJoinConditions, commonJoinCondition) // need to add cross join when unevaluable condition exists val newJoinType = if (others.nonEmpty) { tryToGetCrossType(commonJoinCondition, j) @@ -1271,14 +1286,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { join } case _: InnerLike => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val (newJoinConditions, others) = - commonJoinCondition.partition(canEvaluateWithinJoin) - val newJoinCond = newJoinConditions.reduceLeftOption(And) + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( + j, leftJoinConditions, rightJoinConditions, commonJoinCondition) // only need to add cross join when whole commonJoinCondition are unevaluable val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { tryToGetCrossType(commonJoinCondition, j) From c6345fe573c04ddc77263e13a35bc5d1498bfeac Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Sep 2018 15:57:19 +0800 Subject: [PATCH 08/24] Delete mistake commit --- .../python/BatchEvalPythonExecSuite.scala | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 5b992f478216a..58af3639427ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -126,22 +126,6 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { spark.conf.set("spark.sql.crossJoin.enabled", "true") dummyPythonUDFTest() } - - test("SPARK-25314") { - def dummyPythonUDFTest(): Unit = { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = Seq(("Hello", 4)).toDF("c", "d") - val joinDF = df.join(df2, - dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")), "left_semi") - val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { - case b: BatchEvalPythonExec => b - } - assert(qualifiedPlanNodes.size == 1) - } - // Test with 
spark.sql.crossJoin.enabled=true - spark.conf.set("spark.sql.crossJoin.enabled", "true") - dummyPythonUDFTest() - } } // This Python UDF is dummy and just for testing. Unable to execute. From 83660d57dcdba91b8b5da72c7dcc9225ab4504af Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sat, 8 Sep 2018 17:51:19 +0800 Subject: [PATCH 09/24] Address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 36 +++++++------------ 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f18db9cac1ed2..15066441ce3de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1195,12 +1195,10 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } /** - * Generate new left and right child of join by pushing down the side only join filter, - * split commonJoinCondition based on the expression can be evaluated within join or not. - * - * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) + * Generate new join by pushing down the side only join filter, split commonJoinCondition + * based on the expression can be evaluated within join or not and return unevaluable part. */ - private def getNewChildAndSplitCondForJoin( + private def getNewJoinAndUnevaluableCond( join: Join, leftJoinConditions: Seq[Expression], rightJoinConditions: Seq[Expression], @@ -1213,7 +1211,13 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (newJoinConditions, others) = commonJoinCondition.partition(canEvaluateWithinJoin) val newJoinCond = newJoinConditions.reduceLeftOption(And) - (newLeft, newRight, newJoinCond, others) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { + tryToGetCrossType(commonJoinCondition, join) + } else { + join.joinType + } + (Join(newLeft, newRight, newJoinType, newJoinCond), others) } def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -1270,32 +1274,18 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { joinType match { case LeftSemi => - val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( + val (join, others) = getNewJoinAndUnevaluableCond( j, leftJoinConditions, rightJoinConditions, commonJoinCondition) - // need to add cross join when unevaluable condition exists - val newJoinType = if (others.nonEmpty) { - tryToGetCrossType(commonJoinCondition, j) - } else { - joinType - } - val join = Join(newLeft, newRight, newJoinType, newJoinCond) if (others.nonEmpty) { - Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + Project(join.left.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) } else { join } case _: InnerLike => - val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( + val (join, others) = getNewJoinAndUnevaluableCond( j, leftJoinConditions, rightJoinConditions, commonJoinCondition) - // only need to add cross join when whole commonJoinCondition are unevaluable - val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { - tryToGetCrossType(commonJoinCondition, j) - } else { - joinType - } - val join = Join(newLeft, newRight, newJoinType, newJoinCond) if (others.nonEmpty) { 
Filter(others.reduceLeft(And), join) } else { From fdc86ca13802368932d0cda78b138abdbdedb891 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sat, 8 Sep 2018 18:53:44 +0800 Subject: [PATCH 10/24] Add UT for common filter and udf --- python/pyspark/sql/tests.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b425198b19f75..fb618b8520b75 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -596,6 +596,30 @@ def test_udf_and_filter_in_left_semi_join_condition(self): with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + def test_udf_and_common_filter_in_join_condition(self): + # regression test for SPARK-25314 + # test the complex scenario with both udf(non-deterministic) + # and common filter(deterministic) + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b"), left.a1 == right.b1]) + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) + + def test_udf_and_common_filter_in_left_semi_join_condition(self): + # regression test for SPARK-25314 + # test the complex scenario with both udf(non-deterministic) + # and common filter(deterministic) + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") + with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() From fbf32f4dd17e634e2cce889b9245258f5ba2c9c6 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 14 Sep 2018 00:04:13 +0800 Subject: [PATCH 11/24] limit the change scope only to PythonUDF --- .../sql/catalyst/optimizer/Optimizer.scala | 18 +++++++++--------- .../optimizer/FilterPushdownSuite.scala | 8 +++----- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 15066441ce3de..5d7b89f00fa4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1210,14 +1210,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, join.right)).getOrElse(join.right) val (newJoinConditions, others) = commonJoinCondition.partition(canEvaluateWithinJoin) - val newJoinCond = newJoinConditions.reduceLeftOption(And) // need to add cross join when unevaluable condition exists - val newJoinType = if (others.nonEmpty) { - tryToGetCrossType(commonJoinCondition, join) + val pythonUDFExist = others.exists(_.find(_.isInstanceOf[PythonUDF]).isDefined) + val (newJoinType, newJoinCond) = if (pythonUDFExist) { + 
(tryToGetCrossType(commonJoinCondition, join), newJoinConditions.reduceLeftOption(And)) } else { - join.joinType + (join.joinType, commonJoinCondition.reduceLeftOption(And)) } - (Join(newLeft, newRight, newJoinType, newJoinCond), others) + (Join(newLeft, newRight, newJoinType, newJoinCond), pythonUDFExist, others) } def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -1274,19 +1274,19 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { joinType match { case LeftSemi => - val (join, others) = getNewJoinAndUnevaluableCond( + val (join, crossAdded, others) = getNewJoinAndUnevaluableCond( j, leftJoinConditions, rightJoinConditions, commonJoinCondition) - if (others.nonEmpty) { + if (crossAdded) { Project(join.left.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) } else { join } case _: InnerLike => - val (join, others) = getNewJoinAndUnevaluableCond( + val (join, crossAdded, others) = getNewJoinAndUnevaluableCond( j, leftJoinConditions, rightJoinConditions, commonJoinCondition) - if (others.nonEmpty) { + if (crossAdded) { Filter(others.reduceLeft(And), join) } else { join diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 39e5ffb520a15..8482120e828d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1154,14 +1154,12 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), - joinType = Cross).where("x.a".attr === Rand(10)) + condition = Some("x.a".attr === Rand(10))) // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. 
- withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { - comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) - } + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, + checkAnalysis = false) } test("join condition pushdown: deterministic and non-deterministic in left semi join") { From 292b09c75ceecd8b47192ed6dea9cc5bd6aa2672 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sat, 22 Sep 2018 23:47:52 +0800 Subject: [PATCH 12/24] Reimplement the logic in Analyzer instead of Optimizer --- .../sql/catalyst/analysis/Analyzer.scala | 51 ++++++++++++- .../sql/catalyst/optimizer/Optimizer.scala | 72 +++---------------- .../optimizer/FilterPushdownSuite.scala | 21 ------ .../scala/org/apache/spark/sql/Dataset.scala | 18 +++-- .../python/BatchEvalPythonExecSuite.scala | 24 +++---- 5 files changed, 78 insertions(+), 108 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e3b17121bf350..0db291630ecab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -194,7 +194,8 @@ class Analyzer( Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, - HandleNullInputsForUDF), + HandleNullInputsForUDF, + HandlePythonUDFInJoinCondition(conf)), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, @@ -2181,6 +2182,54 @@ class Analyzer( } } + /** + * Correctly handle PythonUDF which need access both side of join side by changing the new join + * type to Cross. + */ + case class HandlePythonUDFInJoinCondition(conf: SQLConf) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case j @ Join(_, _, joinType, condition) + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) => + if (conf.crossJoinEnabled) { + // if condition expression contains python udf, it will be moved out from + // the new join conditions, and the join type will be changed to CrossJoin. + logWarning(s"The join condition:$condition of the join plan contains " + + "PythonUDF, it will be moved out and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") + val (udf, rest) = + condition.map(splitConjunctivePredicates).get.partition( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) + val newCondition = if (rest.isEmpty) { + Option.empty + } else { + Some(rest.reduceLeft(And)) + } + val newJoin = j.copy(joinType = Cross, condition = newCondition) + joinType match { + case _: InnerLike => + Filter(udf.reduceLeft(And), newJoin) + case LeftSemi => + Project( + j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) + case _ => + // TODO: Need to support more join type + j + } + } else { + // if the crossJoinEnabled is false, a RuntimeException will be thrown later while + // the PythonUDF need to access both side of join. We give a warning log not throw + // an Exception first here because the python udf may only access one side. 
+ logWarning(s"Detected the join condition:$condition of this join plan contains" + + " PythonUDF, if the PythonUDF need to access both side of join, it will get" + + " an invalid PythonUDF RuntimeException with message `requires attributes from" + + " more than one child`, we need to cast the join to cross join by setting the" + + s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") + j + } + } + } + /** * Check and add proper window frames for all window functions. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 5d7b89f00fa4d..7c461895c5e52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1175,51 +1175,6 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } - private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { - if (SQLConf.get.crossJoinEnabled) { - // if condition expression is unevaluable, it will be removed from - // the new join conditions, if all conditions is unevaluable, we should - // change the join type to CrossJoin. - logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + - "plan is unevaluable, it will be ignored and the join plan will be " + - s"turned to cross join. This plan shows below:\n $j") - Cross - } else { - // if the crossJoinEnabled is false, an AnalysisException will throw by - // CheckCartesianProducts, we throw firstly here for better readable information. - throw new AnalysisException("Detected the whole commonJoinCondition:" + - s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + - "join to cross join by setting the configuration variable " + - s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") - } - } - - /** - * Generate new join by pushing down the side only join filter, split commonJoinCondition - * based on the expression can be evaluated within join or not and return unevaluable part. - */ - private def getNewJoinAndUnevaluableCond( - join: Join, - leftJoinConditions: Seq[Expression], - rightJoinConditions: Seq[Expression], - commonJoinCondition: Seq[Expression]) = { - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, join.left)).getOrElse(join.left) - val newRight = rightJoinConditions. 
- reduceLeftOption(And).map(Filter(_, join.right)).getOrElse(join.right) - val (newJoinConditions, others) = - commonJoinCondition.partition(canEvaluateWithinJoin) - // need to add cross join when unevaluable condition exists - val pythonUDFExist = others.exists(_.find(_.isInstanceOf[PythonUDF]).isDefined) - val (newJoinType, newJoinCond) = if (pythonUDFExist) { - (tryToGetCrossType(commonJoinCondition, join), newJoinConditions.reduceLeftOption(And)) - } else { - (join.joinType, commonJoinCondition.reduceLeftOption(And)) - } - (Join(newLeft, newRight, newJoinType, newJoinCond), pythonUDFExist, others) - } - def apply(plan: LogicalPlan): LogicalPlan = plan transform { // push the where condition down into join filter case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) => @@ -1273,24 +1228,15 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case LeftSemi => - val (join, crossAdded, others) = getNewJoinAndUnevaluableCond( - j, leftJoinConditions, rightJoinConditions, commonJoinCondition) - - if (crossAdded) { - Project(join.left.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) - } else { - join - } - case _: InnerLike => - val (join, crossAdded, others) = getNewJoinAndUnevaluableCond( - j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + case _: InnerLike | LeftSemi => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = commonJoinCondition.reduceLeftOption(And) - if (crossAdded) { - Filter(others.reduceLeft(And), join) - } else { - join - } + Join(newLeft, newRight, joinType, newJoinCond) case RightOuter => // push down the left side only join filter for left side sub query val newLeft = leftJoinConditions. @@ -1372,7 +1318,7 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { |Join condition is missing or trivial. |Either: use the CROSS JOIN syntax to allow cartesian products between these |relations, or: enable implicit cartesian products by setting the configuration - |variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true""" + |variable spark.sql.crossJoin.enabled=true""" .stripMargin) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 8482120e828d0..5747509c54476 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1162,27 +1162,6 @@ class FilterPushdownSuite extends PlanTest { checkAnalysis = false) } - test("join condition pushdown: deterministic and non-deterministic in left semi join") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - // Verify that all conditions except the watermark touching condition are pushed down - // by the optimizer and others are not. 
- val originalQuery = x.join(y, LeftSemi, Some("x.a".attr === 5 && "y.a".attr === 5 && - "x.a".attr === Rand(10) && "y.b".attr === 5)) - val correctAnswer = - x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), - joinType = Cross).where("x.a".attr === Rand(10)) - .select("x.a".attr, "x.b".attr, "x.c".attr) - - // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. - // TODO support nondeterministic expressions in join condition. - withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { - comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) - } - } - test("watermark pushdown: no pushdown on watermark attribute #1") { val interval = new CalendarInterval(2, 2000L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fa14aa14ee968..4758dcc125d23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -995,7 +995,8 @@ class Dataset[T] private[sql]( // After the cloning, left and right side will have distinct expression ids. val plan = withPlan( Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))) - .queryExecution.analyzed.asInstanceOf[Join] + .queryExecution.analyzed + val joinPlan = plan.collectFirst { case j: Join => j }.get // If auto self join alias is disabled, return the plan. if (!sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity) { @@ -1012,21 +1013,24 @@ class Dataset[T] private[sql]( // Otherwise, find the trivially true predicates and automatically resolves them to both sides. // By the time we get here, since we have already run analysis, all attributes should've been // resolved and become AttributeReference. 
- val cond = plan.condition.map { _.transform { + val cond = joinPlan.condition.map { _.transform { case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualTo( - withPlan(plan.left).resolve(a.name), - withPlan(plan.right).resolve(b.name)) + withPlan(joinPlan.left).resolve(a.name), + withPlan(joinPlan.right).resolve(b.name)) case catalyst.expressions.EqualNullSafe(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualNullSafe( - withPlan(plan.left).resolve(a.name), - withPlan(plan.right).resolve(b.name)) + withPlan(joinPlan.left).resolve(a.name), + withPlan(joinPlan.right).resolve(b.name)) }} withPlan { - plan.copy(condition = cond) + plan.transformDown { + case j: Join if j == joinPlan => + j.copy(condition = cond) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 58af3639427ac..6e338a93c0072 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -107,24 +107,16 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { test("SPARK-25314: Python UDF refers to the attributes from more than one child " + "in join condition") { - def dummyPythonUDFTest(): Unit = { - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = Seq(("Hello", 4)).toDF("c", "d") - val joinDF = df.join(df2, - dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))) - val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { - case b: BatchEvalPythonExec => b - } - assert(qualifiedPlanNodes.size == 1) - } - // Test without spark.sql.crossJoin.enabled set - val errMsg = intercept[AnalysisException] { - dummyPythonUDFTest() - } - assert(errMsg.getMessage.startsWith("Detected the whole commonJoinCondition:")) // Test with spark.sql.crossJoin.enabled=true spark.conf.set("spark.sql.crossJoin.enabled", "true") - dummyPythonUDFTest() + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = Seq(("Hello", 4)).toDF("c", "d") + val joinDF = df.join(df2, + dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))) + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) } } From 6749a963f8afee90407d3ed0553b2918cf5f304b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 23 Sep 2018 11:44:45 +0800 Subject: [PATCH 13/24] config should be set in analyzer --- python/pyspark/sql/tests.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fb618b8520b75..5687de0e1f11b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -558,8 +558,8 @@ def test_udf_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1)]) right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, f("a", "b")) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) def test_udf_in_left_semi_join_condition(self): @@ -568,8 +568,8 @@ def test_udf_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, 
a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, f("a", "b"), "leftsemi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, f("a", "b"), "leftsemi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_and_filter_in_join_condition(self): @@ -580,8 +580,8 @@ def test_udf_and_filter_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) def test_udf_and_filter_in_left_semi_join_condition(self): @@ -592,8 +592,8 @@ def test_udf_and_filter_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_and_common_filter_in_join_condition(self): @@ -604,8 +604,8 @@ def test_udf_and_common_filter_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, [f("a", "b"), left.a1 == right.b1]) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, [f("a", "b"), left.a1 == right.b1]) self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) def test_udf_and_common_filter_in_left_semi_join_condition(self): @@ -616,8 +616,8 @@ def test_udf_and_common_filter_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) - df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): + df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_without_arguments(self): From a598a4ed4e0ec56f372d81d0ffe35521f00bd13f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 25 Sep 2018 16:19:50 +0800 Subject: [PATCH 14/24] Move the rule to optimizer --- python/pyspark/sql/tests.py | 28 +++++++-- .../sql/catalyst/analysis/Analyzer.scala | 51 +--------------- .../sql/catalyst/optimizer/Optimizer.scala | 58 ++++++++++++++++++- .../optimizer/FilterPushdownSuite.scala | 1 - .../scala/org/apache/spark/sql/Dataset.scala | 18 +++--- .../python/BatchEvalPythonExecSuite.scala | 24 +++++--- 6 files changed, 103 insertions(+), 77 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5687de0e1f11b..bace282b945a9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -558,8 
+558,8 @@ def test_udf_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1)]) right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, f("a", "b")) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) def test_udf_in_left_semi_join_condition(self): @@ -568,8 +568,8 @@ def test_udf_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, f("a", "b"), "leftsemi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, f("a", "b"), "leftsemi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_and_filter_in_join_condition(self): @@ -580,8 +580,8 @@ def test_udf_and_filter_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) def test_udf_and_filter_in_left_semi_join_condition(self): @@ -592,8 +592,8 @@ def test_udf_and_filter_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_and_common_filter_in_join_condition(self): @@ -604,8 +604,8 @@ def test_udf_and_common_filter_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b"), left.a1 == right.b1]) with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, [f("a", "b"), left.a1 == right.b1]) self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) def test_udf_and_common_filter_in_left_semi_join_condition(self): @@ -616,10 +616,26 @@ def test_udf_and_common_filter_in_left_semi_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) + df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + def test_udf_not_supported_in_join_condition(self): + # regression test for SPARK-25314 + # test python udf is not supported in join type besides left_semi and inner join. 
+ from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) + right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + def runWithJoinType(join_type, type_string): + with self.assertRaisesRegexp(AnalysisException, + 'Using PythonUDF.*%s is not supported.' % type_string): + left.join(right, [f("a", "b"), left.a1 == right.b1], join_type).collect() + runWithJoinType("full", "FullOuter") + runWithJoinType("left", "LeftOuter") + runWithJoinType("right", "RightOuter") + runWithJoinType("leftanti", "LeftAnti") + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0db291630ecab..e3b17121bf350 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -194,8 +194,7 @@ class Analyzer( Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, - HandleNullInputsForUDF, - HandlePythonUDFInJoinCondition(conf)), + HandleNullInputsForUDF), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, @@ -2182,54 +2181,6 @@ class Analyzer( } } - /** - * Correctly handle PythonUDF which need access both side of join side by changing the new join - * type to Cross. - */ - case class HandlePythonUDFInJoinCondition(conf: SQLConf) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case j @ Join(_, _, joinType, condition) - if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) => - if (conf.crossJoinEnabled) { - // if condition expression contains python udf, it will be moved out from - // the new join conditions, and the join type will be changed to CrossJoin. - logWarning(s"The join condition:$condition of the join plan contains " + - "PythonUDF, it will be moved out and the join plan will be " + - s"turned to cross join. This plan shows below:\n $j") - val (udf, rest) = - condition.map(splitConjunctivePredicates).get.partition( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) - val newCondition = if (rest.isEmpty) { - Option.empty - } else { - Some(rest.reduceLeft(And)) - } - val newJoin = j.copy(joinType = Cross, condition = newCondition) - joinType match { - case _: InnerLike => - Filter(udf.reduceLeft(And), newJoin) - case LeftSemi => - Project( - j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) - case _ => - // TODO: Need to support more join type - j - } - } else { - // if the crossJoinEnabled is false, a RuntimeException will be thrown later while - // the PythonUDF need to access both side of join. We give a warning log not throw - // an Exception first here because the python udf may only access one side. 
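For readers tracking the behavioural change, the test_udf_not_supported_in_join_condition test added above can be reduced to a standalone sketch. This is only an illustration under stated assumptions, not part of the patch: it presumes a live SparkSession bound to the name `spark` and a build with this patch series applied.

    # Hypothetical, minimal reproduction of the unsupported-join-type error.
    # Assumes an existing SparkSession named `spark` and these patches applied.
    from pyspark.sql import Row
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType
    from pyspark.sql.utils import AnalysisException

    left = spark.createDataFrame([Row(a=1)])
    right = spark.createDataFrame([Row(b=1)])
    f = udf(lambda a, b: a == b, BooleanType())

    try:
        # "left" maps to LeftOuter, which the new rule rejects whenever a
        # Python UDF appears in the join condition.
        left.join(right, f("a", "b"), "left").collect()
    except AnalysisException as e:
        # Expected message, per the rule added in this series:
        # "Using PythonUDF in join condition of join type LeftOuter is not supported."
        print(e)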
- logWarning(s"Detected the join condition:$condition of this join plan contains" + - " PythonUDF, if the PythonUDF need to access both side of join, it will get" + - " an invalid PythonUDF RuntimeException with message `requires attributes from" + - " more than one child`, we need to cast the join to cross join by setting the" + - s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") - j - } - } - } - /** * Check and add proper window frames for all window functions. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7c461895c5e52..2b8f2cc5210d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -173,6 +173,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ColumnPruning, CollapseProject, RemoveRedundantProject) :+ + Batch("Extract PythonUDF From JoinCondition", Once, + HandlePythonUDFInJoinCondition) :+ Batch("UpdateAttributeReferences", Once, UpdateNullabilityInAttributeReferences) } @@ -202,7 +204,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ReplaceDistinctWithAggregate.ruleName :: PullupCorrelatedPredicates.ruleName :: RewriteCorrelatedScalarSubquery.ruleName :: - RewritePredicateSubquery.ruleName :: Nil + RewritePredicateSubquery.ruleName :: + HandlePythonUDFInJoinCondition.ruleName :: Nil /** * Optimize all the subqueries inside expression. @@ -1260,6 +1263,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } } +/** + * Correctly handle PythonUDF which need access both side of join side by changing the new join + * type to Cross. + */ +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case j @ Join(_, _, joinType, condition) + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) => + if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { + // The current strategy only support InnerLike and LeftSemi join because other type + // can not simply be resolved by adding a Cross join. If we pass the plan here, it'll + // still get a an invalid PythonUDF RuntimeException with message `requires attributes + // from more than one child`, we throw firstly here for better readable information. + throw new AnalysisException("Using PythonUDF in join condition of join type" + + s" $joinType is not supported.") + } + if (SQLConf.get.crossJoinEnabled) { + // if condition expression contains python udf, it will be moved out from + // the new join conditions, and the join type will be changed to CrossJoin. + logWarning(s"The join condition:$condition of the join plan contains " + + "PythonUDF, it will be moved out and the join plan will be " + + s"turned to cross join. 
This plan shows below:\n $j") + val (udf, rest) = + condition.map(splitConjunctivePredicates).get.partition( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) + val newCondition = if (rest.isEmpty) { + Option.empty + } else { + Some(rest.reduceLeft(And)) + } + val newJoin = j.copy(joinType = Cross, condition = newCondition) + joinType match { + case _: InnerLike => + Filter(udf.reduceLeft(And), newJoin) + case LeftSemi => + Project( + j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) + } + } else { + // if the crossJoinEnabled is false, a RuntimeException will be thrown later while + // the PythonUDF need to access both side of join, we throw firstly here for better + // readable information. + throw new AnalysisException(s"Detected the join condition:$condition of this join " + + "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + + "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + + "from more than one child`, we need to cast the join to cross join by setting the" + + s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") + j + } + } +} + /** * Combines two adjacent [[Limit]] operators into one, merging the * expressions into one single expression. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 5747509c54476..82a10254d846d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.IntegerType import org.apache.spark.unsafe.types.CalendarInterval diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 4758dcc125d23..fa14aa14ee968 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -995,8 +995,7 @@ class Dataset[T] private[sql]( // After the cloning, left and right side will have distinct expression ids. val plan = withPlan( Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))) - .queryExecution.analyzed - val joinPlan = plan.collectFirst { case j: Join => j }.get + .queryExecution.analyzed.asInstanceOf[Join] // If auto self join alias is disabled, return the plan. if (!sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity) { @@ -1013,24 +1012,21 @@ class Dataset[T] private[sql]( // Otherwise, find the trivially true predicates and automatically resolves them to both sides. // By the time we get here, since we have already run analysis, all attributes should've been // resolved and become AttributeReference. 
- val cond = joinPlan.condition.map { _.transform { + val cond = plan.condition.map { _.transform { case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualTo( - withPlan(joinPlan.left).resolve(a.name), - withPlan(joinPlan.right).resolve(b.name)) + withPlan(plan.left).resolve(a.name), + withPlan(plan.right).resolve(b.name)) case catalyst.expressions.EqualNullSafe(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualNullSafe( - withPlan(joinPlan.left).resolve(a.name), - withPlan(joinPlan.right).resolve(b.name)) + withPlan(plan.left).resolve(a.name), + withPlan(plan.right).resolve(b.name)) }} withPlan { - plan.transformDown { - case j: Join if j == joinPlan => - j.copy(condition = cond) - } + plan.copy(condition = cond) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 6e338a93c0072..5470d0fe6f982 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -107,16 +107,24 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { test("SPARK-25314: Python UDF refers to the attributes from more than one child " + "in join condition") { + def dummyPythonUDFTest(): Unit = { + val df = Seq(("Hello", 4)).toDF("a", "b") + val df2 = Seq(("Hello", 4)).toDF("c", "d") + val joinDF = df.join(df2, + dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))) + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) + } + // Test without spark.sql.crossJoin.enabled set + val errMsg = intercept[AnalysisException] { + dummyPythonUDFTest() + } + assert(errMsg.getMessage.startsWith("Detected the join condition:")) // Test with spark.sql.crossJoin.enabled=true spark.conf.set("spark.sql.crossJoin.enabled", "true") - val df = Seq(("Hello", 4)).toDF("a", "b") - val df2 = Seq(("Hello", 4)).toDF("c", "d") - val joinDF = df.join(df2, - dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))) - val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { - case b: BatchEvalPythonExec => b - } - assert(qualifiedPlanNodes.size == 1) + dummyPythonUDFTest() } } From a2c8ddd250f77318453aa45e2dcec49b83001973 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 25 Sep 2018 16:31:23 +0800 Subject: [PATCH 15/24] style fix --- python/pyspark/sql/tests.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index bace282b945a9..ebd717e69b5e7 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -627,8 +627,10 @@ def test_udf_not_supported_in_join_condition(self): left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) + def runWithJoinType(join_type, type_string): - with self.assertRaisesRegexp(AnalysisException, + with self.assertRaisesRegexp( + AnalysisException, 'Using PythonUDF.*%s is not supported.' 
% type_string): left.join(right, [f("a", "b"), left.a1 == right.b1], join_type).collect() runWithJoinType("full", "FullOuter") From 005bb3f2fbe0e3322b42892c25b8e6109879bc8a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 25 Sep 2018 18:47:00 +0800 Subject: [PATCH 16/24] Address comments from Wenchen --- .../sql/catalyst/optimizer/Optimizer.scala | 53 ------------------ .../spark/sql/catalyst/optimizer/joins.scala | 55 +++++++++++++++++++ 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2b8f2cc5210d6..229ccabbe61bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1263,59 +1263,6 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } } -/** - * Correctly handle PythonUDF which need access both side of join side by changing the new join - * type to Cross. - */ -object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case j @ Join(_, _, joinType, condition) - if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) => - if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { - // The current strategy only support InnerLike and LeftSemi join because other type - // can not simply be resolved by adding a Cross join. If we pass the plan here, it'll - // still get a an invalid PythonUDF RuntimeException with message `requires attributes - // from more than one child`, we throw firstly here for better readable information. - throw new AnalysisException("Using PythonUDF in join condition of join type" + - s" $joinType is not supported.") - } - if (SQLConf.get.crossJoinEnabled) { - // if condition expression contains python udf, it will be moved out from - // the new join conditions, and the join type will be changed to CrossJoin. - logWarning(s"The join condition:$condition of the join plan contains " + - "PythonUDF, it will be moved out and the join plan will be " + - s"turned to cross join. This plan shows below:\n $j") - val (udf, rest) = - condition.map(splitConjunctivePredicates).get.partition( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) - val newCondition = if (rest.isEmpty) { - Option.empty - } else { - Some(rest.reduceLeft(And)) - } - val newJoin = j.copy(joinType = Cross, condition = newCondition) - joinType match { - case _: InnerLike => - Filter(udf.reduceLeft(And), newJoin) - case LeftSemi => - Project( - j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) - } - } else { - // if the crossJoinEnabled is false, a RuntimeException will be thrown later while - // the PythonUDF need to access both side of join, we throw firstly here for better - // readable information. 
- throw new AnalysisException(s"Detected the join condition:$condition of this join " + - "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + - "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + - "from more than one child`, we need to cast the join to cross join by setting the" + - s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") - j - } - } -} - /** * Combines two adjacent [[Limit]] operators into one, merging the * expressions into one single expression. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index edbeaf273fd6f..3deb90ce06cbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ @@ -152,3 +153,57 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Correctly handle PythonUDF which need access both side of join side by changing the new join + * type to Cross. + */ +object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case j @ Join(_, _, joinType, condition) + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) => + if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { + // The current strategy only support InnerLike and LeftSemi join because for other type, + // it breaks SQL semantic if we run the join condition as a filter after join. If we pass + // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message + // `requires attributes from more than one child`, we throw firstly here for better + // readable information. + throw new AnalysisException("Using PythonUDF in join condition of join type" + + s" $joinType is not supported.") + } + if (SQLConf.get.crossJoinEnabled) { + // if condition expression contains python udf, it will be moved out from + // the new join conditions, and the join type will be changed to CrossJoin. + logWarning(s"The join condition:$condition of the join plan contains " + + "PythonUDF, it will be moved out and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") + val (udf, rest) = + condition.map(splitConjunctivePredicates).get.partition( + _.collectFirst { case udf: PythonUDF => udf }.isDefined) + val newCondition = if (rest.isEmpty) { + Option.empty + } else { + Some(rest.reduceLeft(And)) + } + val newJoin = j.copy(joinType = Cross, condition = newCondition) + joinType match { + case _: InnerLike => + Filter(udf.reduceLeft(And), newJoin) + case LeftSemi => + Project( + j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) + } + } else { + // if the crossJoinEnabled is false, a RuntimeException will be thrown later while + // the PythonUDF need to access both side of join, we throw firstly here for better + // readable information. 
+ throw new AnalysisException(s"Detected the join condition:$condition of this join " + + "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + + "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + + "from more than one child`, we need to cast the join to cross join by setting the" + + s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") + j + } + } +} From b0dfab31762244df387336fc4104b7c400cab3bc Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 25 Sep 2018 19:19:22 +0800 Subject: [PATCH 17/24] fix exhaustive match --- .../scala/org/apache/spark/sql/catalyst/optimizer/joins.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 3deb90ce06cbb..aa7fd0922bb7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -193,6 +193,9 @@ object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHe case LeftSemi => Project( j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) + case _ => + throw new AnalysisException("Using PythonUDF in join condition of join type" + + s" $joinType is not supported.") } } else { // if the crossJoinEnabled is false, a RuntimeException will be thrown later while From 306fcb998123ae7dd0e31f7d8faac1bf8f06d8c8 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 26 Sep 2018 00:34:26 +0800 Subject: [PATCH 18/24] Move cross join detection logic into CheckCartesianProducts --- .../sql/catalyst/optimizer/Optimizer.scala | 23 ++++++++++++++++--- .../spark/sql/catalyst/optimizer/joins.scala | 20 +++++++--------- .../CheckCartesianProductsSuite.scala | 18 ++++++++++++++- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 229ccabbe61bf..1dfbca6d6d8f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -165,6 +165,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :+ + Batch("Extract PythonUDF From JoinCondition", Once, + HandlePythonUDFInJoinCondition) :+ // The following batch should be executed after batch "Join Reorder" and "LocalRelation". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ @@ -173,8 +175,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ColumnPruning, CollapseProject, RemoveRedundantProject) :+ - Batch("Extract PythonUDF From JoinCondition", Once, - HandlePythonUDFInJoinCondition) :+ Batch("UpdateAttributeReferences", Once, UpdateNullabilityInAttributeReferences) } @@ -1307,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { } } + /** + * Check if a join contains PythonUDF in join condition. 
+ */ + def hasPythonUDFInJoinCondition(join: Join): Boolean = { + val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil) + conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF) + } + def apply(plan: LogicalPlan): LogicalPlan = if (SQLConf.get.crossJoinEnabled) { plan } else plan transform { + case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) => + // if the crossJoinEnabled is false, a RuntimeException will be thrown later while + // the PythonUDF need to access both side of join, we throw firstly here for better + // readable information. + throw new AnalysisException(s"Detected the join condition:${j.condition} of this join " + + "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + + "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + + "from more than one child`, we need to cast the join to cross join by setting the" + + s" configuration variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true") case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _) if isCartesianProduct(j) => throw new AnalysisException( @@ -1321,7 +1338,7 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { |Join condition is missing or trivial. |Either: use the CROSS JOIN syntax to allow cartesian products between these |relations, or: enable implicit cartesian products by setting the configuration - |variable spark.sql.crossJoin.enabled=true""" + |variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true""" .stripMargin) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index aa7fd0922bb7c..62eb7987be6b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -159,10 +159,13 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { * type to Cross. */ object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { + def hasPythonUDF(expression: Expression): Boolean = { + expression.collectFirst { case udf: PythonUDF => udf }.isDefined + } + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case j @ Join(_, _, joinType, condition) - if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) => + if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) => if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { // The current strategy only support InnerLike and LeftSemi join because for other type, // it breaks SQL semantic if we run the join condition as a filter after join. If we pass @@ -179,8 +182,7 @@ object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHe "PythonUDF, it will be moved out and the join plan will be " + s"turned to cross join. 
This plan shows below:\n $j") val (udf, rest) = - condition.map(splitConjunctivePredicates).get.partition( - _.collectFirst { case udf: PythonUDF => udf }.isDefined) + condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF) val newCondition = if (rest.isEmpty) { Option.empty } else { @@ -198,14 +200,8 @@ object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHe s" $joinType is not supported.") } } else { - // if the crossJoinEnabled is false, a RuntimeException will be thrown later while - // the PythonUDF need to access both side of join, we throw firstly here for better - // readable information. - throw new AnalysisException(s"Detected the join condition:$condition of this join " + - "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + - "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + - "from more than one child`, we need to cast the join to cross join by setting the" + - s" config ${SQLConf.CROSS_JOINS_ENABLED.key}=true") + // Just pass through the original join, the checking for crossJoinEnabled will be done + // later in CheckCartesianProducts. j } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala index 788fedb3c8e8e..81d915d4b124c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala @@ -19,14 +19,16 @@ package org.apache.spark.sql.catalyst.optimizer import org.scalatest.Matchers._ +import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{And, Expression, PythonUDF} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.internal.SQLConf.CROSS_JOINS_ENABLED +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class CheckCartesianProductsSuite extends PlanTest { @@ -81,6 +83,20 @@ class CheckCartesianProductsSuite extends PlanTest { } } + test("CheckCartesianProducts throws an exception for PythonUDF exists in join condition") { + val pythonUdf = PythonUDF("pyUDF", null, + StructType(Seq(StructField("a", IntegerType))), + Seq.empty, + PythonEvalType.SQL_BATCHED_UDF, + udfDeterministic = true) + withSQLConf(CROSS_JOINS_ENABLED.key -> "false") { + val thrownException = the [AnalysisException] thrownBy { + performCartesianProductCheck(Inner, Some(Seq(pythonUdf, 'a === 1).reduceLeft(And))) + } + assert(thrownException.message.contains("this join plan contains PythonUDF")) + } + } + private def performCartesianProductCheck( joinType: JoinType, condition: Option[Expression] = None): Unit = { From 98cd3cc52346842201eaa11f3c9b757f42c47215 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 26 Sep 2018 13:56:56 +0800 Subject: [PATCH 19/24] Address comment --- python/pyspark/sql/tests.py | 12 ++++-------- .../sql/catalyst/optimizer/Optimizer.scala | 18 ++++++------------ .../spark/sql/catalyst/optimizer/joins.scala | 4 ++-- 3 files 
changed, 12 insertions(+), 22 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ebd717e69b5e7..a2b5ffe94c151 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -574,8 +574,7 @@ def test_udf_in_left_semi_join_condition(self): def test_udf_and_filter_in_join_condition(self): # regression test for SPARK-25314 - # test the complex scenario with both udf(non-deterministic) - # and normal filter(deterministic) + # test the complex scenario with both udf and normal filter from pyspark.sql.functions import udf left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) @@ -586,8 +585,7 @@ def test_udf_and_filter_in_join_condition(self): def test_udf_and_filter_in_left_semi_join_condition(self): # regression test for SPARK-25314 - # test the complex scenario with both udf(non-deterministic) - # and normal filter(deterministic) + # test the complex scenario with both udf and normal filter from pyspark.sql.functions import udf left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) @@ -598,8 +596,7 @@ def test_udf_and_filter_in_left_semi_join_condition(self): def test_udf_and_common_filter_in_join_condition(self): # regression test for SPARK-25314 - # test the complex scenario with both udf(non-deterministic) - # and common filter(deterministic) + # test the complex scenario with both udf and common filter from pyspark.sql.functions import udf left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) @@ -610,8 +607,7 @@ def test_udf_and_common_filter_in_join_condition(self): def test_udf_and_common_filter_in_left_semi_join_condition(self): # regression test for SPARK-25314 - # test the complex scenario with both udf(non-deterministic) - # and common filter(deterministic) + # test the complex scenario with both udf and common filter from pyspark.sql.functions import udf left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1dfbca6d6d8f2..523bd56ea0e02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -166,8 +166,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ConvertToLocalRelation, PropagateEmptyRelation) :+ Batch("Extract PythonUDF From JoinCondition", Once, - HandlePythonUDFInJoinCondition) :+ - // The following batch should be executed after batch "Join Reorder" and "LocalRelation". + PullOutPythonUDFInJoinCondition) :+ + // The following batch should be executed after batch "Join Reorder" "LocalRelation" and + // "Extract PythonUDF From JoinCondition". 
Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ Batch("RewriteSubquery", Once, @@ -205,7 +206,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) PullupCorrelatedPredicates.ruleName :: RewriteCorrelatedScalarSubquery.ruleName :: RewritePredicateSubquery.ruleName :: - HandlePythonUDFInJoinCondition.ruleName :: Nil + PullOutPythonUDFInJoinCondition.ruleName :: Nil /** * Optimize all the subqueries inside expression. @@ -1307,19 +1308,12 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { } } - /** - * Check if a join contains PythonUDF in join condition. - */ - def hasPythonUDFInJoinCondition(join: Join): Boolean = { - val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil) - conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF) - } - def apply(plan: LogicalPlan): LogicalPlan = if (SQLConf.get.crossJoinEnabled) { plan } else plan transform { - case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) => + case j @ Join(_, _, _, condition) + if condition.isDefined && PullOutPythonUDFInJoinCondition.hasPythonUDF(condition.get) => // if the crossJoinEnabled is false, a RuntimeException will be thrown later while // the PythonUDF need to access both side of join, we throw firstly here for better // readable information. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 62eb7987be6b1..9049bf7980eff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -158,14 +158,14 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { * Correctly handle PythonUDF which need access both side of join side by changing the new join * type to Cross. */ -object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { +object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { def hasPythonUDF(expression: Expression): Boolean = { expression.collectFirst { case udf: PythonUDF => udf }.isDefined } override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case j @ Join(_, _, joinType, condition) - if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) => + if condition.isDefined && hasPythonUDF(condition.get) => if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) { // The current strategy only support InnerLike and LeftSemi join because for other type, // it breaks SQL semantic if we run the join condition as a filter after join. 
If we pass From d1db33a97cae25b892d47f9b983ff3fb35c2d253 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 26 Sep 2018 16:49:59 +0800 Subject: [PATCH 20/24] Revert the changes of original plan and enhance UT --- python/pyspark/sql/tests.py | 16 ++++-- .../sql/catalyst/optimizer/Optimizer.scala | 12 +---- .../spark/sql/catalyst/optimizer/joins.scala | 49 +++++++++---------- .../CheckCartesianProductsSuite.scala | 18 +------ .../python/BatchEvalPythonExecSuite.scala | 2 +- 5 files changed, 37 insertions(+), 60 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a2b5ffe94c151..dcb432e53df21 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -559,6 +559,8 @@ def test_udf_in_join_condition(self): right = self.spark.createDataFrame([Row(b=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, f("a", "b")) + with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): + df.collect() with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, b=1)]) @@ -569,6 +571,8 @@ def test_udf_in_left_semi_join_condition(self): right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, f("a", "b"), "leftsemi") + with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): + df.collect() with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) @@ -580,6 +584,8 @@ def test_udf_and_filter_in_join_condition(self): right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2]) + with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): + df.collect() with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)]) @@ -591,6 +597,8 @@ def test_udf_and_filter_in_left_semi_join_condition(self): right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi") + with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): + df.collect() with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) @@ -602,8 +610,8 @@ def test_udf_and_common_filter_in_join_condition(self): right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, [f("a", "b"), left.a1 == right.b1]) - with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) + # do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. 
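The comment above ("do not need spark.sql.crossJoin.enabled=true ...") captures the user-visible distinction these tests exercise. A hedged sketch of the two cases, again assuming a SparkSession named `spark` and this patch series applied; it restates the tests rather than adding new behaviour.

    # Assumption: SparkSession `spark` exists and these patches are applied.
    from pyspark.sql import Row
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    left = spark.createDataFrame([Row(a=1, a1=1), Row(a=2, a1=2)])
    right = spark.createDataFrame([Row(b=1, b1=1), Row(b=1, b1=3)])
    f = udf(lambda a, b: a == b, BooleanType())

    # Case 1: the UDF is combined with a regular equi-condition. The UDF is
    # pulled into a Filter above the join and the remaining condition keeps
    # the join non-cartesian, so no extra configuration is needed.
    left.join(right, [f("a", "b"), left.a1 == right.b1]).show()

    # Case 2: the UDF is the only condition. The join degenerates into a cross
    # join plus a Filter, so CheckCartesianProducts raises
    # "Detected implicit cartesian product" unless the flag below is set.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    left.join(right, f("a", "b")).show()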
+ self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) def test_udf_and_common_filter_in_left_semi_join_condition(self): # regression test for SPARK-25314 @@ -613,8 +621,8 @@ def test_udf_and_common_filter_in_left_semi_join_condition(self): right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) f = udf(lambda a, b: a == b, BooleanType()) df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") - with self.sql_conf({"spark.sql.crossJoin.enabled": True}): - self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + # do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. + self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) def test_udf_not_supported_in_join_condition(self): # regression test for SPARK-25314 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 523bd56ea0e02..df6eb9a848b13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1312,16 +1312,6 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { if (SQLConf.get.crossJoinEnabled) { plan } else plan transform { - case j @ Join(_, _, _, condition) - if condition.isDefined && PullOutPythonUDFInJoinCondition.hasPythonUDF(condition.get) => - // if the crossJoinEnabled is false, a RuntimeException will be thrown later while - // the PythonUDF need to access both side of join, we throw firstly here for better - // readable information. - throw new AnalysisException(s"Detected the join condition:${j.condition} of this join " + - "plan contains PythonUDF, if the PythonUDF need to access both side of join, " + - "it will get an invalid PythonUDF RuntimeException with message `requires attributes " + - "from more than one child`, we need to cast the join to cross join by setting the" + - s" configuration variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true") case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _) if isCartesianProduct(j) => throw new AnalysisException( @@ -1332,7 +1322,7 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { |Join condition is missing or trivial. |Either: use the CROSS JOIN syntax to allow cartesian products between these |relations, or: enable implicit cartesian products by setting the configuration - |variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true""" + |variable spark.sql.crossJoin.enabled=true""" .stripMargin) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 9049bf7980eff..27e7f3a5f8769 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -175,34 +175,29 @@ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateH throw new AnalysisException("Using PythonUDF in join condition of join type" + s" $joinType is not supported.") } - if (SQLConf.get.crossJoinEnabled) { - // if condition expression contains python udf, it will be moved out from - // the new join conditions, and the join type will be changed to CrossJoin. 
- logWarning(s"The join condition:$condition of the join plan contains " + - "PythonUDF, it will be moved out and the join plan will be " + - s"turned to cross join. This plan shows below:\n $j") - val (udf, rest) = - condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF) - val newCondition = if (rest.isEmpty) { - Option.empty - } else { - Some(rest.reduceLeft(And)) - } - val newJoin = j.copy(joinType = Cross, condition = newCondition) - joinType match { - case _: InnerLike => - Filter(udf.reduceLeft(And), newJoin) - case LeftSemi => - Project( - j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin)) - case _ => - throw new AnalysisException("Using PythonUDF in join condition of join type" + - s" $joinType is not supported.") - } + // if condition expression contains python udf, it will be moved out from + // the new join conditions, and the join type will be changed to CrossJoin. + logWarning(s"The join condition:$condition of the join plan contains " + + "PythonUDF, it will be moved out and the join plan will be turned to cross " + + s"join when its the only condition. This plan shows below:\n $j") + val (udf, rest) = + condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF) + val newCondition = if (rest.isEmpty) { + Option.empty } else { - // Just pass through the original join, the checking for crossJoinEnabled will be done - // later in CheckCartesianProducts. - j + Some(rest.reduceLeft(And)) + } + val newJoin = j.copy(condition = newCondition) + joinType match { + case _: InnerLike => + Filter(udf.reduceLeft(And), newJoin) + case LeftSemi => + Project( + j.left.output.map(_.toAttribute), + Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner))) + case _ => + throw new AnalysisException("Using PythonUDF in join condition of join type" + + s" $joinType is not supported.") } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala index 81d915d4b124c..788fedb3c8e8e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer import org.scalatest.Matchers._ -import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{And, Expression, PythonUDF} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.internal.SQLConf.CROSS_JOINS_ENABLED -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} class CheckCartesianProductsSuite extends PlanTest { @@ -83,20 +81,6 @@ class CheckCartesianProductsSuite extends PlanTest { } } - test("CheckCartesianProducts throws an exception for PythonUDF exists in join condition") { - val pythonUdf = PythonUDF("pyUDF", null, - StructType(Seq(StructField("a", IntegerType))), - Seq.empty, - PythonEvalType.SQL_BATCHED_UDF, - udfDeterministic = true) - withSQLConf(CROSS_JOINS_ENABLED.key -> "false") { - val 
thrownException = the [AnalysisException] thrownBy { - performCartesianProductCheck(Inner, Some(Seq(pythonUdf, 'a === 1).reduceLeft(And))) - } - assert(thrownException.message.contains("this join plan contains PythonUDF")) - } - } - private def performCartesianProductCheck( joinType: JoinType, condition: Option[Expression] = None): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 5470d0fe6f982..4a24898c0073d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -121,7 +121,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val errMsg = intercept[AnalysisException] { dummyPythonUDFTest() } - assert(errMsg.getMessage.startsWith("Detected the join condition:")) + assert(errMsg.getMessage.startsWith("Detected implicit cartesian product")) // Test with spark.sql.crossJoin.enabled=true spark.conf.set("spark.sql.crossJoin.enabled", "true") dummyPythonUDFTest() From 87f0f50a47cc2804f908b96fd1d3a5cef9d747be Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 26 Sep 2018 21:41:45 +0800 Subject: [PATCH 21/24] Address comments --- .../spark/sql/catalyst/optimizer/joins.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 27e7f3a5f8769..e6b4cfd020e8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -155,8 +155,10 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { } /** - * Correctly handle PythonUDF which need access both side of join side by changing the new join - * type to Cross. + * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF + * and pull them out from join condition. For python udf accessing attributes from only one side, + * they would be pushed down by operation push down rules. If not(e.g. user disables filter push + * down rules), we need to pull them out in this rule too. */ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { def hasPythonUDF(expression: Expression): Boolean = { @@ -175,22 +177,22 @@ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateH throw new AnalysisException("Using PythonUDF in join condition of join type" + s" $joinType is not supported.") } - // if condition expression contains python udf, it will be moved out from - // the new join conditions, and the join type will be changed to CrossJoin. - logWarning(s"The join condition:$condition of the join plan contains " + - "PythonUDF, it will be moved out and the join plan will be turned to cross " + - s"join when its the only condition. This plan shows below:\n $j") + // If condition expression contains python udf, it will be moved out from + // the new join conditions. If join condition has python udf only, it will be turned + // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts. 
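For the left-semi branch of the rule documented above (a Project over a Filter over an inner join), a hedged user-level sketch of the observable effect, assuming a SparkSession named `spark` and this patch series applied:

    # Assumption: SparkSession `spark` exists and these patches are applied.
    from pyspark.sql import Row
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    left = spark.createDataFrame([Row(a=1, a1=1), Row(a=2, a1=2)])
    right = spark.createDataFrame([Row(b=1, b1=1), Row(b=1, b1=3)])
    f = udf(lambda a, b: a == b, BooleanType())

    # The UDF is evaluated on an inner join of both sides, but the final
    # Project keeps only the left-side columns, preserving left-semi output.
    # Expected to return a single row, Row(a=1, a1=1).
    left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi").show()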
val (udf, rest) = condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF) val newCondition = if (rest.isEmpty) { - Option.empty + logWarning(s"The join condition:$condition of the join plan contains " + + "PythonUDF only, it will be moved out and the join plan will be turned to cross " + + s"join. This plan shows below:\n $j") + None } else { Some(rest.reduceLeft(And)) } val newJoin = j.copy(condition = newCondition) joinType match { - case _: InnerLike => - Filter(udf.reduceLeft(And), newJoin) + case _: InnerLike => Filter(udf.reduceLeft(And), newJoin) case LeftSemi => Project( j.left.output.map(_.toAttribute), From d2739af8e1074b9eb1c32f27cf85f884859ca7f6 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 26 Sep 2018 22:02:52 +0800 Subject: [PATCH 22/24] Address comments from Marco --- .../spark/sql/catalyst/optimizer/joins.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index e6b4cfd020e8b..7149edee0173e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -157,7 +157,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { /** * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF * and pull them out from join condition. For python udf accessing attributes from only one side, - * they would be pushed down by operation push down rules. If not(e.g. user disables filter push + * they are pushed down by operation push down rules. If not (e.g. user disables filter push * down rules), we need to pull them out in this rule too. */ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { @@ -178,14 +178,12 @@ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateH s" $joinType is not supported.") } // If condition expression contains python udf, it will be moved out from - // the new join conditions. If join condition has python udf only, it will be turned - // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts. + // the new join conditions. val (udf, rest) = - condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF) + splitConjunctivePredicates(condition.get).partition(hasPythonUDF) val newCondition = if (rest.isEmpty) { - logWarning(s"The join condition:$condition of the join plan contains " + - "PythonUDF only, it will be moved out and the join plan will be turned to cross " + - s"join. 
From d2739af8e1074b9eb1c32f27cf85f884859ca7f6 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 26 Sep 2018 22:02:52 +0800
Subject: [PATCH 22/24] Address comments from Marco

---
 .../spark/sql/catalyst/optimizer/joins.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index e6b4cfd020e8b..7149edee0173e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -157,7 +157,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
 /**
  * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
  * and pull them out from join condition. For python udf accessing attributes from only one side,
- * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * they are pushed down by operation push down rules. If not (e.g. user disables filter push
  * down rules), we need to pull them out in this rule too.
  */
 object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
@@ -178,14 +178,12 @@ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateH
         s" $joinType is not supported.")
       }
       // If condition expression contains python udf, it will be moved out from
-      // the new join conditions. If join condition has python udf only, it will be turned
-      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      // the new join conditions.
       val (udf, rest) =
-        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+        splitConjunctivePredicates(condition.get).partition(hasPythonUDF)
       val newCondition = if (rest.isEmpty) {
-        logWarning(s"The join condition:$condition of the join plan contains " +
-          "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
-          s"join. This plan shows below:\n $j")
+        logWarning(s"The join condition:$condition of the join plan contains PythonUDF only," +
+          s" it will be moved out and the join plan will be turned to cross join.")
         None
       } else {
         Some(rest.reduceLeft(And))
@@ -196,7 +194,7 @@ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateH
         case LeftSemi =>
           Project(
             j.left.output.map(_.toAttribute),
-              Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
+            Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
         case _ =>
           throw new AnalysisException("Using PythonUDF in join condition of join type" +
             s" $joinType is not supported.")
From 7f6695421f875f593970df071f151306b095a5b8 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 27 Sep 2018 00:01:41 +0800
Subject: [PATCH 23/24] Address comment

---
 .../sql/execution/python/BatchEvalPythonExecSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
index 4a24898c0073d..7046f35a7a166 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
 import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.BooleanType
 
@@ -123,8 +124,9 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
     // Test with spark.sql.crossJoin.enabled=true
-    spark.conf.set("spark.sql.crossJoin.enabled", "true")
-    dummyPythonUDFTest()
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      dummyPythonUDFTest()
+    }
   }
 }
 
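Replacing the raw conf mutation with withSQLConf keeps spark.sql.crossJoin.enabled from leaking into tests that run afterwards. PySpark's own test suite uses a similar self.sql_conf helper; for standalone scripts, one possible equivalent is a small context manager like the following (an illustrative helper, not a pyspark API; it reuses `spark`, `left`, `right`, and `close_enough` from the previous sketch):

from contextlib import contextmanager

@contextmanager
def temporarily_set(spark, key, value):
    # Set a SQL conf for the duration of a block, then restore the previous value.
    old = spark.conf.get(key, None)
    spark.conf.set(key, value)
    try:
        yield
    finally:
        if old is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, old)

with temporarily_set(spark, "spark.sql.crossJoin.enabled", "true"):
    left.join(right, [close_enough("a1", "b1")]).count()  # UDF-only condition, allowed inside the block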
From 2b6977de4a3b3489b9c2172a6a8a39831bf1d048 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 27 Sep 2018 10:25:00 +0800
Subject: [PATCH 24/24] Delete unnecessary end-to-end tests

---
 python/pyspark/sql/tests.py | 26 -----------------
 .../python/BatchEvalPythonExecSuite.scala | 28 -------------------
 2 files changed, 54 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index dcb432e53df21..aa2df0e4b68d4 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -576,32 +576,6 @@ def test_udf_in_left_semi_join_condition(self):
         with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
             self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
 
-    def test_udf_and_filter_in_join_condition(self):
-        # regression test for SPARK-25314
-        # test the complex scenario with both udf and normal filter
-        from pyspark.sql.functions import udf
-        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
-        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
-        f = udf(lambda a, b: a == b, BooleanType())
-        df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
-        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
-            df.collect()
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)])
-
-    def test_udf_and_filter_in_left_semi_join_condition(self):
-        # regression test for SPARK-25314
-        # test the complex scenario with both udf and normal filter
-        from pyspark.sql.functions import udf
-        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
-        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
-        f = udf(lambda a, b: a == b, BooleanType())
-        df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi")
-        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
-            df.collect()
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
-
     def test_udf_and_common_filter_in_join_condition(self):
         # regression test for SPARK-25314
         # test the complex scenario with both udf and common filter
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
index 7046f35a7a166..289cc667a1c66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
@@ -21,12 +21,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
 import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.BooleanType
 
@@ -34,8 +31,6 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
   import testImplicits.newProductEncoder
   import testImplicits.localSeqToDatasetHolder
 
-  val dummyPythonUDF = new MyDummyPythonUDF
-
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.udf.registerPython("dummyPythonUDF", new MyDummyPythonUDF)
@@ -105,29 +100,6 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
-
-  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
-    "in join condition") {
-    def dummyPythonUDFTest(): Unit = {
-      val df = Seq(("Hello", 4)).toDF("a", "b")
-      val df2 = Seq(("Hello", 4)).toDF("c", "d")
-      val joinDF = df.join(df2,
-        dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")))
-      val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect {
-        case b: BatchEvalPythonExec => b
-      }
-      assert(qualifiedPlanNodes.size == 1)
-    }
-    // Test without spark.sql.crossJoin.enabled set
-    val errMsg = intercept[AnalysisException] {
-      dummyPythonUDFTest()
-    }
-    assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
-    // Test with spark.sql.crossJoin.enabled=true
-    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
-      dummyPythonUDFTest()
-    }
-  }
 }
 
 // This Python UDF is dummy and just for testing. Unable to execute.
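The deleted end-to-end tests overlap with the simpler join-condition tests kept above, so only the optimizer-level coverage remains in this suite. For reference, the left-semi behaviour they exercised can still be reproduced by hand; a sketch with made-up data, reusing the `spark` session from the earlier sketches (as in the kept test_udf_in_left_semi_join_condition, spark.sql.crossJoin.enabled must be set when the UDF is the only condition):

from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

left = spark.createDataFrame([Row(a=1, a1=1), Row(a=2, a1=2)])
right = spark.createDataFrame([Row(b=1, b1=1)])
eq = udf(lambda a, b: a == b, BooleanType())

spark.conf.set("spark.sql.crossJoin.enabled", "true")
# The UDF is pulled out, the left-semi join is rewritten as an inner join plus a
# filter, and the result is projected back to the left-side columns only.
left.join(right, eq("a", "b"), "left_semi").show()  # expected: only the row with a=1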