Skip to content

Commit

Permalink
Sorting WrappedArrays within Rows before compare.
Browse files Browse the repository at this point in the history
  • Loading branch information
attilapiros committed Jan 2, 2018
1 parent 03d6159 commit caafba7
Showing 1 changed file with 45 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql

import java.sql.{Date, Timestamp}

import scala.collection.mutable

import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
Expand All @@ -33,6 +35,11 @@ import org.apache.spark.unsafe.types.CalendarInterval
class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._

private def sortWrappedArrayInRow(d: DataFrame) = d.map {
case Row(key: String, unsorted: mutable.WrappedArray[String]) =>
(key, unsorted.sorted)
}.toDF("key", "sorted")

test("reuse window partitionBy") {
val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
val w = Window.partitionBy("key").orderBy("value")
Expand Down Expand Up @@ -253,10 +260,11 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
("h", "p3", "20"),
("i", "p4", null)).toDF("key", "partition", "value")
checkAnswer(
df.select(
$"key",
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
sortWrappedArrayInRow(
df.select(
$"key",
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))),
Seq(
Row("a", Array("1", "2", "2", "3")),
Row("b", Array("1", "2", "2", "3")),
Expand All @@ -281,46 +289,48 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
("h", "p3", "20"),
("i", "p4", null)).toDF("key", "partition", "value")
checkAnswer(
df.select(
$"key",
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value".desc)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
sortWrappedArrayInRow(
df.select(
$"key",
collect_list("value").over(Window.partitionBy($"partition").orderBy($"value".desc)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))),
Seq(
Row("a", Array("3", "2", "2", "1")),
Row("b", Array("3", "2", "2", "1")),
Row("c", Array("3", "2", "2", "1")),
Row("d", Array("3", "2", "2", "1")),
Row("e", Array("3", "2", "2", "1")),
Row("f", Array("11", "10")),
Row("g", Array("11", "10")),
Row("a", Array("1", "2", "2", "3")),
Row("b", Array("1", "2", "2", "3")),
Row("c", Array("1", "2", "2", "3")),
Row("d", Array("1", "2", "2", "3")),
Row("e", Array("1", "2", "2", "3")),
Row("f", Array("10", "11")),
Row("g", Array("10", "11")),
Row("h", Array("20")),
Row("i", Array())))
}

test("collect_set in window") {
val df = Seq(
("a", "p1", 1),
("b", "p1", 2),
("c", "p1", 2),
("d", "p1", 3),
("e", "p1", 3),
("f", "p2", 10),
("g", "p2", 11),
("h", "p3", 20)).toDF("key", "partition", "value")
("a", "p1", "1"),
("b", "p1", "2"),
("c", "p1", "2"),
("d", "p1", "3"),
("e", "p1", "3"),
("f", "p2", "10"),
("g", "p2", "11"),
("h", "p3", "20")).toDF("key", "partition", "value")
checkAnswer(
df.select(
$"key",
collect_set("value").over(Window.partitionBy($"partition").orderBy($"value")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))),
sortWrappedArrayInRow(
df.select(
$"key",
collect_set("value").over(Window.partitionBy($"partition").orderBy($"value")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))),
Seq(
Row("a", Array(1, 2, 3)),
Row("b", Array(1, 2, 3)),
Row("c", Array(1, 2, 3)),
Row("d", Array(1, 2, 3)),
Row("e", Array(1, 2, 3)),
Row("f", Array(10, 11)),
Row("g", Array(10, 11)),
Row("h", Array(20))))
Row("a", Array("1", "2", "3")),
Row("b", Array("1", "2", "3")),
Row("c", Array("1", "2", "3")),
Row("d", Array("1", "2", "3")),
Row("e", Array("1", "2", "3")),
Row("f", Array("10", "11")),
Row("g", Array("10", "11")),
Row("h", Array("20"))))
}

test("skewness and kurtosis functions in window") {
Expand Down

0 comments on commit caafba7

Please sign in to comment.