[SPARK-13739] [SQL] Push Predicate Through Window #11635

Closed
wants to merge 71 commits into from
Changes from 68 commits
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
458f7be
ppd for window
gatorsmile Mar 10, 2016
92136dd
Merge remote-tracking branch 'upstream/master' into pushPredicateThro…
gatorsmile Mar 10, 2016
f401d8b
only partitioning key
gatorsmile Mar 10, 2016
0fba10a
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 12, 2016
3aea1da
Merge branch 'pushPredicateThroughWindow' into pushPredicateThroughWi…
gatorsmile Mar 12, 2016
d420246
add windowExpr and windowSpec to DSL
gatorsmile Mar 12, 2016
c05b4ae
remove useless import
gatorsmile Mar 12, 2016
6db1940
added more test cases
gatorsmile Mar 14, 2016
8fa0294
added two more test case.
gatorsmile Mar 21, 2016
cbf73b3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 21, 2016
c08f561
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
474df88
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
3d9828d
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 24, 2016
72d2361
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 26, 2016
07afea5
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 29, 2016
8bf2007
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 30, 2016
87a165b
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 31, 2016
b9359cd
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 1, 2016
b0d7b3b
Merge branch 'pushPredicateThroughWindowNew' into pushPredicateThroug…
gatorsmile Apr 1, 2016
65bd090
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 5, 2016
babf2da
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 5, 2016
9e09469
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 6, 2016
50a8e4a
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 6, 2016
f3337fa
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 10, 2016
09cc36d
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 12, 2016
83a1915
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 14, 2016
0483145
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 19, 2016
236a5f4
Merge remote-tracking branch 'upstream/master'
gatorsmile Apr 20, 2016
417bdb4
merge
gatorsmile Apr 20, 2016
cc5f0bd
merge
gatorsmile Apr 20, 2016
436359f
style fix.
gatorsmile Apr 20, 2016
fae2694
address comments.
gatorsmile Apr 20, 2016
875d6b6
style fix.
gatorsmile Apr 20, 2016
0469923
style fix
gatorsmile Apr 20, 2016
5c4f4d3
address comments.
gatorsmile Apr 20, 2016
c4dedd2
address comments and added more test cases.
gatorsmile Apr 20, 2016
3eaeaa5
Merge remote-tracking branch 'upstream/master' into pushPredicateThro…
gatorsmile Apr 25, 2016
e427ce9
address comments.
gatorsmile Apr 25, 2016
@@ -175,6 +175,15 @@ package object dsl {
Invoke(function, "apply", returnType, argument :: Nil)
}

def windowSpec(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
frame: WindowFrame): WindowSpecDefinition =
WindowSpecDefinition(partitionSpec, orderSpec, frame)

def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression =
WindowExpression(windowFunc, windowSpec)

implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s: String = sym.name }
// TODO more implicit class for literal?
implicit class DslString(val s: String) extends ImplicitOperators {
@@ -958,6 +958,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy three conditions:
// 1. All the columns are part of window partitioning key.
// 2. Window partitioning key should be just a sequence of [[AttributeReference]].
Contributor

I think this is almost guaranteed by the analyzer.

Member Author

@gatorsmile gatorsmile Apr 20, 2016

The purpose is to prevent the predicate (key + value) > '2' from being pushed down when the PARTITION BY expression is key + value, as in the following example:

select * from (SELECT key, value, sum(key) over(partition by key + value) as c1 from src)r1 where (key + value) > '2'

The example is copied from a Hive test case: https://issues.apache.org/jira/secure/attachment/12788757/HIVE-12808.05.patch

Member Author

How about this case? It appears that key + value is also constant within each partition when the window functions are evaluated, so it should be OK to push the predicate down. Could this restriction be specific to the Hive implementation?

Contributor

@hvanhovell hvanhovell Apr 20, 2016

@gatorsmile this restriction is probably more related to the fact that if we push down an entire expression, e.g.: a + b, we have to evaluate the expression twice, once in the Filter and once in the Window function. The double evaluation could be avoided by planning a Project.

I am pretty sure that we move all expressions used in Window clauses into an underlying Project during Analysis. So this shouldn't be too big of a problem.

[I updated the comment]

Member Author

Yeah, thank you very much! I just added an extra condition to ensure that the Analyzer has converted all compound expressions to aliases, and added a couple of test cases to cover it.

To enable predicate push down when the partitioning expression is a + b and the predicate is a + b > 3, we need to add an Analyzer rule that rewrites such predicate expressions to the underlying alias defined by exactly the same original expression. This will be submitted in a separate PR.
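The split discussed in this thread can be illustrated outside Spark with a toy expression model. The sketch below is only an illustration under stated assumptions: `Expr`, `Col`, `Lit`, `Rand`, `splitConjuncts`, and `splitForWindow` are hypothetical stand-ins, not Catalyst classes. It mirrors the check shown in this revision of the diff: a conjunct is pushed beneath the window only if it is deterministic and references nothing but partitioning columns.

```scala
object WindowPushdownSketch {
  // Toy expression tree; illustrative stand-ins only, not Spark Catalyst types.
  sealed trait Expr {
    def children: Seq[Expr]
    // Column names referenced anywhere in this expression.
    def references: Set[String] = children.flatMap(_.references).toSet
    // An expression is deterministic only if all of its children are.
    def deterministic: Boolean = children.forall(_.deterministic)
  }
  case class Col(name: String) extends Expr {
    val children: Seq[Expr] = Nil
    override def references: Set[String] = Set(name)
  }
  case class Lit(value: Int) extends Expr { val children: Seq[Expr] = Nil }
  case object Rand extends Expr {
    val children: Seq[Expr] = Nil
    override def deterministic: Boolean = false
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }
  case class GreaterThan(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }
  case class And(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  // Flatten nested ANDs into a list of independent conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // Returns (pushDown, stayUp): conjuncts that may move below the window
  // and conjuncts that must remain above it.
  def splitForWindow(condition: Expr, partitionKeys: Set[String]): (Seq[Expr], Seq[Expr]) =
    splitConjuncts(condition).partition { c =>
      c.references.subsetOf(partitionKeys) && c.deterministic
    }
}
```

With partitioning columns `a` and `b`, a condition such as `a > 1 AND c > 2` splits into `a > 1` (pushed down) and `c > 2` (kept above the window), while a conjunct containing `Rand` always stays above.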

// 3. Deterministic
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
cond.references.subsetOf(partitionAttrs) && cond.deterministic
Member Author

This is not right. Will fix it later.

}
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
Contributor

Math.pow(NIT, 100): What does the style guide say about one-line ternary expressions?

Contributor

Never mind, I looked it up. It is allowed.

} else {
filter
}

case filter @ Filter(condition, aggregate: Aggregate) =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -267,61 +266,39 @@ class ColumnPruningSuite extends PlanTest {

test("Column pruning on Window with useless aggregate functions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr = windowExpr(count('b), winSpec)

val originalQuery =
input.groupBy('a, 'c, 'd)('a, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).select('a, 'c)

val originalQuery = input.groupBy('a, 'c, 'd)('a, 'c, 'd, winExpr.as('window)).select('a, 'c)
val correctAnswer = input.select('a, 'c, 'd).groupBy('a, 'c, 'd)('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
}

test("Column pruning on Window with selected agg expressions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr = windowExpr(count('b), winSpec)

val originalQuery =
input.select('a, 'b, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c)

input.select('a, 'b, 'c, 'd, winExpr.as('window)).where('window > 1).select('a, 'c)
val correctAnswer =
input.select('a, 'b, 'c)
.window(WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window) :: Nil,
'a :: Nil, 'b.asc :: Nil)
.window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
.where('window > 1).select('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
}

test("Column pruning on Window in select") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr = windowExpr(count('b), winSpec)

val originalQuery =
input.select('a, 'b, 'c, 'd,
WindowExpression(
AggregateExpression(Count('b), Complete, isDistinct = false),
WindowSpecDefinition( 'a :: Nil,
SortOrder('b, Ascending) :: Nil,
UnspecifiedFrame)).as('window)).select('a, 'c)

val originalQuery = input.select('a, 'b, 'c, 'd, winExpr.as('window)).select('a, 'c)
val correctAnswer = input.select('a, 'c).analyze

val optimized = Optimize.execute(originalQuery.analyze)

comparePlans(optimized, correctAnswer)
@@ -758,4 +758,142 @@ class FilterPushdownSuite extends PlanTest {
val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze
comparePlans(optimized, correctedAnswer)
}

test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
val correctAnswer = testRelation
.where('a > 1).select('a, 'b, 'c)
.window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
.select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- predicates with compound predicate using only one column") {
val winExpr =
windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15)
val correctAnswer = testRelation
.where('a * 3 > 15).select('a, 'b, 'c)
.window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- multi window expressions with the same window spec") {
val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr1 = windowExpr(count('b), winSpec)
val winExpr2 = windowExpr(sum('b), winSpec)
val originalQuery = testRelation
.select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)

val correctAnswer = testRelation
.where('a > 1).select('a, 'b, 'c)
.window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil,
'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.select('a, 'b, 'c, 'window1, 'window2).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: predicate push down -- multi window specification - 1") {
// order by clauses are different between winSpec1 and winSpec2
val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr1 = windowExpr(count('b), winSpec1)
val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame)
val winExpr2 = windowExpr(count('b), winSpec2)
val originalQuery = testRelation
.select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)

val correctAnswer1 = testRelation
.where('a > 1).select('a, 'b, 'c)
.window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
.select('a, 'b, 'c, 'window1, 'window2).analyze

val correctAnswer2 = testRelation
.where('a > 1).select('a, 'b, 'c)
.window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
.window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.select('a, 'b, 'c, 'window1, 'window2).analyze

val optimizedQuery = Optimize.execute(originalQuery.analyze)
try {
Contributor

@hvanhovell hvanhovell Apr 24, 2016

Are you doing this because you cannot predict how the Analyzer will structure the window functions? In this case you could just have the analyzer take care of it, by writing:

testRelation.where('a > 1).select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).analyze

Member Author

The non-deterministic results are introduced when we group the expressions in extractedWindowExprBuffer by their partition and order specs:

val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
val distinctWindowSpec = expr.collect {
case window: WindowExpression => window.windowSpec
}.distinct
// We do a final check and see if we only have a single Window Spec defined in an
// expressions.
if (distinctWindowSpec.length == 0 ) {
failAnalysis(s"$expr does not have any WindowExpression.")
} else if (distinctWindowSpec.length > 1) {
// newExpressionsWithWindowFunctions only have expressions with a single
// WindowExpression. If we reach here, we have a bug.
failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." +
s"Please file a bug report with this error message, stack trace, and the query.")
} else {
val spec = distinctWindowSpec.head
(spec.partitionSpec, spec.orderSpec)
}
}.toSeq

I did not change the behavior of the source code, since the order of the Window operators only matters for verifying the test cases; it does not change the query results.

Previously, I saw different orders across multiple runs, so I am afraid the approach you suggested would not work either.
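For context, Scala's `groupBy` returns a `Map`, whose iteration order is not specified, so the `.toSeq` that follows it can yield the groups in a different order from one run to the next. The sketch below shows one hypothetical way to group while keeping first-seen key order. It is not what this PR does (the PR leaves the analyzer unchanged and accepts either plan order in the tests), and `groupByStable` is an illustrative name rather than a Spark or standard-library function.

```scala
import scala.collection.mutable

object StableGroupingSketch {
  // Groups items by key, returning the groups in the order in which each key
  // was first seen. LinkedHashMap preserves insertion order, unlike the Map
  // produced by groupBy.
  def groupByStable[A, K](items: Seq[A])(key: A => K): Seq[(K, Seq[A])] = {
    val groups = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[A]]
    items.foreach { item =>
      groups.getOrElseUpdate(key(item), mutable.ArrayBuffer.empty[A]) += item
    }
    groups.iterator.map { case (k, vs) => (k, vs.toList) }.toList
  }
}
```

Grouping this way would make the order of the generated operators depend only on the order of the input expressions, at the cost of changing analyzer code purely for test stability.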

comparePlans(optimizedQuery, correctAnswer1)
} catch {
case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
}
}

test("Window: predicate push down -- multi window specification - 2") {
// partitioning clauses are different between winSpec1 and winSpec2
val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr1 = windowExpr(count('b), winSpec1)
val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr2 = windowExpr(count('a), winSpec2)
val originalQuery = testRelation
.select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1)

val correctAnswer1 = testRelation.select('a, 'b, 'c)
.window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
.where('b > 1)
.window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
.select('a, 'window1, 'b, 'c, 'window2).analyze

val correctAnswer2 = testRelation.select('a, 'b, 'c)
.window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
.window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
.where('b > 1)
.select('a, 'window1, 'b, 'c, 'window2).analyze

val optimizedQuery = Optimize.execute(originalQuery.analyze)
try {
Contributor

Please add a line of documentation to explain why you are using try/catch.

Member Author

Will do it.

comparePlans(optimizedQuery, correctAnswer1)
} catch {
case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
}
}

test("Window: predicate push down -- predicates with multiple partitioning columns") {
val winExpr =
windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
val correctAnswer = testRelation
.where('a + 'b > 1).select('a, 'b, 'c)
.window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: no predicate push down -- predicates are not from partitioning keys") {
val winExpr =
windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
val correctAnswer = testRelation.select('a, 'b, 'c)
.window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
.where('c > 1).select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("Window: no predicate push down -- compound partition key") {
val winSpec = windowSpec('a.attr + 'b.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExpr = windowExpr(count('b), winSpec)
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)

val winSpecAnalyzed = windowSpec('_w0.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
.window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
.where('a > 1).select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}
}
@@ -354,4 +354,38 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
val df = src.select($"*", max("c").over(winSpec) as "max")
checkAnswer(df, Row(5, Row(0, 3), 5))
}

test("aggregation and rows between with unbounded + predicate pushdown") {
val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
df.registerTempTable("window_table")
val selectList = Seq($"key", $"value",
last("key").over(
Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)),
last("key").over(
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 1)))

checkAnswer(
df.select(selectList: _*).where($"value" < "3"),
Seq(Row(1, "1", 1, 1, 1), Row(2, "2", 3, 2, 3), Row(3, "2", 3, 3, 3)))
}

test("aggregation and range between with unbounded + predicate pushdown") {
val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value")
df.registerTempTable("window_table")
val selectList = Seq($"key", $"value",
last("value").over(
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1)).equalTo("2")
.as("last_v"),
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1))
.as("avg_key1"),
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, Long.MaxValue))
.as("avg_key2"),
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 1))
.as("avg_key3"))

checkAnswer(
df.select(selectList: _*).where($"value" < 2),
Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
}
}