Skip to content

Commit

Permalink
add 'Last' component
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyunh committed Aug 29, 2014
1 parent 98ddbe6 commit b3df91b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val DISTINCT = Keyword("DISTINCT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val LAST = Keyword("LAST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
Expand Down Expand Up @@ -311,6 +312,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble)
} |
FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } |
LAST ~> "(" ~> expression <~ ")" ^^ { case exp => Last(exp) } |
AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } |
MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } |
MAX ~> "(" ~> expression <~ ")" ^^ { case exp => Max(exp) } |
Expand Down
1 change: 1 addition & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ package object dsl {
def approxCountDistinct(e: Expression, rsd: Double = 0.05) = ApproxCountDistinct(e, rsd)
def avg(e: Expression) = Average(e)
def first(e: Expression) = First(e)
def last(e: Expression) = Last(e)
def min(e: Expression) = Min(e)
def max(e: Expression) = Max(e)
def upper(e: Expression) = Upper(e)
Expand Down
29 changes: 29 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,21 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
override def newInstance() = new FirstFunction(child, this)
}

case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
override def references = child.references
override def nullable = true
override def dataType = child.dataType
override def toString = s"LAST($child)"

override def asPartial: SplitEvaluation = {
val partialLast = Alias(Last(child), "PartialLast")()
SplitEvaluation(
Last(partialLast.toAttribute),
partialLast :: Nil)
}
override def newInstance() = new LastFunction(child, this)
}

case class AverageFunction(expr: Expression, base: AggregateExpression)
extends AggregateFunction {

Expand Down Expand Up @@ -489,3 +504,17 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag

override def eval(input: Row): Any = result
}

case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
def this() = this(null, null) // Required for serialization.

var result: Any = null

override def update(input: Row): Unit = {
if (result == null) {
result = input
}
}

override def eval(input: Row): Any = if (result != null) expr.eval(result.asInstanceOf[Row]) else null
}

0 comments on commit b3df91b

Please sign in to comment.