From 7a81245345f2d6124423161786bb0d9f1c278ab8 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sun, 19 Jul 2015 16:29:50 -0700 Subject: [PATCH] [SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup This PR contains a few clean-ups that are a part of SPARK-8638: a few style issues got fixed, and a few tests were moved. Git commit message is wrong BTW :(... Author: Herman van Hovell Closes #7513 from hvanhovell/SPARK-8638-cleanup and squashes the following commits: 4e69d08 [Herman van Hovell] Fixed Perfomance Regression for Shrinking Window Frames (+Rebase) --- .../apache/spark/sql/execution/Window.scala | 14 ++-- .../sql/hive/HiveDataFrameWindowSuite.scala | 43 ++++++++++ .../sql/hive/execution/WindowSuite.scala | 79 ------------------- 3 files changed, 51 insertions(+), 85 deletions(-) delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index a054f52b8b489..de04132eb1104 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -118,22 +118,24 @@ case class Window( val exprs = windowSpec.orderSpec.map(_.child) val projection = newMutableProjection(exprs, child.output) (windowSpec.orderSpec, projection(), projection()) - } - else if (windowSpec.orderSpec.size == 1) { + } else if (windowSpec.orderSpec.size == 1) { // Use only the first order expression when the offset is non-null. val sortExpr = windowSpec.orderSpec.head val expr = sortExpr.child // Create the projection which returns the current 'value'. val current = newMutableProjection(expr :: Nil, child.output)() // Flip the sign of the offset when processing the order is descending - val boundOffset = if (sortExpr.direction == Descending) -offset - else offset + val boundOffset = + if (sortExpr.direction == Descending) { + -offset + } else { + offset + } // Create the projection which returns the current 'value' modified by adding the offset. val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType)) val bound = newMutableProjection(boundExpr :: Nil, child.output)() (sortExpr :: Nil, current, bound) - } - else { + } else { sys.error("Non-Zero range offsets are not supported for windows " + "with multiple order expressions.") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index 15b5f418f0a8c..c177cbdd991cf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -212,4 +212,47 @@ class HiveDataFrameWindowSuite extends QueryTest { | (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row) | FROM window_table""".stripMargin).collect()) } + + test("reverse sliding range frame") { + val df = Seq( + (1, "Thin", "Cell Phone", 6000), + (2, "Normal", "Tablet", 1500), + (3, "Mini", "Tablet", 5500), + (4, "Ultra thin", "Cell Phone", 5500), + (5, "Very thin", "Cell Phone", 6000), + (6, "Big", "Tablet", 2500), + (7, "Bendable", "Cell Phone", 3000), + (8, "Foldable", "Cell Phone", 3000), + (9, "Pro", "Tablet", 4500), + (10, "Pro2", "Tablet", 6500)). + toDF("id", "product", "category", "revenue") + val window = Window. + partitionBy($"category"). + orderBy($"revenue".desc). + rangeBetween(-2000L, 1000L) + checkAnswer( + df.select( + $"id", + avg($"revenue").over(window).cast("int")), + Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: + Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) :: + Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) :: + Row(10, 6000) :: Nil) + } + + // This is here to illustrate the fact that reverse order also reverses offsets. + test("reverse unbounded range frame") { + val df = Seq(1, 2, 4, 3, 2, 1). + map(Tuple1.apply). + toDF("value") + val window = Window.orderBy($"value".desc) + checkAnswer( + df.select( + $"value", + sum($"value").over(window.rangeBetween(Long.MinValue, 1)), + sum($"value").over(window.rangeBetween(1, Long.MaxValue))), + Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) :: + Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil) + + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala deleted file mode 100644 index a089d0d165195..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.sql.{Row, QueryTest} -import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ - -/** - * Window expressions are tested extensively by the following test suites: - * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]] - * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]] - * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]] - * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill - * this gap. - * - * TODO Move this class to the sql/core project when we move to Native Spark UDAFs. - */ -class WindowSuite extends QueryTest { - - test("reverse sliding range frame") { - val df = Seq( - (1, "Thin", "Cell Phone", 6000), - (2, "Normal", "Tablet", 1500), - (3, "Mini", "Tablet", 5500), - (4, "Ultra thin", "Cell Phone", 5500), - (5, "Very thin", "Cell Phone", 6000), - (6, "Big", "Tablet", 2500), - (7, "Bendable", "Cell Phone", 3000), - (8, "Foldable", "Cell Phone", 3000), - (9, "Pro", "Tablet", 4500), - (10, "Pro2", "Tablet", 6500)). - toDF("id", "product", "category", "revenue") - val window = Window. - partitionBy($"category"). - orderBy($"revenue".desc). - rangeBetween(-2000L, 1000L) - checkAnswer( - df.select( - $"id", - avg($"revenue").over(window).cast("int")), - Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) :: - Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) :: - Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) :: - Row(10, 6000) :: Nil) - } - - // This is here to illustrate the fact that reverse order also reverses offsets. - test("reverse unbounded range frame") { - val df = Seq(1, 2, 4, 3, 2, 1). - map(Tuple1.apply). - toDF("value") - val window = Window.orderBy($"value".desc) - checkAnswer( - df.select( - $"value", - sum($"value").over(window.rangeBetween(Long.MinValue, 1)), - sum($"value").over(window.rangeBetween(1, Long.MaxValue))), - Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) :: - Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil) - - } -}