[FLINK-6075] - Support Limit/Top(Sort) for Stream SQL #3889
Conversation
Order by time and optionally other fields is supported
@hongyuhong @stefanobortoli @shijinkui I forgot to add you to the PR
Hi @rtudoran,
thanks for the PR. The overall approach looks good. I made a couple of comments about code style and refactorings, but nothing major.
Thanks, Fabian
sortCollation: RelCollation,
sortOffset: RexNode,
sortFetch: RexNode,
cluster: RelOptCluster,
Please keep the order of parameters consistent with other classes (for instance DataStreamCalc), i.e., cluster, traitset, input, inputschema, schema, sortCollation, offset, fetch, description.
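Applied here, the suggested ordering would look roughly like the sketch below. Parameter names and the RowSchema type are assumptions based on the discussion, not the exact PR code:

```scala
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex.RexNode

// Hypothetical constructor sketch following the DataStreamCalc parameter order:
// cluster, traitSet, input, inputSchema, schema, collation, offset, fetch, description
class DataStreamSort(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputNode: RelNode,
    inputSchema: RowSchema,   // RowSchema: assumed Flink schema type
    schema: RowSchema,
    sortCollation: RelCollation,
    sortOffset: RexNode,
    sortFetch: RexNode,
    description: String)
```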
}

override def toString: String = {
s"Sort(by: ($SortUtil.getSortFieldToString(sortCollation, rowRelDataType))," +
Move the methods which are common among DataSetSort and DataStreamSort (e.g. methods to create strings) into a new class CommonSort, similar to CommonCalc or CommonCorrelate.
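A minimal sketch of what such a trait could look like, assuming a helper for the sort-field string (the method name and body are illustrative, not the PR's actual code):

```scala
import org.apache.calcite.rel.RelCollation
import org.apache.calcite.rel.`type`.RelDataType

import scala.collection.JavaConverters._

// Hypothetical shared helper trait, mirroring the CommonCalc / CommonCorrelate pattern
trait CommonSort {

  // builds e.g. "proctime ASC, c ASC" for explain/toString output
  private[flink] def sortFieldsToString(
      collation: RelCollation,
      rowType: RelDataType): String = {
    collation.getFieldCollations.asScala
      .map(fc => s"${rowType.getFieldNames.get(fc.getFieldIndex)} ${fc.direction.shortString}")
      .mkString(", ")
  }
}
```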
//need to identify time between others ordering fields
val ind = collationSort.getFieldCollations.get(0).getFieldIndex
rowType.getFieldList.get(ind).getValue
Use getType instead of getValue.
//enable to extend for other types of aggregates that will not be implemented in a window
timeType match {
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
(sortOffset,sortFetch) match {
+space
timeType match {
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
(sortOffset,sortFetch) match {
case (o: Any, f: Any) => // offset and fetch needs retraction
Why not use the more concrete type RexNode instead of Any? If o and f are not used, you can write case (_: RexNode, _: RexNode) =>
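A sketch of the suggested match, with branch bodies elided (the null cases are an assumption about how an absent offset/fetch is represented in the plan node):

```scala
// Hypothetical pattern match over offset/fetch; bodies omitted
(sortOffset, sortFetch) match {
  case (_: RexNode, _: RexNode) => // offset and fetch set: needs retraction
  case (_: RexNode, null)       => // offset only
  case (null, _: RexNode)       => // fetch only
  case _                        => // neither set
}
```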
object SortITCase {

class EventTimeSourceFunction[T](
Make the EventTimeSourceFunction in OverWindowITCase a util class and reuse it here instead of duplicating the code.
ok
"DataStreamCalc",
unaryNode("DataStreamSort",
streamTableNode(0),
term("orderBy", "proctime ASC, c ASC], offset=[null], fetch=[unlimited")),
The string seems to be broken: first there is a closing ] and later an opening [.
Yeah - I was trying to make it match the incoming value with respect to the number of parentheses. The problem was that I was messing up the string: no matter how many pairs I added, it kept replacing them and adding some additional ones at the beginning and end. The actual problem was that order by, offset, and fetch needed to be standalone terms.
streamUtil.verifySql(sqlQuery, expected)
}

@Test
-1 space. Check expected exceptions with @Test(expected = classOf[YourExpectedException])
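For illustration, a test written this way might look like the sketch below. The exception type TableException is an assumption about what the failed translation throws, and the fragment assumes the surrounding test class provides streamUtil:

```scala
import org.apache.flink.table.api.TableException
import org.junit.Test

// Hypothetical test sketch: the annotation declares the expected exception,
// so no try/catch + fail() boilerplate is needed in the body.
@Test(expected = classOf[TableException])
def testSortProcessingTimeSecondaryField(): Unit = {
  // translation is expected to fail: proctime is not the primary sort field
  val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
  streamUtil.verifySql(sqlQuery, "")
}
```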
}
}

@Test
as above
@Test
def testSortProcessingTimeSecondaryField() = {

val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
Please add a comment explaining why the tests are expected to fail
refactor the tests, correct style errors
@fhueske @stefanobortoli @shijinkui @hongyuhong
Hi @rtudoran, the PR looks quite good. I made quite a few comments for minor changes but did not find a major issue.
Let me know what you think,
Fabian
tEnv.registerTable("T1", t1)

val sqlQuery = "SELECT b FROM T1 " +
"ORDER BY rowtime, b ASC ";
rm ;
"ORDER BY rowtime, b ASC ";

val result = tEnv.sql(sqlQuery).toDataStream[Row]
toDataStream is deprecated. Please use toAppendStream instead.
In the Flink version from which I started, it is not available. I think if I rebase now I will have to create a new PR... I would prefer to keep this and do an update after the merge.
OK, will do that before merging
rexBuilder,
input.getRowType.getFieldList.map(_.getType))

val offset = if(sort.offset != null) sort.offset.accept(materializer) else null
offset and fetch should not contain time attributes. If they do, we cannot execute the query.
val offsetToString = s"$offset"
offsetToString
}
rm new line
trait CommonSort {

private[flink] def offsetToString(offset: RexNode): String = {
IMO, this method can be removed. It simply calls offset.toString()
It is more for consistency - to have a "toString" method for each feature of the sort (direction, fetch, fields...). As there is no cost for it, I suggest keeping it.
val inputCRowType = CRowTypeInfo(rT)

val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+space after comma
INT_TYPE_INFO,
STRING_TYPE_INFO,
LONG_TYPE_INFO),
Array("a","b","c","d","e"))
space after comma
val rTA = new RowTypeInfo(Array[TypeInformation[_]](
LONG_TYPE_INFO), Array("count"))
val indexes = Array(1,2)
space
Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))

//move the timestamp to ensure the execution
testHarness.setProcessingTime(1005)
Please run at least two sorts to ensure that all intermediate state is correctly reset.
Also verify the state size to ensure that the operator is not leaking.
I am not sure I understand - do you mean within the same test or in different tests?
I wanted to trigger a sort a second time by adding more data and setting the processing time another time. Basically, just what you did. Same would have been good for the rowtime test.
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.streaming.api.TimeCharacteristic

class TimeSortProcessFunctionTest{
Please add a similar test for the event-time sorter which sorts for multiple timestamps and validates that all rows preserve their timestamp.
I do not understand - this test was added (see testSortRowTimeHarnessPartitioned below)... perhaps this is an old comment...
- multiple indentations, spaces and style things
- further refactor methods towards CommonSort
- extend and update the tests
@fhueske I have addressed the remarks you made (with 1-2 minor exceptions, for which you can see my comments). Please have a look. I think it could be ready to be merged, and then I can start the preparation for the offset and fetch.
Thanks for the update @rtudoran.
I will make some minor fixes and refactorings and merge this PR.
Thanks, Fabian
val input = sort.getInput.accept(this)

val materializer = new RexTimeIndicatorMaterializer(
unused. Should be removed
rexBuilder,
input.getRowType.getFieldList.map(_.getType))

//val offset = if(sort.offset != null) sort.offset.accept(materializer) else null
Should be removed
package org.apache.flink.table.plan.nodes.datastream

import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
About half of the imports are unused. Other classes have unused imports as well.
expectedOutput.add(new StreamRecord(new CRow(
Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
expectedOutput.add(new StreamRecord(new CRow(
Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))

TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
The test should check that the ProcessFunction emits the rows in the correct order. assertOutputEqualsSorted sorts the result and expected data before comparing them. We have to use assertOutputEquals instead.
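The order-sensitive variant is invoked the same way; a sketch using the names from the test code above:

```scala
// assertOutputEquals compares element by element, so the emission order is verified
TestHarnessUtil.assertOutputEquals(
  "Output was not correct.",
  expectedOutput,
  testHarness.getOutput)
```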
@fhueske Thanks for the review and for handling the fixes (sorry for still skipping some :( ). I will create the PR for the rest of the functions this week.
Order by time and optionally other fields is supported
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
[x] General
[x] Documentation
[x] Tests & Build
mvn clean verify has been executed successfully locally or a Travis build has passed