From 2073be263e31ba0a957b7ffa2bec2cafe40011d2 Mon Sep 17 00:00:00 2001 From: HanShuliang Date: Thu, 10 Aug 2017 14:46:04 +0800 Subject: [PATCH 1/2] [SPARK-21590][SS]Window start time should support negative values --- .../sql/catalyst/expressions/TimeWindow.scala | 9 +++---- .../analysis/AnalysisErrorSuite.scala | 25 ++++++++++++------- .../expressions/TimeWindowSuite.scala | 13 ++++++++++ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 9a9f579b37f58..b2ca5102b8663 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -79,16 +79,13 @@ case class TimeWindow( if (slideDuration <= 0) { return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.") } - if (startTime < 0) { - return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.") - } if (slideDuration > windowDuration) { return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" + s" to the windowDuration ($windowDuration).") } - if (startTime >= slideDuration) { - return TypeCheckFailure(s"The start time ($startTime) must be less than the " + - s"slideDuration ($slideDuration).") + if (startTime.abs >= slideDuration) { + return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " + + s"than the slideDuration ($slideDuration).") } } dataTypeCheck diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 4e0613619add6..42d7d24856dbd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -334,14 +334,28 @@ class AnalysisErrorSuite extends AnalysisTest { "start time greater than slide duration in time window", testRelation.select( TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")), - "The start time " :: " must be less than the slideDuration " :: Nil + "The absolute value of start time " :: " must be less than the slideDuration " :: Nil ) errorTest( "start time equal to slide duration in time window", testRelation.select( TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")), - "The start time " :: " must be less than the slideDuration " :: Nil + "The absolute value of start time " :: " must be less than the slideDuration " :: Nil + ) + + errorTest( + "SPARK-21590: absolute value of start time greater than slide duration in time window", + testRelation.select( + TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 minute").as("window")), + "The absolute value of start time " :: " must be less than the slideDuration " :: Nil + ) + + errorTest( + "SPARK-21590: absolute value of start time equal to slide duration in time window", + testRelation.select( + TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 second").as("window")), + "The absolute value of start time " :: " must be less than the slideDuration " :: Nil ) errorTest( @@ -372,13 +386,6 @@ class AnalysisErrorSuite extends AnalysisTest { "The slide duration" :: " must be greater than 0." :: Nil ) - errorTest( - "negative start time in time window", - testRelation.select( - TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")), - "The start time" :: "must be greater than or equal to 0." :: Nil - ) - errorTest( "generator nested in expressions", listRelation.select(Explode('list) + 1), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala index d6c8fcf291842..a73d48e5a3c3b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala @@ -77,6 +77,19 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva } } + test("SPARK-21590: Start time works with negative values and return microseconds") { + val validDuration = "10 minutes" + for ((text, seconds) <- Seq( + ("-10 seconds", -10000000), // -1e7 + ("-1 minute", -60000000), + ("-1 hour", -3600000000L))) { // -6e7 + assert(TimeWindow(Literal(10L), validDuration, validDuration, "interval " + text).startTime + === seconds) + assert(TimeWindow(Literal(10L), validDuration, validDuration, text).startTime + === seconds) + } + } + private val parseExpression = PrivateMethod[Long]('parseExpression) test("parse sql expression for duration in microseconds - string") { From 06b37d1c8aae2892c7bde3fe73bac3437ed64af1 Mon Sep 17 00:00:00 2001 From: HanShuliang Date: Sat, 12 Aug 2017 23:50:41 +0800 Subject: [PATCH 2/2] add new unit tests in DataFrameTimeWindowingSuite --- .../sql/DataFrameTimeWindowingSuite.scala | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index 6fe356877c268..2953425b1db49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -43,6 +43,22 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B ) } + test("SPARK-21590: tumbling window using negative start time") { + val df = Seq( + ("2016-03-27 19:39:30", 1, "a"), + ("2016-03-27 19:39:25", 2, "a")).toDF("time", "value", "id") + + checkAnswer( + df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds")) + .agg(count("*").as("counts")) + .orderBy($"window.start".asc) + .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"), + Seq( + Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 2) + ) + ) + } + test("tumbling window groupBy statement") { val df = Seq( ("2016-03-27 19:39:34", 1, "a"), @@ -72,6 +88,20 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B Seq(Row(1), Row(1), Row(1))) } + test("SPARK-21590: tumbling window groupBy statement with negative startTime") { + val df = Seq( + ("2016-03-27 19:39:34", 1, "a"), + ("2016-03-27 19:39:56", 2, "a"), + ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id") + + checkAnswer( + df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"), $"id") + .agg(count("*").as("counts")) + .orderBy($"window.start".asc) + .select("counts"), + Seq(Row(1), Row(1), Row(1))) + } + test("tumbling window with multi-column projection") { val df = Seq( ("2016-03-27 19:39:34", 1, "a"), @@ -309,4 +339,19 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B ) } } + + test("SPARK-21590: time window in SQL with three expressions including negative start time") { + withTempTable { table => + checkAnswer( + spark.sql( + s"""select window(time, "10 seconds", 10000000, "-5 seconds"), value from $table""") + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1), + Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4), + Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2) + ) + ) + } + } }