[SPARK-37716][SQL] Improve error messages when a LateralJoin has non-deterministic expressions

### What changes were proposed in this pull request?
This PR allows the LateralJoin node's lateral subquery field to host non-deterministic expressions when the outer relation can produce at most one row. It also improves the error messages when a lateral join contains non-deterministic expressions that are not currently supported.

### Why are the changes needed?
SPARK-37199 changed the definition of PlanExpression's `deterministic` field: both the children and the plan itself must be deterministic for the plan expression to be deterministic. As a result, users can no longer use lateral joins with non-deterministic lateral subqueries. This PR improves the error messages and allows a special case where the outer query produces at most one row.
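
For context, here is a minimal sketch of the `deterministic` semantics described above. The types are simplified stand-ins assumed from the description, not the actual Spark classes:

```scala
// Simplified stand-ins, not the real Spark classes.
trait Expression { def deterministic: Boolean }
trait LogicalPlan { def deterministic: Boolean }

// A plan expression (e.g. a lateral subquery) wraps a subquery plan plus its
// outer references (children). Under SPARK-37199 it is deterministic only if
// both the outer references and the subquery plan itself are deterministic.
case class PlanExpr(children: Seq[Expression], plan: LogicalPlan) extends Expression {
  def deterministic: Boolean = children.forall(_.deterministic) && plan.deterministic
}
```

Under this definition, a lateral subquery that calls rand() is classified as non-deterministic even when its outer references are plain column attributes, which is what the new checks below key on.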

### Does this PR introduce _any_ user-facing change?
Yes. It improves the error messages:
Before:
```
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window
```
After:
```
Non-deterministic lateral subqueries are not supported when joining with outer relations that produce more than one row
-- Or
Lateral join condition cannot be non-deterministic:
```

### How was this patch tested?
SQL query tests.

Closes #34987 from allisonwang-db/spark-37716-lateral-join.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
allisonwang-db authored and cloud-fan committed Dec 27, 2021
1 parent 780cf3c commit 67c39a0
Showing 3 changed files with 81 additions and 2 deletions.
@@ -539,7 +539,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

      case o if o.expressions.exists(!_.deterministic) &&
        !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
-       !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
+       !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
+       // Lateral join is checked in checkSubqueryExpression.
+       !o.isInstanceOf[LateralJoin] =>
        // The rule above is used to check Aggregate operator.
        failAnalysis(
          s"""nondeterministic expressions are only allowed in
@@ -752,6 +754,19 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

case _: LateralSubquery =>
assert(plan.isInstanceOf[LateralJoin])
val join = plan.asInstanceOf[LateralJoin]
// A lateral join with a multi-row outer query and a non-deterministic lateral subquery
// cannot be decorrelated. Otherwise it may produce incorrect results.
if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
expr.failAnalysis(
s"Non-deterministic lateral subqueries are not supported when joining with " +
s"outer relations that produce more than one row\n${expr.plan}")
}
// Check if the lateral join's join condition is deterministic.
if (join.condition.exists(!_.deterministic)) {
join.failAnalysis(
s"Lateral join condition cannot be non-deterministic: ${join.condition.get.sql}")
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
checkCorrelationsInSubquery(expr.plan, isScalarOrLateral = true)
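
The guard added above keys on `join.left.maxRows`. A minimal, self-contained sketch of that decision (a hypothetical helper mirroring the check, not the actual Spark code) and how it lines up with the new test queries:

```scala
// Hypothetical stand-in for the relevant bits of a LateralJoin's left side.
final case class OuterRelation(maxRows: Option[Long])

// Mirrors the check above: a non-deterministic lateral subquery is only allowed
// when the outer relation is statically known to produce at most one row.
def allowLateralSubquery(outer: OuterRelation, subqueryDeterministic: Boolean): Boolean =
  subqueryDeterministic || outer.maxRows.exists(_ <= 1)

// VALUES (0) t(x) and an ungrouped aggregate (SELECT SUM(c1) FROM t1) report
// maxRows of one row, so their non-deterministic lateral subqueries are accepted.
assert(allowLateralSubquery(OuterRelation(Some(1L)), subqueryDeterministic = false))
// A multi-row (or unknown-cardinality) outer relation such as t1 is rejected.
assert(!allowLateralSubquery(OuterRelation(None), subqueryDeterministic = false))
```
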
sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql (10 additions, 0 deletions)
@@ -49,6 +49,16 @@ SELECT * FROM t1 JOIN t2 JOIN LATERAL (SELECT t1.c2 + t2.c2);
-- expect error: cannot resolve `t2.c1`
SELECT * FROM t1 JOIN LATERAL (SELECT t1.c1 AS a, t2.c1 AS b) s JOIN t2 ON s.b = t2.c1;

-- SPARK-37716: lateral join with non-deterministic expressions.
-- non-deterministic lateral subquery with single row relation.
SELECT x FROM VALUES (0) t(x) JOIN LATERAL (SELECT x + rand(0) AS y);
SELECT x FROM (SELECT SUM(c1) AS x FROM t1), LATERAL (SELECT x + rand(0) AS y);
-- expect error: lateral subquery must be deterministic when joining with a multi-row relation.
SELECT * FROM t1, LATERAL (SELECT c1 + c2 + rand(0) AS c3);
SELECT * FROM t1, LATERAL (SELECT rand(0) FROM t2);
-- expect error: lateral join cannot have non-deterministic join condition.
SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2) s ON t1.c1 + rand(0) = s.c1;

-- multiple lateral joins
SELECT * FROM t1,
LATERAL (SELECT c1 + c2 AS a),
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 66
+-- Number of queries: 71


-- !query
@@ -272,6 +272,60 @@ org.apache.spark.sql.AnalysisException
Column 't2.c1' does not exist. Did you mean one of the following? []; line 1 pos 50


-- !query
SELECT x FROM VALUES (0) t(x) JOIN LATERAL (SELECT x + rand(0) AS y)
-- !query schema
struct<x:int>
-- !query output
0


-- !query
SELECT x FROM (SELECT SUM(c1) AS x FROM t1), LATERAL (SELECT x + rand(0) AS y)
-- !query schema
struct<x:bigint>
-- !query output
1


-- !query
SELECT * FROM t1, LATERAL (SELECT c1 + c2 + rand(0) AS c3)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Non-deterministic lateral subqueries are not supported when joining with outer relations that produce more than one row
SubqueryAlias __auto_generated_subquery_name
+- Project [(cast((outer(c1#x) + outer(c2#x)) as double) + rand(0)) AS c3#x]
   +- OneRowRelation
; line 1 pos 9


-- !query
SELECT * FROM t1, LATERAL (SELECT rand(0) FROM t2)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Non-deterministic lateral subqueries are not supported when joining with outer relations that produce more than one row
SubqueryAlias __auto_generated_subquery_name
+- Project [rand(0) AS rand(0)#x]
   +- SubqueryAlias spark_catalog.default.t2
      +- View (`default`.`t2`, [c1#x,c2#x])
         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
            +- LocalRelation [col1#x, col2#x]
; line 1 pos 9


-- !query
SELECT * FROM t1 JOIN LATERAL (SELECT * FROM t2) s ON t1.c1 + rand(0) = s.c1
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Lateral join condition cannot be non-deterministic: ((CAST(spark_catalog.default.t1.c1 AS DOUBLE) + rand(0)) = CAST(s.c1 AS DOUBLE)); line 1 pos 17


-- !query
SELECT * FROM t1,
LATERAL (SELECT c1 + c2 AS a),
