[FLINK-6075] [table] Add ORDER BY support for streaming table.
This closes #3889.
rtudoran authored and fhueske committed Jun 30, 2017
1 parent d7d10a1 commit b8c8f20
Showing 15 changed files with 1,367 additions and 59 deletions.
6 changes: 4 additions & 2 deletions docs/dev/table/sql.md
@@ -486,13 +486,15 @@ FROM (
     <tr>
       <td>
         <strong>Order By</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
+        <b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported.
+
 {% highlight sql %}
 SELECT *
 FROM Orders
-ORDER BY users
+ORDER BY orderTime
 {% endhighlight %}
       </td>
     </tr>
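To make the documented behavior concrete, here is a minimal end-to-end sketch of a streaming ORDER BY whose primary sort key is an ascending time attribute. The table name Orders and the orderTime attribute follow the doc snippet above; the input records, the secondary sort key product, and the use of tEnv.sql(...) as the SQL entry point (renamed sqlQuery() in later releases) are illustrative assumptions, not part of this commit.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object StreamingOrderByExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Illustrative stream of (user, product, amount) order records.
    val orders = env.fromElements(
      ("alice", "books", 3),
      ("bob", "games", 1))

    // Register the stream with a processing-time attribute named 'orderTime.
    tEnv.registerDataStream("Orders", orders, 'user, 'product, 'amount, 'orderTime.proctime)

    // The primary sort key must be the ascending time attribute;
    // additional sort attributes (here 'product) may follow it.
    val result = tEnv.sql("SELECT * FROM Orders ORDER BY orderTime, product")

    result.toAppendStream[Row].print()
    env.execute()
  }
}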
RelTimeIndicatorConverter.scala
@@ -92,8 +92,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(minus: LogicalMinus): RelNode =
     throw new TableException("Logical minus in a stream environment is not supported yet.")
 
-  override def visit(sort: LogicalSort): RelNode =
-    throw new TableException("Logical sort in a stream environment is not supported yet.")
+  override def visit(sort: LogicalSort): RelNode = {
+    val input = sort.getInput.accept(this)
+    LogicalSort.create(input, sort.collation, sort.offset, sort.fetch)
+  }
 
   override def visit(`match`: LogicalMatch): RelNode =
     throw new TableException("Logical match in a stream environment is not supported yet.")
CommonSort.scala (new file)
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes

import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.`type`._
import scala.collection.JavaConverters._
import org.apache.flink.api.common.operators.Order
import org.apache.calcite.rel.{RelWriter, RelCollation}

/**
* Common methods for Flink sort operators.
*/
trait CommonSort {

  private[flink] def offsetToString(offset: RexNode): String = s"$offset"

private def sortFieldsToString(
collationSort: RelCollation,
rowRelDataType: RelDataType): String = {
val fieldCollations = collationSort.getFieldCollations.asScala
.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))

fieldCollations
      .map(col => s"${rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}")
.mkString(", ")
}

private[flink] def directionToOrder(direction: Direction) = {
direction match {
case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
case _ => throw new IllegalArgumentException("Unsupported direction.")
}
}

  private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = {
val limitEnd = getFetchLimitEnd(fetch, offset)

if (limitEnd == Long.MaxValue) {
"unlimited"
} else {
s"$limitEnd"
}
}

  private[flink] def getFetchLimitEnd(fetch: RexNode, offset: RexNode): Long = {
if (fetch != null) {
RexLiteral.intValue(fetch) + getFetchLimitStart(offset)
} else {
Long.MaxValue
}
}

  private[flink] def getFetchLimitStart(offset: RexNode): Long = {
if (offset != null) {
RexLiteral.intValue(offset)
} else {
0L
}
}

  private[flink] def sortToString(
      rowRelDataType: RelDataType,
      sortCollation: RelCollation,
      sortOffset: RexNode,
      sortFetch: RexNode): String = {
    s"Sort(by: (${sortFieldsToString(sortCollation, rowRelDataType)})" +
      (if (sortOffset != null) s", offset: ${offsetToString(sortOffset)}" else "") +
      (if (sortFetch != null) s", fetch: ${fetchToString(sortFetch, sortOffset)}" else "") +
      ")"
  }

private[flink] def sortExplainTerms(
pw: RelWriter,
rowRelDataType: RelDataType,
sortCollation: RelCollation,
sortOffset: RexNode,
      sortFetch: RexNode): RelWriter = {

pw
.item("orderBy", sortFieldsToString(sortCollation, rowRelDataType))
.itemIf("offset", offsetToString(sortOffset), sortOffset != null)
.itemIf("fetch", fetchToString(sortFetch, sortOffset), sortFetch != null)
}

}
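As a sanity check on the offset/fetch arithmetic above: for OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY, getFetchLimitStart yields 2 and getFetchLimitEnd yields 2 + 5 = 7, i.e. the retained row positions form the interval [2, 7). The sketch below mirrors that logic with plain Longs in place of Calcite RexNode literals; the object and method names are illustrative only, not part of the commit.

object FetchLimitSemanticsSketch {
  // Mirrors getFetchLimitStart: a missing OFFSET defaults to 0.
  def limitStart(offset: Option[Long]): Long = offset.getOrElse(0L)

  // Mirrors getFetchLimitEnd: FETCH counts from the offset; no FETCH means unbounded.
  def limitEnd(fetch: Option[Long], offset: Option[Long]): Long =
    fetch.map(_ + limitStart(offset)).getOrElse(Long.MaxValue)

  def main(args: Array[String]): Unit = {
    assert(limitStart(Some(2L)) == 2L)                // OFFSET 2
    assert(limitEnd(Some(5L), Some(2L)) == 7L)        // FETCH 5 -> end position 7
    assert(limitEnd(None, Some(2L)) == Long.MaxValue) // no FETCH -> "unlimited"
  }
}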
DataSetSort.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.types.Row
+import org.apache.flink.table.plan.nodes.CommonSort
 
 import scala.collection.JavaConverters._

@@ -43,19 +44,12 @@ class DataSetSort(
     offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
+  with CommonSort
   with DataSetRel {
 
-  private val limitStart: Long = if (offset != null) {
-    RexLiteral.intValue(offset)
-  } else {
-    0L
-  }
+  private val limitStart: Long = getFetchLimitStart(offset)
 
-  private val limitEnd: Long = if (fetch != null) {
-    RexLiteral.intValue(fetch) + limitStart
-  } else {
-    Long.MaxValue
-  }
+  private val limitEnd: Long = getFetchLimitEnd(fetch, offset)
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -127,7 +121,7 @@ class DataSetSort(
       limitEnd,
       broadcastName)
 
-    val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+    val limitName = s"offset: ${offsetToString(offset)}, fetch: ${fetchToString(fetch, offset)}"
 
     partitionedDs
       .filter(limitFunction)
@@ -136,36 +130,19 @@ class DataSetSort(
     }
   }
 
-  private def directionToOrder(direction: Direction) = {
-    direction match {
-      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
-      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
-      case _ => throw new IllegalArgumentException("Unsupported direction.")
-    }
-  }
-
-  private val fieldCollations = collations.getFieldCollations.asScala
-    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
-
-  private val sortFieldsToString = fieldCollations
-    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}").mkString(", ")
-
-  private val offsetToString = s"$offset"
-
-  private val fetchToString = if (limitEnd == Long.MaxValue) {
-    "unlimited"
-  } else {
-    s"$limitEnd"
-  }
-
-  override def toString: String =
-    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
+  override def toString: String = {
+    sortToString(getRowType, collations, offset, fetch)
+  }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("orderBy", sortFieldsToString)
-      .item("offset", offsetToString)
-      .item("fetch", fetchToString)
+    sortExplainTerms(
+      super.explainTerms(pw),
+      getRowType,
+      collations,
+      offset,
+      fetch)
   }
 }
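With the shared helpers in place, toString and explainTerms render from the same strings. For a sort on orderTime ascending with OFFSET 2 and FETCH 5, the node would print roughly as follows (illustrative field name; note that the fetch item shows the exclusive end position offset + fetch computed by getFetchLimitEnd, not the raw FETCH value):

Sort(by: (orderTime ASC), offset: 2, fetch: 7)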
