From b98865127a39bde885f9b1680cfe608629d59d51 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:43:56 -0400 Subject: [PATCH 01/14] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2efa997ff22d2..c3ee6517875c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,6 +1021,16 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e + case l @ LocalLimit(_, child) => + failOnOuterReferenceInSubTree(l, "LIMIT") + l + // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. + case g @ GlobalLimit(_, child) => + failOnOuterReferenceInSubTree(g, "LIMIT") + g case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ff112c51697ad..b78a988eddbb0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -533,5 +533,13 @@ class AnalysisErrorSuite extends AnalysisTest { Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))), LocalRelation(a)) assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil) + + val plan4 = Filter( + Exists( + Limit(1, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) + ), + LocalRelation(a)) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) } } From 069ed8f8e5f14dca7a15701945d42fc27fe82f3c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:50:02 -0400 Subject: [PATCH 02/14] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. 
--- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c3ee6517875c7..357c763f59467 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1022,14 +1022,14 @@ class Analyzer( failOnOuterReferenceInSubTree(e, "an EXPAND") e case l @ LocalLimit(_, child) => - failOnOuterReferenceInSubTree(l, "LIMIT") + failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. case g @ GlobalLimit(_, child) => - failOnOuterReferenceInSubTree(g, "LIMIT") + failOnOuterReferenceInSubTree(g, "a LIMIT") g case p => failOnOuterReference(p) From edca333c081e6d4e53a91b496fba4a3ef4ee89ac Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 20:28:15 -0400 Subject: [PATCH 03/14] New positive test cases --- .../org/apache/spark/sql/SubquerySuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index afed342ff8e2a..52387b4b72a16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -571,4 +571,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) :: Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil) } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 1") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 from onerow t2 where t1.c1=t2.c1) + | and exists (select 1 from onerow LIMIT 1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 + | from (select 1 from onerow t2 LIMIT 1) + | where t1.c1=t2.c1)""".stripMargin), + Row(1) :: Nil) + } + } } From 64184fdb77c1a305bb2932e82582da28bb4c0e53 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 1 Aug 2016 09:20:09 -0400 Subject: [PATCH 04/14] Fix unit test case failure --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index b78a988eddbb0..c08de826bd945 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -540,6 +540,6 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) ), LocalRelation(a)) - 
assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) } } From 29f82b05c9e40e7934397257c674b260a8e8a996 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 13:42:01 -0400 Subject: [PATCH 05/14] blocking TABLESAMPLE --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 +++++-- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 357c763f59467..9d99c4173d4af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,16 +1021,19 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, child) => + case l @ LocalLimit(_, _) => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, child) => + case g @ GlobalLimit(_, _) => failOnOuterReferenceInSubTree(g, "a LIMIT") g + case s @ Sample(_, _, _, _, _) => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index c08de826bd945..0b7d681be5114 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -541,5 +541,13 @@ class AnalysisErrorSuite extends AnalysisTest { ), LocalRelation(a)) assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + + val plan5 = Filter( + Exists( + Sample(0.0, 0.5, false, 1L, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) + ), + LocalRelation(a)) + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From ac43ab47907a1ccd6d22f920415fbb4de93d4720 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 17:10:19 -0400 Subject: [PATCH 06/14] Fixing code styling --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9d99c4173d4af..29ede7048a2db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,17 +1021,17 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, _) => + case l : LocalLimit => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching 
GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, _) => + case g : GlobalLimit => failOnOuterReferenceInSubTree(g, "a LIMIT") g - case s @ Sample(_, _, _, _, _) => + case s : Sample => failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") s case p => From 631d396031e8bf627eb1f4872a4d3a17c144536c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 14:39:44 -0400 Subject: [PATCH 07/14] Correcting Scala test style --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 0b7d681be5114..8935d979414ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,6 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) + assertAnalysisError(plan5, + "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 7eb9b2dbba3633a1958e38e0019e3ce816300514 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 22:31:09 -0400 Subject: [PATCH 08/14] One (last) attempt to correct the Scala style tests --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8935d979414ae..6438065fb292e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,7 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From baf0e6084a838ce2d72eeeac9d7618ae4536ffb6 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 3 Nov 2016 23:54:24 -0400 Subject: [PATCH 09/14] First version of code+test cases --- .../sql/catalyst/analysis/Analyzer.scala | 34 +++++++ .../org/apache/spark/sql/SubquerySuite.scala | 91 +++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5011f2fdbf9b7..6ed43f4595553 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -977,6 +977,13 @@ class Analyzer( localPredicateReferences -- p.outputSet } + // SPARK-17348 + // Report a non-supported case where there exists a correlated predicate + // that is not an equality predicate and the outer reference of the correlated predicate + // is not at the immediate parent operator. 
+ def failOnNonEqualityPredicate(p: LogicalPlan): Unit = { + failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n $p") + } // Simplify the predicates before pulling them out. val transformed = BooleanSimplification(sub) transformUp { case f @ Filter(cond, child) => @@ -1044,6 +1051,33 @@ class Analyzer( failOnOuterReference(p) p } + + // SPARK-17348 + // Looking for a potential incorrect result case. + // When a correlated predicate is a non-equality predicate + // it must be placed at the immediate child operator. + // Otherwise, the pull up of the correlated predicate + // will generate a plan with a different semantics + // which could return incorrect result. + var continue : Boolean = true + for (pm <- predicateMap if continue) { + assert(pm._2.nonEmpty, "Correlated predicate(s) does not exist.") + for (p <- pm._2 if continue) + p match { + case EqualTo(_, _) | EqualNullSafe(_, _) => + None + case _ => + assert(transformed.children.nonEmpty) + if (!(transformed.isInstanceOf[Project]) || + !(pm._1 fastEquals transformed.asInstanceOf[Project].child)) { + continue = false + } + } + } + if (!continue) { + failOnNonEqualityPredicate(sub) + } + (transformed, predicateMap.values.flatten.toSeq) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index eab45050f7e63..b893e74576277 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -625,4 +625,95 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1) :: Nil) } } + + test("SPARK-17348: Correlated subqueries with non-equality predicate (good case)") { + withTempView("t1", "t2") { + Seq((1, 1)).toDF("c1", "c2").createOrReplaceTempView("t1") + Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t2") + + // Simple case + checkAnswer( + sql( + """ + | select c1 + | from t1 + | where c1 in (select t2.c1 + | from t2 + | where t1.c2 >= t2.c2)""".stripMargin), + Row(1) :: Nil) + + // More complex case with OR predicate + checkAnswer( + sql( + """ + | select t1.c1 + | from t1, t1 as t3 + | where t1.c1 = t3.c1 + | and (t1.c1 in (select t2.c1 + | from t2 + | where t1.c2 >= t2.c2 + | or t3.c2 < t2.c2) + | or t1.c2 >= 0)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-17348: Correlated subqueries with non-equality predicate (error case)") { + withTempView("t1", "t2", "t3", "t4") { + Seq((1, 1)).toDF("c1", "c2").createOrReplaceTempView("t1") + Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t2") + Seq((2, 1)).toDF("c1", "c2").createOrReplaceTempView("t3") + Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t4") + + // Simplest case + intercept[AnalysisException] { + sql( + """ + | select t1.c1 + | from t1 + | where t1.c1 in (select max(t2.c1) + | from t2 + | where t1.c2 >= t2.c2)""".stripMargin).collect() + } + + // Add a HAVING on top and augmented within an OR predicate + intercept[AnalysisException] { + sql( + """ + | select t1.c1 + | from t1 + | where t1.c1 in (select max(t2.c1) + | from t2 + | where t1.c2 >= t2.c2 + | having count(*) > 0 ) + | or t1.c2 >= 0""".stripMargin).collect() + } + + // Add a HAVING on top and augmented within an OR predicate + intercept[AnalysisException] { + sql( + """ + | select t1.c1 + | from t1, t1 as t3 + | where t1.c1 = t3.c1 + | and (t1.c1 in (select max(t2.c1) + | from t2 + | where t1.c2 = t2.c2 + | or t3.c2 = t2.c2) + | 
)""".stripMargin).collect() + } + + // In Window expression: changing the data set to + // demonstrate if this query ran, it would return incorrect result. + intercept[AnalysisException] { + sql( + """ + | select c1 + | from t3 + | where c1 in (select max(t4.c1) over () + | from t4 + | where t3.c2 >= t4.c2)""".stripMargin).collect() + } + } + } } From 4e6d99b92cd0908856371569479debc72e03703c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 4 Nov 2016 12:14:03 -0400 Subject: [PATCH 10/14] Address rxin's comment: inline the call to report Analyzer exception --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6ed43f4595553..2852f9b1c20d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -977,13 +977,6 @@ class Analyzer( localPredicateReferences -- p.outputSet } - // SPARK-17348 - // Report a non-supported case where there exists a correlated predicate - // that is not an equality predicate and the outer reference of the correlated predicate - // is not at the immediate parent operator. - def failOnNonEqualityPredicate(p: LogicalPlan): Unit = { - failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n $p") - } // Simplify the predicates before pulling them out. val transformed = BooleanSimplification(sub) transformUp { case f @ Filter(cond, child) => @@ -1075,7 +1068,8 @@ class Analyzer( } } if (!continue) { - failOnNonEqualityPredicate(sub) + // Report a non-supported case as an exception + failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$sub") } (transformed, predicateMap.values.flatten.toSeq) From 7e3f91a6a131d2275093b4b4c1abf4d5fc27adf3 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 11 Nov 2016 22:24:02 -0500 Subject: [PATCH 11/14] rework to limit to Aggregate/Window --- .../sql/catalyst/analysis/Analyzer.scala | 77 ++++++++++++------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c1b40f72524b..712664ed6e1f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1031,6 +1031,35 @@ class Analyzer( } } + // SPARK-17348: A potential incorrect result case. + // When a correlated predicate is a non-equality predicate, + // certain operators are not permitted from the operator + // hosting the correlated predicate up to the operator on the outer table. + // Otherwise, the pull up of the correlated predicate + // will generate a plan with a different semantics + // which could return incorrect result. + // Currently we check for Aggregate and Window operators + // + // Below shows an example of a Logical Plan during Analyzer phase that + // show this problem. Pulling the correlated predicate [outer(c2#77) >= ..] + // through the Aggregate (or Window) operator could alter the result of + // the Aggregate. 
+ // + // Project [c1#76] + // +- Project [c1#87, c2#88] + // : (Aggregate or Window operator) + // : +- Filter [outer(c2#77) >= c2#88)] + // : +- SubqueryAlias t2, `t2` + // : +- Project [_1#84 AS c1#87, _2#85 AS c2#88] + // : +- LocalRelation [_1#84, _2#85] + // +- SubqueryAlias t1, `t1` + // +- Project [_1#73 AS c1#76, _2#74 AS c2#77] + // +- LocalRelation [_1#73, _2#74] + def failOnNonEqualCorrelatedPredicate(p: LogicalPlan): Unit = { + // Report a non-supported case as an exception + failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p") + } + /** Determine which correlated predicate references are missing from this plan. */ def missingReferences(p: LogicalPlan): AttributeSet = { val localPredicateReferences = p.collect(predicateMap) @@ -1041,12 +1070,24 @@ class Analyzer( localPredicateReferences -- p.outputSet } + var foundNonEqualCorrelatedPred : Boolean = false + // Simplify the predicates before pulling them out. val transformed = BooleanSimplification(sub) transformUp { case f @ Filter(cond, child) => // Find all predicates with an outer reference. val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) + // Find any non-equality correlated predicates + for (p <- correlated) if (!foundNonEqualCorrelatedPred) { + p match { + case EqualTo(_, _) | EqualNullSafe(_, _) => + None + case _ => + foundNonEqualCorrelatedPred = true + } + } + // Rewrite the filter without the correlated predicates if any. correlated match { case Nil => f @@ -1069,11 +1110,19 @@ class Analyzer( case a @ Aggregate(grouping, expressions, child) => failOnOuterReference(a) val referencesToAdd = missingReferences(a) + if (foundNonEqualCorrelatedPred) { + failOnNonEqualCorrelatedPredicate(a) + } if (referencesToAdd.nonEmpty) { Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) } else { a } + case w : Window => + if (foundNonEqualCorrelatedPred) { + failOnNonEqualCorrelatedPredicate(w) + } + w case j @ Join(left, _, RightOuter, _) => failOnOuterReference(j) failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN") @@ -1108,34 +1157,6 @@ class Analyzer( failOnOuterReference(p) p } - - // SPARK-17348 - // Looking for a potential incorrect result case. - // When a correlated predicate is a non-equality predicate - // it must be placed at the immediate child operator. - // Otherwise, the pull up of the correlated predicate - // will generate a plan with a different semantics - // which could return incorrect result. 
- var continue : Boolean = true - for (pm <- predicateMap if continue) { - assert(pm._2.nonEmpty, "Correlated predicate(s) does not exist.") - for (p <- pm._2 if continue) - p match { - case EqualTo(_, _) | EqualNullSafe(_, _) => - None - case _ => - assert(transformed.children.nonEmpty) - if (!(transformed.isInstanceOf[Project]) || - !(pm._1 fastEquals transformed.asInstanceOf[Project].child)) { - continue = false - } - } - } - if (!continue) { - // Report a non-supported case as an exception - failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$sub") - } - (transformed, predicateMap.values.flatten.toSeq) } From e1f7e8750cdb85811e24896cc8577f441548963e Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 14 Nov 2016 10:28:46 -0500 Subject: [PATCH 12/14] Minor styling change --- .../sql/catalyst/analysis/Analyzer.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 712664ed6e1f7..c14f353517088 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1055,9 +1055,11 @@ class Analyzer( // +- SubqueryAlias t1, `t1` // +- Project [_1#73 AS c1#76, _2#74 AS c2#77] // +- LocalRelation [_1#73, _2#74] - def failOnNonEqualCorrelatedPredicate(p: LogicalPlan): Unit = { - // Report a non-supported case as an exception - failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p") + def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = { + if (found) { + // Report a non-supported case as an exception + failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p") + } } /** Determine which correlated predicate references are missing from this plan. */ @@ -1079,13 +1081,9 @@ class Analyzer( val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) // Find any non-equality correlated predicates - for (p <- correlated) if (!foundNonEqualCorrelatedPred) { - p match { - case EqualTo(_, _) | EqualNullSafe(_, _) => - None - case _ => - foundNonEqualCorrelatedPred = true - } + foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists { + case _: EqualTo | _: EqualNullSafe => false + case _ => true } // Rewrite the filter without the correlated predicates if any. 
@@ -1109,19 +1107,16 @@ class Analyzer( } case a @ Aggregate(grouping, expressions, child) => failOnOuterReference(a) + failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) + val referencesToAdd = missingReferences(a) - if (foundNonEqualCorrelatedPred) { - failOnNonEqualCorrelatedPredicate(a) - } if (referencesToAdd.nonEmpty) { Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) } else { a } case w : Window => - if (foundNonEqualCorrelatedPred) { - failOnNonEqualCorrelatedPredicate(w) - } + failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w) w case j @ Join(left, _, RightOuter, _) => failOnOuterReference(j) From 361f89338f94b2f483b4c1f63e9fd86745bf0acd Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 14 Nov 2016 10:47:56 -0500 Subject: [PATCH 13/14] Remove redundant check of equality pred in ScalarSubquery --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 3455a567b7786..7b75c1f70974b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -119,13 +119,6 @@ trait CheckAnalysis extends PredicateHelper { } case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty => - // Make sure we are using equi-joins. - conditions.foreach { - case _: EqualTo | _: EqualNullSafe => // ok - case e => failAnalysis( - s"The correlated scalar subquery can only contain equality predicates: $e") - } - // Make sure correlated scalar subqueries contain one row for every outer row by // enforcing that they are aggregates which contain exactly one aggregate expressions. // The analyzer has already checked that subquery contained only one output column, and From 9def9a70cf9dc5517d3cfb9d50611f68521e0854 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 14 Nov 2016 11:22:21 -0500 Subject: [PATCH 14/14] Update error msg in SubquerySuite --- .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index a2311f731b786..c84a6f161893c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -498,10 +498,10 @@ class SubquerySuite extends QueryTest with SharedSQLContext { test("non-equal correlated scalar subquery") { val msg1 = intercept[AnalysisException] { - sql("select a, (select b from l l2 where l2.a < l1.a) sum_b from l l1") + sql("select a, (select sum(b) from l l2 where l2.a < l1.a) sum_b from l l1") } assert(msg1.getMessage.contains( - "The correlated scalar subquery can only contain equality predicates")) + "Correlated column is not allowed in a non-equality predicate:")) } test("disjunctive correlated scalar subquery") {
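
Taken together, the two fixes in this series reject correlation patterns whose decorrelation would silently change query semantics: an outer reference under a LIMIT or TABLESAMPLE (SPARK-16804, patches 01-08) and a non-equality correlated predicate under an Aggregate or Window (SPARK-17348, patches 09-14). The sketch below is a hypothetical, standalone illustration — not part of the patches — of both rejected patterns, reusing the small tables from the new SubquerySuite tests. It assumes a Spark build that includes these commits; the object name, the local master setting, and the `tryQuery` helper are illustrative choices only.

```scala
// Hypothetical, standalone sketch -- not part of the patch series above.
// It replays the two rejected patterns against a Spark build that includes
// these commits, using the same tiny tables as the new SubquerySuite tests.
import org.apache.spark.sql.{AnalysisException, SparkSession}

object CorrelatedSubqueryRejections {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("correlated-subquery-rejections")
      .getOrCreate()
    import spark.implicits._

    Seq((1, 1)).toDF("c1", "c2").createOrReplaceTempView("t1")
    Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t2")

    // Run a query and report the analysis error if it is rejected.
    def tryQuery(label: String, query: String): Unit = {
      try {
        spark.sql(query).show()
      } catch {
        case e: AnalysisException => println(s"$label rejected: ${e.getMessage}")
      }
    }

    // SPARK-16804: the correlated predicate sits below the LIMIT. Pulling it out of
    // the subquery would apply LIMIT 1 to t2 before the correlation rather than per
    // outer row, so EXISTS could miss a qualifying row. With patches 01-08 this is
    // expected to fail with "Accessing outer query column is not allowed in a LIMIT".
    tryQuery("LIMIT case",
      """
        | select c1 from t1
        | where exists (select 1 from t2 where t2.c1 = t1.c1 limit 1)
      """.stripMargin)

    // SPARK-17348: a non-equality correlated predicate below an Aggregate. Pulling
    // t1.c2 >= t2.c2 above the Aggregate regroups t2 by c2, producing per-group maxes
    // {1, 2} instead of the per-outer-row max(1, 2) = 2, so the IN predicate would
    // wrongly become true. With patches 09-14 this is expected to fail with
    // "Correlated column is not allowed in a non-equality predicate".
    tryQuery("non-equality case",
      """
        | select t1.c1 from t1
        | where t1.c1 in (select max(t2.c1) from t2 where t1.c2 >= t2.c2)
      """.stripMargin)

    spark.stop()
  }
}
```

On builds without these commits, the same two queries could instead run and produce the wrong answers described in the comments above, which is the behavior the new AnalysisErrorSuite and SubquerySuite cases guard against.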