fix order by
cloud-fan committed Jun 21, 2015
1 parent d18f401 commit 4cfd23c
Showing 3 changed files with 16 additions and 13 deletions.
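The diff below reworks how the Catalyst analyzer resolves ORDER BY attributes: when name resolution returns an expression still wrapped in an UnresolvedAlias, the wrapper is now stripped (via a new trimUnresolvedAlias helper) before the expression is used in a SortOrder, and the alias-assignment rules only fire when an UnresolvedAlias is actually present. As a rough illustration of that trimming pattern, here is a minimal, self-contained Scala sketch; the stand-in types and the resolveSortColumn helper are hypothetical simplifications for illustration, not the real Catalyst classes.

// Minimal sketch, assuming simplified stand-ins for Catalyst's expression types.
// It mirrors the shape of the trimUnresolvedAlias helper added in this commit:
// if resolution yields an expression still wrapped in UnresolvedAlias, strip the
// wrapper before binding it into a sort order.
sealed trait Expression
case class AttributeRef(name: String) extends Expression
case class UnresolvedAlias(child: Expression) extends Expression

object OrderByResolutionSketch {
  // Same shape as the private helper added to the analyzer in this commit.
  private def trimUnresolvedAlias(e: Expression): Expression = e match {
    case UnresolvedAlias(child) => child
    case other => other
  }

  // Toy stand-in for plan.resolve(nameParts, resolver): look the sort column up
  // in the child plan's output and drop any UnresolvedAlias wrapper.
  def resolveSortColumn(name: String, output: Map[String, Expression]): Option[Expression] =
    output.get(name).map(trimUnresolvedAlias)

  def main(args: Array[String]): Unit = {
    val childOutput = Map("key" -> UnresolvedAlias(AttributeRef("key")))
    // Without the trimming step the wrapper would leak into the sort order;
    // with it, the underlying attribute is returned.
    println(resolveSortColumn("key", childOutput)) // Some(AttributeRef(key))
  }
}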
@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.analysis

-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
import org.apache.spark.sql.catalyst.expressions._
@@ -153,9 +151,11 @@ class Analyzer(
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case Aggregate(groups, aggs, child) if child.resolved =>
+case Aggregate(groups, aggs, child)
+  if child.resolved && aggs.exists(_.isInstanceOf[UnresolvedAlias]) =>
  Aggregate(groups, assignAliases(aggs), child)
-case Project(projectList, child) if child.resolved =>
+case Project(projectList, child)
+  if child.resolved && projectList.exists(_.isInstanceOf[UnresolvedAlias]) =>
  Project(assignAliases(projectList), child)
}
}
@@ -371,12 +371,10 @@ class Analyzer(
q.asInstanceOf[GroupingAnalytics].gid
case u @ UnresolvedAttribute(nameParts) =>
  // Leave unchanged if resolution fails. Hopefully will be resolved next round.
-  val result = withPosition(u) {
-    q.resolveChildren(nameParts, resolver).map {
-      case UnresolvedAlias(child) => child
-      case other => other
-    }.getOrElse(u)
-  }
+  val result =
+    withPosition(u) {
+      q.resolveChildren(nameParts, resolver).map(trimUnresolvedAlias).getOrElse(u)
+    }
  logDebug(s"Resolving $u to $result")
  result
case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
@@ -402,6 +400,11 @@
exprs.exists(_.collect { case _: Star => true }.nonEmpty)
}

+private def trimUnresolvedAlias(ne: NamedExpression) = ne match {
+  case UnresolvedAlias(child) => child
+  case other => other
+}
+
private def resolveSortOrders(ordering: Seq[SortOrder], plan: LogicalPlan, throws: Boolean) = {
ordering.map { order =>
  // Resolve SortOrder in one round.
@@ -411,7 +414,7 @@
try {
val newOrder = order transformUp {
  case u @ UnresolvedAttribute(nameParts) =>
-    plan.resolve(nameParts, resolver).getOrElse(u)
+    plan.resolve(nameParts, resolver).map(trimUnresolvedAlias).getOrElse(u)
  case UnresolvedExtractValue(child, fieldName) if child.resolved =>
    ExtractValue(child, fieldName, resolver)
}
@@ -222,6 +222,6 @@ case class UnresolvedAlias(child: Expression) extends NamedExpression

override lazy val resolved = false

-override def eval(input: Row = null): Any =
+override def eval(input: InternalRow = null): Any =
  throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -32,7 +32,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, ResolvedStar, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}