Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time sort with offset/fetch without retraction #4380

Closed

Conversation

rtudoran
Copy link
Contributor

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • [x ] General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • [x ] Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • [x ] Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

@rtudoran
Copy link
Contributor Author

@fhueske ,@wuchong
As mentioned in PR #4263 I re-implemented the offset/fetch support for *time without retraction. You can find things in this PR. In principle things should be easy to follow as it is similar with what we already have.
Please let me know what you think

@fhueske
Copy link
Contributor

fhueske commented Jul 20, 2017

Hi @rtudoran, thanks for updating the PR.

I had a brief look at it and as I said before, I don't think we need additional ProcessFunctions for any of the ORDER BY *time ASC OFFSET x FETCH y cases. All cases (proctime and rowtime) can be implemented by extending the current functions for ORDER BY.

All we have to do is to

  • add a counter for OFFSET and not emit the first x rows (counter should be stored as state)
  • add a counter for FETCH and not emit more than y rows (counter should be stored as state).

If OFFSET or FETCH are not required, we can set x and/or y to -1 and ignore the counters.

Once we have extended the current ProcessFunctions, we can also remove the additional methods in DataStreamSort and SortUtil because we always translate to the same ProcessFunction just with different values for offset and fetch. So we only have to extend the current methods in DataStreamSort and SortUtil to set the correct values for offset and fetch (either the values from the query or -1 if not used) to the updated ProcessFunctions.

That should simplify the PR and require much less changes and code.

Let me know what you think,
Fabian

@rtudoran
Copy link
Contributor Author

@fhueske
I did not understood initially that this is your suggestion. What you propose has the advantage that it is easy to maintain (as we consolidate the whole functionality) and a slight disadvantage that you have a couple of useless checks in some scenarios (e.g., you would still check the fetch condition even if you would have only offset or just the plain sorting). If this tiny performance price to pay is ok ...than clearly we can consolidate the implementation.

Nevertheless, even if this is the case i would still propose we keep the ProcTimeIdentitySortProcessFunction
This is for the scenario where you have the order by simply on the time (and no other field). In the case of simple sort we only had an identity map function to pass the events. For the case of offset/fetch we can either extend that or keep one different implementation that adds the state counters for offset/fetch. I propose we keep this later implementation.

@rtudoran
Copy link
Contributor Author

@fhueske
Also - i do not understand why you would need to keep the counters for offset/fetch as states?

Assuming we have in the buffer state with events for proctime T values (1, 2, 3, 4, 5)
You want to emit them with offset 2 and fetch 2 (hence values 3 and 4)

So you will have the onTimer function when proctime moved and you can trigger computation for time T (i.e. at T+1)

The basic logic after you sort is that you go through the 5 elements and count the offset and then the fetch

for(int i=0; i< inputs.size; i++) {
offsetCounter++;
if(offsetCounter > offset && fetchCounter<fetch) {
out.collect(inputs(i))
fetchCounter++;
}
}

...you would then update the states at the end
What is the point here to memorize the fetchCounter and offsetCounter?
If a failure happen meanwhile you would anyway restore the whole list of the 5 elements and restart the logic (i.e., from the beginning of the function).
It is not like you do a state update at every iteration to pick in case of a failure from let's say line 5 when the value of counter was at a certain value

@rtudoran
Copy link
Contributor Author

@fhueske ,@wuchong
I updated the PR. Please have a look

Copy link
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @rtudoran,

thanks for updating the PR. I had a look at the sort functions and noticed that the OFFSET and FETCH semantics are not correct. In SQL both are global limits for the complete query result. If a query specifies ORDER BY ... FETCH x ROWS ONLY, then the query must emit exactly x rows (given that the result has at least x rows) and not x rows for the same sort key. We want to have the same SQL semantics as for batch execution, so the operators and tests need to be adjusted to the correct semantics.

Also there is a lot of code duplication that can be avoided with some refactoring.

Best, Fabian

@@ -108,28 +108,25 @@ class DataStreamSort(
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
(sortOffset, sortFetch) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change sortOffset and sortFetch member fields to Option[RexNode] to avoid null.

* @param execCfg table environment execution configuration
* @return org.apache.flink.streaming.api.functions.ProcessFunction
*/
private[flink] def createRowTimeSortFunctionOffset(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can consolidate all sort-related methods in SortUtil into three methods:

  • createProcTimeNoSortFunction(..., sortOffset: Option[RexNode], sortFetch: Offset[RexNode])
  • createProcTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Offset[RexNode])
  • createRowTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Offset[RexNode])

Each method handles all combinations of offset and fetch with two simple conditions to set the parameter to -1, 0, or the actual value.

inputRowType.asInstanceOf[CRowTypeInfo].rowType)
stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)

val arity:Integer = inputRowType.getArity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+space arity: Integer

private val adjustedFetchLimit = offset + fetch

override def open(config: Configuration) {
val sortDescriptor = new ListStateDescriptor[Row]("sortState",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need state to collect records for this operator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead we need two ValueState[Long] for offsetCount and fetchCount.


}

override def onTimer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need timers for this operator


private var stateEventsBuffer: ListState[Row] = _

private var outputC: CRow = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need outputC. We can simply forward the input CRow.


}

override def processElement(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operator should be implemented as follows:

val fetchCnt = fetchCount.value
if (fetch == -1 || fetchCnt < fetch) {
  // we haven't fetched enough rows
  val offsetCnt = offsetCount.value
  if (offsetCnt < offset) {
    // we haven't skipped enough rows
    // increment counter and skip row
    offsetCount.update(offsetCnt + 1)
  } else {
   // forward row
   out.collect(inputC)
   if (fetchCnt != -1) {
     fetchCount.update(fetchCnt + 1)
   }
  }
} else {
  // we fetch enough rows. drop Row and return
}

As you'll notice ORDER BY proctime ASC FETCH x ROWS FIRST is quite pointless because it will only emit x rows than nothing more. However, that's the correct semantics here. OFFSET is similar because it won't emit the first x rows which is not really meaningful either in a streaming context.

The other combinations are basically the same. The only difference is that they do a bit more sorting to identify the rows that have to be dropped. The sorting operators have to do the sorting as before in onTimer() but each record has to pass the offset, fetch check before being emitted. Once, the fetch count is exceeded, we also don't need to put rows into state and can simply drop them.
The reason why we need counters for offset and fetch are that this information must not get lost in case of a failure. Otherwise a job might emit rows after a failure even though it emitted enough rows before the failure.

Please adjust all operators and tests to the correct semantics. Thank you.

@@ -108,28 +108,25 @@ class DataStreamSort(
case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot code duplication in this class. All createSort methods look basically the same and mostly differ in the SortUtil methods they call. I think we don't need these methods and can do everything with a few if conditions directly in the translateToPlan() method.

Basically:

val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)

val pFunc = if (FlinkTypeFactory.isProctimeIndicatorType(timeType) && sortCollation.getFieldCollations.size() == 1) {
  SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
  SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
  SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
} else {
  // error
}

inputDS.keyBy(new NullByteKeySelector[CRow])
  .process(processFunction).setParallelism(1).setMaxParallelism(1)
  .returns(returnTypeInfo)
  .asInstanceOf[DataStream[CRow]]

We would have to change the IdentityCRowMap to a ProcessFunction but that's fine. ORDER BY proctime is a corner case that does not add functionality and is only supported for syntactical completeness. IMO it is not worth added code complexity.

@rtudoran
Copy link
Contributor Author

@fhueske

Hi, I saw your remark for "If a query specifies ORDER BY ... FETCH x ROWS ONLY, then the query must emit exactly x rows (given that the result has at least x rows) and not x rows for the same sort key"

To some extend we had this discussion before when we started this topic. Yes - offset/fetch refer to global/absolute values - there must be 5 elements emitted overall if we have offset 1 and fetch 5. However, as you are saying it is not particularly useful if we enable this for the stream. Take 5 elements and then do not emit anything. This was the reason why the conclusion of the initial discussions on this topic was to have the mandatory orderby time...and then basically the restrictions of offset/fetch will re-apply for each timestamp. That was also the reason why we were retracting the previously emitted results and send the updated results.
Hence if you want to write a sql query over the stream to take the top 5 results based on some sort you would write

Select * from stream1 Order by rowtime, field1 Limit 5
...this would ensure that at moment 1 you get the top 5 results
...then at moment 2 you get an update that these 5 results are not relevant anymore (as time changed) and you get the top 5 results relevant at that moment.

Please reconsider this. I think this is the behavior that we need (and the one we discussed that is compatible with SQL semantics ... and also relevant). Without this and just having a function that selects at some point x results and then does nothing (or skips y results and then nothing for the rest of the existence)...i do not see the relevance. While, with this syntax or some similar syntax...we would still need to support such functionality also via SQL.

@fhueske
Copy link
Contributor

fhueske commented Jul 30, 2017

Hi @rtudoran,

I had the impression that your main motivation was to fill the syntactic gaps of SQL on streams and not so much to support certain kinds of use cases.

The semantics of a SQL query are given by the SQL specification and not up for discussion. If you want to support a certain behavior, we need to see how this can be expressed in standard-compliant SQL and then think about the implementation. The semantics of ORDER BY rowtime ASC FETCH 10 ROWS ONLY are given and will return the ten first (first because they have the lowest timestamp) result rows.

However, I think what you are trying to achieve is represented by ORDER BY rowtime DESC FETCH 10 ROWS ONLY. This query returns the ten last (last because they have the highest timestamp) rows of the result. Obviously, we need retraction to handle this case, because the ten last rows will change over time. ORDER BY rowtime DESC OFFSET 10 is also not very useful, because it holds back the 10 last result rows. However, we could support it to fill the gap in the syntax and execute the query instead of saying "Sorry, can't do that".

So the question now is, how do we proceed with this PR? Do we want to add the ORDER BY rowtime ASC OFFSET / FETCH functionality as simple additions to the existing operators (as I said I think it can be done with very few lines if we do some refactoring) to fill the syntactic gap or not?

Regardless of whether we add this or not, we should work on the ORDER BY rowtime DESC OFFEST / FETCH case.

What do you think?

@rtudoran
Copy link
Contributor Author

rtudoran commented Aug 1, 2017

@fhueske

Thanks for the remarks/clarification.

I am fine with taking an approach to tackle both scenarios (also the simple addons you suggest but also having the actual useful code).
Hence i would propose that the code developed with retraction support for ORDER BY rowtime ASC FETCH/OFFSET we use it for "ORDER BY rowtime DESC offset/FETCH. Do you agree with this?

I can add then in another PR/update also the code you intend to use for the ASC case with the simple modifications.

Putting this aside there is another issue with LIMIT x. The intended behavior of LIMIT is to limit the number of results considered at a point. The way it is translated by Calcite is similar with FETCH. Basically at the moment of conversion you do not see any difference between

ORDER BY rowtime ASC FETCH 10 ROWS ONLY and ORDER BY rowtime LIMIT 10.
How should we handle this situation?

@fhueske
Copy link
Contributor

fhueske commented Aug 1, 2017

OK @rtudoran, that sounds good to me.

I haven't had a detailed look at the retraction code you submitted before. If it has the right semantics for ORDER BY *time DESC FETCH x, please open a new PR and I'll be happy to review it.

Regarding ORDER BY *time ASC OFFSET / FETCH, I'd modify this PR according to the comments.

I didn't understand the issue with LIMIT yet. Why is LIMIT 10 not equivalent to FETCH FIRST 10 ROWS ONLY? Both should return 10 rows, right? Aside from this question, I think if Calcite parses and translates LIMIT as FETCH the right way to change the behavior would be to fix it in Calcite.

Add support for offset and fetch when time is descending with reste at
every time unit
@rtudoran
Copy link
Contributor Author

@fhueske
I have updated the PR.
Based on the previous discussions i have:

  1. integrated the support for offset and fetch when time is ascending as you suggested (having a counter within the process function that will restrict the output)
  2. add back the implementation for offset and fetch with retraction and bind this for queries that would have time descending. It is important to remark that the way i intended to enable this behavior is that at every new time unit the offset/fetch restriction will be reapplied from scratch. That means that in an example such as (getting the first 2 elements):
    t1: event 1, event 2, event 3 => output is event 1 and event 2
    t2: event 4 => output is retract (events 1&2) emit event 4

a reset at each timestamp there is a continuous output/retraction
@aljoscha
Copy link
Contributor

Closing this as abandoned, and also because it's not tied to a Jira issue.

@aljoscha aljoscha closed this Oct 18, 2019
@aljoscha aljoscha self-assigned this Oct 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants