-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references. #17636
Conversation
…s contain aggregate expression that has mixture of outer and local references
Test build #75792 has started for PR 17636 at commit |
|Aggregate expression: ${a.sql} | ||
|Outer references: ${outer.map(_.sql).mkString(", ")} | ||
|Local references: ${local.map(_.sql).mkString(", ")} | ||
""". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thanks. fixed.
@@ -1305,6 +1329,8 @@ class Analyzer( | |||
case _: EqualTo | _: EqualNullSafe => false | |||
case _ => true | |||
} | |||
|
|||
correlated.foreach(checkMixedReferencesInsideAggregation(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: correlated.foreach(checkMixedReferencesInsideAggregation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thanks. fixed.
|outer and local references, which is not supported yet. | ||
|Aggregate expression: ${a.sql} | ||
|Outer references: ${outer.map(_.sql).mkString(", ")} | ||
|Local references: ${local.map(_.sql).mkString(", ")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comma at each line
|Aggregate expression: ${a.sql},
|Outer references: ${outer.map(_.sql).mkString(", ")},
|Local references: ${local.map(_.sql).mkString(", ")}.
|Aggregate expression: ${a.sql} | ||
|Outer references: ${outer.map(_.sql).mkString(", ")} | ||
|Local references: ${local.map(_.sql).mkString(", ")} | ||
""".stripMargin.replace("\n", " ").trim() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do not add comma, we can remove .replace("\n", " ").trim()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile It will be too many lines sean ? I think adding comma is better ?
Test build #75794 has started for PR 17636 at commit |
@@ -1305,6 +1328,8 @@ class Analyzer( | |||
case _: EqualTo | _: EqualNullSafe => false | |||
case _ => true | |||
} | |||
|
|||
correlated.foreach(checkMixedReferencesInsideAggregation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failOnOuterReference
should be the single entry error checking for outer references.
Filter
is special. We can still call failOnOuterReference(f)
, but do the extra checking for Filter
in failOnOuterReference
.
Look good to me except the above comments. |
The error message |
@@ -1219,7 +1241,8 @@ class Analyzer( | |||
|
|||
// Make sure a plan's expressions do not contain outer references | |||
def failOnOuterReference(p: LogicalPlan): Unit = { | |||
if (p.expressions.exists(containsOuter)) { | |||
p.expressions.foreach(checkMixedReferencesInsideAggregation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p
is not definitely an Aggregate
, so I think checkMixedReferencesInsideAggregation
will not only apply to aggregate expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya checkMixedReferencesInsideAggregation only looks for AggregateExpression ? For other types its just a pass-through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. got it. :-) Btw, Filter
can also have aggregate expression here.
@@ -1219,7 +1241,8 @@ class Analyzer( | |||
|
|||
// Make sure a plan's expressions do not contain outer references | |||
def failOnOuterReference(p: LogicalPlan): Unit = { | |||
if (p.expressions.exists(containsOuter)) { | |||
p.expressions.foreach(checkMixedReferencesInsideAggregation) | |||
if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this new condition p.isInstanceOf[Filter]
just a safe network? Actually we won't use failOnOuterReference
on Filter
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya In the new code, we are calling failOnOuterReference on Filter based on @gatorsmile's comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh. I found it.
@@ -1,42 +1,72 @@ | |||
-- The test file contains negative test cases | |||
-- of invalid queries where error messages are expected. | |||
|
|||
create temporary view t1 as select * from values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those just change for case, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya Yeah.. since i was on this test case, thought i should fix the case.
@@ -1219,7 +1241,8 @@ class Analyzer( | |||
|
|||
// Make sure a plan's expressions do not contain outer references | |||
def failOnOuterReference(p: LogicalPlan): Unit = { | |||
if (p.expressions.exists(containsOuter)) { | |||
p.expressions.foreach(checkMixedReferencesInsideAggregation) | |||
if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should put the check of containsOuter
before checkMixedReferencesInsideAggregation
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya This is by choice. So we want to return the same error for TC.01.03 and TC.01.04. Currently we return a "..not supported outside of WHERE/HAVING" error for TC.01.04 which is misleading.
@@ -1210,6 +1210,28 @@ class Analyzer( | |||
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { | |||
val outerReferences = ArrayBuffer.empty[Expression] | |||
|
|||
// Validate that correlated aggregate expression do not contain a mixture | |||
// of outer and local references. | |||
def checkMixedReferencesInsideAggregation(expr: Expression): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: checkMixedReferencesInsideAggregation -> checkMixedReferencesInsideAggregationExpr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya Will change it to checkMixedReferencesInsideAggregateExpr
@@ -1219,7 +1241,8 @@ class Analyzer( | |||
|
|||
// Make sure a plan's expressions do not contain outer references |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change this comment accordingly too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya Ok. will do.
@@ -1219,7 +1241,8 @@ class Analyzer( | |||
|
|||
// Make sure a plan's expressions do not contain outer references | |||
def failOnOuterReference(p: LogicalPlan): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failOnOuterReference -> failOnOuterOrMixedReference? Or is it fine to keep it unchanged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya ok, will change it to failOnInvalidOuterReference. That should include both the cases.
LGTM except for few minor comments. |
Test build #75798 has finished for PR 17636 at commit
|
Test build #75805 has finished for PR 17636 at commit
|
def failOnOuterReference(p: LogicalPlan): Unit = { | ||
if (p.expressions.exists(containsOuter)) { | ||
// Make sure a plan's expressions do not contain : | ||
// 1. Aggregate expressions that has mixture of outer and local references. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: has
-> have
@@ -1362,7 +1389,7 @@ class Analyzer( | |||
// Note: | |||
// Generator with join=false is treated as Category 4. | |||
case g: Generate if g.join => | |||
failOnOuterReference(g) | |||
failOnInvalidOuterReference(g) | |||
|
|||
// Category 4: Any other operators not in the above 3 categories | |||
// cannot be on a correlation path, that is they are allowed only |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any test case to cover this scenario?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile I couldn't find a test for this. I will add one for now in SubquerySuite. I will move the negative tests to the sqlquerytestsuite in a follow-up pr.
@@ -367,6 +367,8 @@ case class OuterReference(e: NamedExpression) | |||
override def exprId: ExprId = e.exprId | |||
override def toAttribute: Attribute = e.toAttribute | |||
override def newInstance(): NamedExpression = OuterReference(e.newInstance()) | |||
override def sql: String = e.sql | |||
override def toString: String = e.toString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to override this? If we keep it unchanged, will it be easier for us to find whether it is an outer reference in the plans?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile Thats a good point Sean. I will revert this and error raising code to make sure its printed properly.
FROM t1 | ||
WHERE t1a in (SELECT min(t2a) | ||
FROM t2 | ||
GROUP by t2c |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: by
-> BY
having t3b > t2b )) | ||
SELECT * | ||
FROM t1 | ||
WHERE t1a in (SELECT min(t2a) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: in
-> IN
Test build #75867 has finished for PR 17636 at commit
|
Test build #75875 has finished for PR 17636 at commit
|
retest this please |
Test build #75880 has finished for PR 17636 at commit
|
Test build #75952 has finished for PR 17636 at commit
|
@gatorsmile I have addressed the comments. Can you please look at this when you get a chance. Thanks !! |
@@ -815,7 +815,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | |||
|
|||
// Generate operator | |||
test("Correlated subqueries in LATERAL VIEW") { | |||
withTempView("t1", "t2") { | |||
withTempView("t1", "t2", "t3") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t3
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uh... i took out the usage of t3 and forgot to take out from this line. Thanks for catching this.
| from t2 | ||
| where exists (select * | ||
| from t1 lateral view explode(t2.arr_c2) q as c2 | ||
| where t1.c1 = t2.c1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: capitalize the SQL keyword?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile The other test was in lower case . so i had it in lower case for consistency. I will change both the SQLs in this test. Thank you.
Test build #75960 has finished for PR 17636 at commit
|
LGTM |
Merging to master/branch-2.2. Thanks! |
@hvanhovell Thanks a lot. |
…icates contain aggregate expression that has mixture of outer and local references. ## What changes were proposed in this pull request? Address a follow up in [comment](apache#16954 (comment)) Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following : ```SQL SELECT t1a FROM t1 GROUP BY 1 HAVING EXISTS (SELECT 1 FROM t2 WHERE t2a < min(t1a + t2a)); ``` Exception snippet. ``` Cannot evaluate expression: min((input[0, int, false] + input[4, int, false])) at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226) at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103) ``` After this PR, a better error message is issued. ``` org.apache.spark.sql.AnalysisException Error in query: Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.; ``` ## How was this patch tested? Added tests in SQLQueryTestSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes apache#17636 from dilipbiswal/subquery_followup1.
What changes were proposed in this pull request?
Address a follow up in comment
Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following :
Exception snippet.
After this PR, a better error message is issued.
How was this patch tested?
Added tests in SQLQueryTestSuite.