Skip to content

Commit

Permalink
SQL with windowing function should be able to refer column in inner s…
Browse files Browse the repository at this point in the history
…elect block.
  • Loading branch information
viirya committed Oct 7, 2015
1 parent ffe6831 commit 15b40ee
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
Expand Up @@ -761,6 +761,13 @@ class Analyzer(
}.isDefined
}

private def hasAggregation(expr: NamedExpression): Boolean = {
expr.find {
case agg: AggregateExpression => true
case _ => false
}.isDefined
}

/**
* From a Seq of [[NamedExpression]]s, extract expressions containing window expressions and
* other regular expressions that do not contain any window expression. For example, for
Expand Down Expand Up @@ -831,6 +838,13 @@ class Analyzer(
val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
extractedExprBuffer += withName
withName.toAttribute

case ne: Alias if hasWindowFunction(ne) && !hasAggregation(ne) =>
ne.children.map(_.transform {
case e: NamedExpression => extractExpr(e)
})
ne

}.asInstanceOf[NamedExpression]
}

Expand Down
Expand Up @@ -833,6 +833,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
).map(i => Row(i._1, i._2, i._3)))
}

test("window function: refer column in inner select block") {
val data = Seq(
WindowData(1, "a", 5),
WindowData(2, "a", 6),
WindowData(3, "b", 7),
WindowData(4, "b", 8),
WindowData(5, "c", 9),
WindowData(6, "c", 10)
)
sparkContext.parallelize(data).toDF().registerTempTable("windowData")

checkAnswer(
sql(
"""
|select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
|from (select month, area, product, 1 as tmp1 from windowData) tmp
""".stripMargin),
Seq(
("a", 2),
("a", 3),
("b", 2),
("b", 3),
("c", 2),
("c", 3)
).map(i => Row(i._1, i._2)))
}

test("window function: partition and order expressions") {
val data = Seq(
WindowData(1, "a", 5),
Expand Down

0 comments on commit 15b40ee

Please sign in to comment.