From af8708658fd01d2f908a57dc87511059b50d44d3 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 13 Aug 2015 10:47:22 -0700 Subject: [PATCH 1/2] Fix last. --- .../spark/sql/catalyst/expressions/aggregates.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 2cf8312ea59aa..5e8298aaaa9cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -650,6 +650,7 @@ case class FirstFunction(expr: Expression, base: AggregateExpression1) extends A var result: Any = null override def update(input: InternalRow): Unit = { + // We ignore null values. if (result == null) { result = expr.eval(input) } @@ -679,10 +680,14 @@ case class LastFunction(expr: Expression, base: AggregateExpression1) extends Ag var result: Any = null override def update(input: InternalRow): Unit = { - result = input + val value = expr.eval(input) + // We ignore null values. + if (value != null) { + result = value + } } override def eval(input: InternalRow): Any = { - if (result != null) expr.eval(result.asInstanceOf[InternalRow]) else null + result } } From b28c42abccb7f9011961d421839ec2937f9fe12b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 13 Aug 2015 10:58:59 -0700 Subject: [PATCH 2/2] Regression test. --- .../hive/execution/AggregationQuerySuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 7b5aa4763fd9e..ca9d5fb646ce0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -479,6 +479,21 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Be Row(0, null, 1, 1, null, 0) :: Nil) } + test("test Last implemented based on AggregateExpression1") { + // TODO: Remove this test once we remove AggregateExpression1. + import org.apache.spark.sql.functions._ + val df = Seq((1, 1), (2, 2), (3, 3)).toDF("i", "j").repartition(1) + withSQLConf( + SQLConf.SHUFFLE_PARTITIONS.key -> "1", + SQLConf.USE_SQL_AGGREGATE2.key -> "false") { + + checkAnswer( + df.groupBy("i").agg(last("j")), + df + ) + } + } + test("error handling") { withSQLConf("spark.sql.useAggregate2" -> "false") { val errorMessage = intercept[AnalysisException] {