Skip to content

Commit

Permalink
[SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate
Browse files Browse the repository at this point in the history
```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case
sql("SELECT explode(a) AS val FROM data order by val, c")
```

When sort columns are not in `Generate`, we can resolve them when `join` is equal to `true`. Still trying to add more test cases for the other `UnaryNode` types.

Could you review the changes? davies cloud-fan Thanks!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11198 from gatorsmile/missingInSort.
  • Loading branch information
gatorsmile authored and davies committed Feb 20, 2016
1 parent a4a081d commit f88c641
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,17 +609,24 @@ class Analyzer(
case sa @ Sort(_, _, child: Aggregate) => sa

case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
val missingAttrs = requiredAttrs -- child.outputSet
if (missingAttrs.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(child.output,
Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
} else if (newOrder != order) {
s.copy(order = newOrder)
} else {
s
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
val missingAttrs = requiredAttrs -- child.outputSet
if (missingAttrs.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(child.output,
Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
} else if (newOrder != order) {
s.copy(order = newOrder)
} else {
s
}
} catch {
// Attempting to resolve it might fail. When this happens, return the original plan.
// Users will see an AnalysisException for resolution failure of missing attributes
// in Sort
case ae: AnalysisException => s
}
}

Expand Down Expand Up @@ -649,6 +656,11 @@ class Analyzer(
}
val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs
a.copy(aggregateExpressions = newAggregateExpressions)
case g: Generate =>
// If join is false, we will convert it to true for getting from the child the missing
// attributes that its child might have or could have.
val missing = missingAttrs -- g.child.outputSet
g.copy(join = true, child = addMissingAttr(g.child, missing))
case u: UnaryNode =>
u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil)
case other =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class AnalysisErrorSuite extends AnalysisTest {
mapRelation.orderBy('map.asc),
"sort" :: "type" :: "map<int,int>" :: Nil)

errorTest(
"sorting by attributes are not from grouping expressions",
testRelation2.groupBy('a, 'c)('a, 'c, count('a).as("a3")).orderBy('b.asc),
"cannot resolve" :: "'b'" :: "given input columns" :: "[a, c, a3]" :: Nil)

errorTest(
"non-boolean filters",
testRelation.where(Literal(1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.hive.{HiveContext, HiveQl, MetastoreRelation}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -927,6 +926,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
).map(i => Row(i._1, i._2, i._3, i._4)))
}

test("Sorting columns are not in Generate") {
withTempTable("data") {
sqlContext.range(1, 5)
.select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c"))
.registerTempTable("data")

// case 1: missing sort columns are resolvable if join is true
checkAnswer(
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c"),
Row(1, 1) :: Row(2, 1) :: Nil)

// case 2: missing sort columns are resolvable if join is false
checkAnswer(
sql("SELECT explode(a) AS val FROM data order by val, c"),
Seq(1, 2, 2, 3, 3, 4, 4, 5).map(i => Row(i)))

// case 3: missing sort columns are resolvable if join is true and outer is true
checkAnswer(
sql(
"""
|SELECT C.val, b FROM data LATERAL VIEW OUTER explode(a) C as val
|where b < 2 order by c, val, b
""".stripMargin),
Row(1, 1) :: Row(2, 1) :: Nil)
}
}

test("window function: Sorting columns are not in Project") {
val data = Seq(
WindowData(1, "d", 10),
Expand Down

0 comments on commit f88c641

Please sign in to comment.