From 78e69018ecaffb9598f4ea2b51900850ee3fb988 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 19 Jul 2016 23:56:50 -0700 Subject: [PATCH 1/9] Add regression tests --- .../execution/SQLWindowFunctionSuite.scala | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala index 77e97dff8c221..5032821d776e5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -367,4 +367,42 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi | select * from v2 order by key limit 1 """.stripMargin), Row(0, 3)) } + + test("lead/lag should return the default value if the offset row does not exist") { + checkAnswer(sql( + """ + |SELECT + | lag(123, 100, 321) OVER (ORDER BY id) as lag + |FROM (SELECT 1 as id) tmp + """.stripMargin), + Row(321)) + + checkAnswer(sql( + """ + |SELECT + | lead(123, 100, 321) OVER (ORDER BY id) as lag + |FROM (SELECT 1 as id) tmp + """.stripMargin), + Row(321)) + } + + test("lead/lag should be able to handle null input value correctly") { + checkAnswer(sql( + """ + |SELECT + | row_number() OVER (ORDER BY id) as row_number, + | lag(id, 1, 321) OVER (ORDER BY id) as lag + |FROM (SELECT cast(null as int) as id UNION ALL select cast(null as int) as id) tmp + """.stripMargin), + Row(1, 321) :: Row(2, null) :: Nil) + + checkAnswer(sql( + """ + |SELECT + | row_number() OVER (ORDER BY id) as row_number, + | lead(id, 1, 321) OVER (ORDER BY id) as lead + |FROM (SELECT cast(null as int) as id UNION ALL select cast(null as int) as id) tmp + """.stripMargin), + Row(1, null) :: Row(2, 321) :: Nil) + } } From da5f36f5daa16c4aba605cb939b313c92274b24e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 
00:22:17 -0700 Subject: [PATCH 2/9] Fix SPARK-16642 --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d1d2c59caed9a..61162ccdba810 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1787,7 +1787,8 @@ class Analyzer( s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) if wf.frame != UnspecifiedFrame => WindowExpression(wf, s.copy(frameSpecification = wf.frame)) - case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) => + case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) + if e.resolved => val frame = SpecifiedWindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true) we.copy(windowSpec = s.copy(frameSpecification = frame)) } From 02ee1915ab2519c876f60162ff00aaa155142eec Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 01:43:04 -0700 Subject: [PATCH 3/9] OffsetWindowFunction's nullable should also check its input's nullable field. 
--- .../spark/sql/catalyst/expressions/windowExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index c0b453dccf5e9..7b55a01561c47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -348,7 +348,7 @@ abstract class OffsetWindowFunction */ override def foldable: Boolean = false - override def nullable: Boolean = default == null || default.nullable + override def nullable: Boolean = default == null || default.nullable || input.nullable override lazy val frame = { // This will be triggered by the Analyzer. From 506393b3eec45f7b62615adfe317a230e8de4128 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 01:43:28 -0700 Subject: [PATCH 4/9] Change the behavior of lead/lag back to Spark 1.6's behavior, which is explained below: * When the offset row does not exist, default values will be used. * lead/lag always respect null input values. --- .../spark/sql/execution/WindowExec.scala | 34 +++++++++++++++---- .../execution/SQLWindowFunctionSuite.scala | 34 ++++++++++++------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala index 93f007f5b348b..7149603018695 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala @@ -582,25 +582,43 @@ private[execution] final class OffsetWindowFunctionFrame( /** Row used to combine the offset and the current row. */ private[this] val join = new JoinedRow - /** Create the projection. 
*/ + /** + * Create the projection used when the offset row exists. + * Please note that this project always respect null input values (like PostgreSQL). + */ private[this] val projection = { // Collect the expressions and bind them. val inputAttrs = inputSchema.map(_.withNullability(true)) - val numInputAttributes = inputAttrs.size val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { case e: OffsetWindowFunction => val input = BindReferences.bindReference(e.input, inputAttrs) + input + case e => + BindReferences.bindReference(e, inputAttrs) + } + + // Create the projection. + newMutableProjection(boundExpressions, Nil).target(target) + } + + /** Create the projection used when the offset row DOES NOT exists. */ + private[this] val fillDefaultValue = { + // Collect the expressions and bind them. + val inputAttrs = inputSchema.map(_.withNullability(true)) + val numInputAttributes = inputAttrs.size + val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { + case e: OffsetWindowFunction => if (e.default == null || e.default.foldable && e.default.eval() == null) { - // Without default value. - input + // The default value is null. + Literal.create(null, e.dataType) } else { - // With default value. + // The default value is an expression. val default = BindReferences.bindReference(e.default, inputAttrs).transform { // Shift the input reference to its default version. case BoundReference(o, dataType, nullable) => BoundReference(o + numInputAttributes, dataType, nullable) } - org.apache.spark.sql.catalyst.expressions.Coalesce(input :: default :: Nil) + default } case e => BindReferences.bindReference(e, inputAttrs) @@ -625,10 +643,12 @@ private[execution] final class OffsetWindowFunctionFrame( if (inputIndex >= 0 && inputIndex < input.size) { val r = input.next() join(r, current) + projection(join) } else { join(emptyRow, current) + // Use default values since the offset row does not exist. 
+ fillDefaultValue(join) } - projection(join) inputIndex += 1 } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala index 5032821d776e5..839be26b9d4dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -372,37 +372,45 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi checkAnswer(sql( """ |SELECT - | lag(123, 100, 321) OVER (ORDER BY id) as lag + | lag(123, 100, 321) OVER (ORDER BY id) as lag, + | lead(123, 100, 321) OVER (ORDER BY id) as lead |FROM (SELECT 1 as id) tmp """.stripMargin), - Row(321)) + Row(321, 321)) checkAnswer(sql( """ |SELECT - | lead(123, 100, 321) OVER (ORDER BY id) as lag - |FROM (SELECT 1 as id) tmp + | lag(123, 100, a) OVER (ORDER BY id) as lag, + | lead(123, 100, a) OVER (ORDER BY id) as lead + |FROM (SELECT 1 as id, 2 as a) tmp """.stripMargin), - Row(321)) + Row(2, 2)) } test("lead/lag should be able to handle null input value correctly") { checkAnswer(sql( """ |SELECT - | row_number() OVER (ORDER BY id) as row_number, - | lag(id, 1, 321) OVER (ORDER BY id) as lag - |FROM (SELECT cast(null as int) as id UNION ALL select cast(null as int) as id) tmp + | b, + | lag(a, 1, 321) OVER (ORDER BY b) as lag, + | lead(a, 1, 321) OVER (ORDER BY b) as lead + |FROM (SELECT cast(null as int) as a, 1 as b + | UNION ALL + | select cast(null as int) as id, 2 as b) tmp """.stripMargin), - Row(1, 321) :: Row(2, null) :: Nil) + Row(1, 321, null) :: Row(2, null, 321) :: Nil) checkAnswer(sql( """ |SELECT - | row_number() OVER (ORDER BY id) as row_number, - | lead(id, 1, 321) OVER (ORDER BY id) as lead - |FROM (SELECT cast(null as int) as id UNION ALL select cast(null as int) as id) tmp + | b, + | lag(a, 1, c) OVER (ORDER BY b) as lag, + | 
lead(a, 1, c) OVER (ORDER BY b) as lead + |FROM (SELECT cast(null as int) as a, 1 as b, 3 as c + | UNION ALL + | select cast(null as int) as id, 2 as b, 4 as c) tmp """.stripMargin), - Row(1, null) :: Row(2, 321) :: Nil) + Row(1, 3, null) :: Row(2, null, 4) :: Nil) } } From c2fd2d786f952877cf51f685056b1068c455f776 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 01:58:03 -0700 Subject: [PATCH 5/9] doc change --- .../spark/sql/catalyst/expressions/windowExpressions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 7b55a01561c47..d2e04be09620d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -382,7 +382,7 @@ abstract class OffsetWindowFunction * * @param input expression to evaluate 'offset' rows after the current row. * @param offset rows to jump ahead in the partition. - * @param default to use when the input value is null or when the offset is larger than the window. + * @param default to use when the offset is larger than the window. */ @ExpressionDescription(usage = """_FUNC_(input, offset, default) - LEAD returns the value of 'x' at 'offset' rows @@ -409,7 +409,7 @@ case class Lead(input: Expression, offset: Expression, default: Expression) * * @param input expression to evaluate 'offset' rows before the current row. * @param offset rows to jump back in the partition. - * @param default to use when the input value is null or when the offset is smaller than the window. + * @param default to use when the offset is smaller than the window. 
*/ @ExpressionDescription(usage = """_FUNC_(input, offset, default) - LAG returns the value of 'x' at 'offset' rows From 51e693729a11170545c42366a6d46d130128173a Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 08:42:39 -0700 Subject: [PATCH 6/9] Fix doc --- .../expressions/windowExpressions.scala | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index d2e04be09620d..7fd25758a8241 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -373,20 +373,22 @@ abstract class OffsetWindowFunction } /** - * The Lead function returns the value of 'x' at 'offset' rows after the current row in the window. - * Offsets start at 0, which is the current row. The offset must be constant integer value. The - * default offset is 1. When the value of 'x' is null at the offset, or when the offset is larger - * than the window, the default expression is evaluated. - * - * This documentation has been based upon similar documentation for the Hive and Presto projects. + * The Lead function returns the value of 'x' at the 'offset'th row after the current row in + * the window. Offsets start at 0, which is the current row. The offset must be constant + * integer value. The default offset is 1. When the value of 'x' is null at the 'offset'th row, + * null is returned. If there is no such offset row, the default expression is evaluated. * * @param input expression to evaluate 'offset' rows after the current row. * @param offset rows to jump ahead in the partition. - * @param default to use when the offset is larger than the window. + * @param default to use when the offset is larger than the window. 
The default value is null. */ @ExpressionDescription(usage = - """_FUNC_(input, offset, default) - LEAD returns the value of 'x' at 'offset' rows - after the current row in the window""") + """_FUNC_(input, offset, default) - LEAD returns the value of 'x' at the 'offset'th row + after the current row in the window. + The default value of 'offset' is 1 and the default value of 'default' is null. + If the value of 'x' at the 'offset'th row is null, null is returned. + If there is no such offset row (e.g. when the offset is 1, the last row of the window + does not have any subsequent row), 'default' is returned.""") case class Lead(input: Expression, offset: Expression, default: Expression) extends OffsetWindowFunction { @@ -400,20 +402,22 @@ case class Lead(input: Expression, offset: Expression, default: Expression) } /** - * The Lag function returns the value of 'x' at 'offset' rows before the current row in the window. - * Offsets start at 0, which is the current row. The offset must be constant integer value. The - * default offset is 1. When the value of 'x' is null at the offset, or when the offset is smaller - * than the window, the default expression is evaluated. - * - * This documentation has been based upon similar documentation for the Hive and Presto projects. + * The Lag function returns the value of 'x' at the 'offset'th row before the current row in + * the window. Offsets start at 0, which is the current row. The offset must be constant + * integer value. The default offset is 1. When the value of 'x' is null at the 'offset'th row, + * null is returned. If there is no such offset row, the default expression is evaluated. * * @param input expression to evaluate 'offset' rows before the current row. * @param offset rows to jump back in the partition. - * @param default to use when the offset is smaller than the window. + * @param default to use when the offset row does not exist. 
*/ @ExpressionDescription(usage = - """_FUNC_(input, offset, default) - LAG returns the value of 'x' at 'offset' rows - before the current row in the window""") + """_FUNC_(input, offset, default) - LAG returns the value of 'x' at the 'offset'th row + before the current row in the window. + The default value of 'offset' is 1 and the default value of 'default' is null. + If the value of 'x' at the 'offset'th row is null, null is returned. + If there is no such offset row (e.g. when the offset is 1, the first row of the window + does not have any previous row), 'default' is returned.""") case class Lag(input: Expression, offset: Expression, default: Expression) extends OffsetWindowFunction { From faa7d898040373ac6348376b32d5cd93015cdf32 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 20 Jul 2016 13:16:44 -0700 Subject: [PATCH 7/9] Move the test file --- .../execution/SQLWindowFunctionSuite.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) rename sql/{hive/src/test/scala/org/apache/spark/sql/hive => core/src/test/scala/org/apache/spark/sql}/execution/SQLWindowFunctionSuite.scala (96%) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala similarity index 96% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index 839be26b9d4dd..9f74e657d850f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -15,12 +15,10 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.execution +package org.apache.spark.sql.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils - +import org.apache.spark.sql.test.SharedSQLContext case class WindowData(month: Int, area: String, product: Int) @@ -28,8 +26,9 @@ case class WindowData(month: Int, area: String, product: Int) /** * Test suite for SQL window functions. */ -class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import spark.implicits._ +class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { + + import testImplicits._ test("window function: udaf with aggregate expression") { val data = Seq( @@ -357,15 +356,14 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi } test("SPARK-7595: Window will cause resolve failed with self join") { - sql("SELECT * FROM src") // Force loading of src table. 
- checkAnswer(sql( """ |with - | v1 as (select key, count(value) over (partition by key) cnt_val from src), + | v0 as (select 0 as key, 1 as value), + | v1 as (select key, count(value) over (partition by key) cnt_val from v0), | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) - | select * from v2 order by key limit 1 - """.stripMargin), Row(0, 3)) + | select key, cnt_val from v2 order by key limit 1 + """.stripMargin), Row(0, 1)) } test("lead/lag should return the default value if the offset row does not exist") { From 073ac94c66759d120948c04849c20634b8e4ae4e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 21 Jul 2016 13:38:15 -0700 Subject: [PATCH 8/9] test names --- .../apache/spark/sql/execution/SQLWindowFunctionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index 9f74e657d850f..d3cfa953a3123 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -366,7 +366,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { """.stripMargin), Row(0, 1)) } - test("lead/lag should return the default value if the offset row does not exist") { + test("SPARK-16633: lead/lag should return the default value if the offset row does not exist") { checkAnswer(sql( """ |SELECT @@ -386,7 +386,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { Row(2, 2)) } - test("lead/lag should be able to handle null input value correctly") { + test("lead/lag should respect null values") { checkAnswer(sql( """ |SELECT From ff3029e3db2214d7c51ea3ea866becf441fddac4 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 25 Jul 2016 18:16:44 -0700 Subject: [PATCH 9/9] Update doc --- 
.../spark/sql/catalyst/expressions/windowExpressions.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 7fd25758a8241..6012e62455cf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -321,8 +321,7 @@ abstract class OffsetWindowFunction val input: Expression /** - * Default result value for the function when the input expression returns NULL. The default will - * evaluated against the current row instead of the offset row. + * Default result value for the function when the 'offset'th row does not exist. */ val default: Expression