[FLINK-6232][Table&Sql] support proctime inner windowed stream join #4266
Conversation
Hi @hongyuhong, is this the same PR as #3715? In order to rebase or remove a merge commit, please do not create a new PR, otherwise committers may review an out-of-date PR or lose the review context. You can force update your repo branch instead. Thanks,
Thanks for the update @hongyuhong! I will open a new PR with your work and my commit on top, probably later today. @wuchong your review is of course also highly welcome :-) Thank you, Fabian
Hi @hongyuhong , thanks for your great work! I left some comments below (most is code style).
Overall looks good to me.
@fhueske , I just finished the review. Sorry for the delay.
Cheers,
Jark
```scala
 */
package org.apache.flink.table.runtime.join

import java.math.{BigDecimal => JBigDecimal}
```
remove unused import
```scala
joinType match {
  case JoinRelType.INNER =>
    isRowTime match {
```
I think using an `if (isRowTime) ... else ...` here is simpler.
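To illustrate the suggestion, a minimal sketch in plain Scala (the function and return values are hypothetical, not the actual PR code): a two-way dispatch on a Boolean reads more directly as `if`/`else` than as a `match` on the flag.

```scala
// Hypothetical example of the suggested refactoring: instead of
// `isRowTime match { case true => ...; case false => ... }`,
// a plain if/else expresses the two-way branch more directly.
def pickJoinName(isRowTime: Boolean): String =
  if (isRowTime) "row-time window join" else "proc-time window join"
```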
```scala
object WindowJoinUtil {

  /**
    * Analyze time-condtion to get time boundary for each stream and get the time type
```
minor typo: condtion -> condition
```scala
        "two join predicates that bound the time in both directions.")
    }

    // extract time offset from the time indicator conditon
```
minor typo: conditon -> condition
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStateBackend(getStateBackend)
StreamITCase.testResults = mutable.MutableList()
```
You can simply do `StreamITCase.clear` instead of this.
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStateBackend(getStateBackend)
StreamITCase.testResults = mutable.MutableList()
```
You can simply do `StreamITCase.clear` instead of this.
```scala
class JoinITCase extends StreamingWithStateTestBase {

  val data = List(
```
Looks like the `data` is never used, can we remove it?
```scala
val reduceList = new util.ArrayList[RexNode]()
exprReducer.reduce(rexBuilder, originList, reduceList)

val literals = reduceList.asScala.map(f => f match {
```
Can be simplified to:

```scala
val literals = reduceList.asScala.map {
  case literal: RexLiteral =>
    literal.getValue2.asInstanceOf[Long]
  case _ =>
    throw TableException(
      "Time condition may only consist of time attributes, literals, and arithmetic operators.")
}
```
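For reference, the `map { case ... }` partial-function literal is equivalent to `map(f => f match { ... })`, so the explicit lambda is redundant. A self-contained sketch of the idiom (the list and values here are hypothetical stand-ins, not the PR's `RexNode` list):

```scala
// Hypothetical stand-in for the reduced expression list.
val reduced: List[Any] = List(10L, 5000L)

// `map { case ... }` is shorthand for `map(f => f match { ... })`.
val literals = reduced.map {
  case l: Long => l
  case other =>
    throw new IllegalArgumentException(s"expected a literal, got: $other")
}
```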
```scala
// If the state has non-expired timestamps, register a new timer.
// Otherwise clean the complete state for this input.
if (nextTimer != 0) {
  ctx.timerService.registerProcessingTimeTimer(nextTimer + winSize + 1)
```
The `nextTimer` may not be the smallest or greatest timestamp among the non-expired timestamps. Would it be better to register a `curTime + winSize + 1` timer?
yes, that makes sense to me.
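A minimal, Flink-free sketch of the agreed behavior (the buffer shape and names here are hypothetical; the real operator uses Flink's `TimerService` and keyed state): on a cleanup timer firing at `curTime`, evict rows older than the window, and if anything survives, register the next cleanup relative to `curTime` rather than to a buffered timestamp.

```scala
import scala.collection.mutable

// Simplified model of the cleanup-timer logic: drop expired rows, then,
// if the buffer is non-empty, return the next cleanup time
// curTime + winSize + 1; otherwise return None (state fully cleaned).
def onCleanupTimer(
    buffer: mutable.Map[Long, List[String]], // timestamp -> buffered rows
    curTime: Long,
    winSize: Long): Option[Long] = {
  buffer.keys.filter(_ <= curTime - winSize).toList.foreach(buffer.remove)
  if (buffer.nonEmpty) Some(curTime + winSize + 1) else None
}
```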
```scala
val oppoUpperTime = curProcessTime + oppoUpperBound

// only when windowsize != 0, we need to store the element
if (winSize != 0) {
```
I'm not sure about this. For example, take `a.proctime between b.proctime - 5 and b.proctime`. In this case, we will buffer stream `a` for a window size of 5, but will not buffer stream `b`, because the right window size is 0.
Suppose the input elements are [a1, 1], [a2, 2], [b1, 5], [a3, 5], where the first field in each tuple indicates which stream it belongs to and the second field is the processing timestamp. The expected result should be (a1, b1), (a2, b1), (a3, b1). But the actual result misses (a3, b1), because we didn't buffer the elements from stream `b`.
So I think that even if the window size is 0, we still need to store the elements. Of course, we would register a `curTime + 1` timer to clean the state.
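The counter-example above can be checked with a small, self-contained simulation (plain Scala with hypothetical buffers; the real operator uses Flink keyed state): both sides must be buffered, even though `b`'s window size is 0, or the (a3, b1) pair is lost.

```scala
import scala.collection.mutable

// Simulate `a.proctime BETWEEN b.proctime - 5 AND b.proctime` over the
// arrival order [a1,1], [a2,2], [b1,5], [a3,5], buffering BOTH streams.
val bufA = mutable.ListBuffer.empty[(String, Long)]
val bufB = mutable.ListBuffer.empty[(String, Long)]
val results = mutable.ListBuffer.empty[(String, String)]

def onA(name: String, t: Long): Unit = {
  bufA += ((name, t))
  // join against buffered b where b.t - 5 <= a.t <= b.t
  bufB.foreach { case (b, bt) =>
    if (bt - 5 <= t && t <= bt) results += ((name, b))
  }
}

def onB(name: String, t: Long): Unit = {
  bufB += ((name, t))
  bufA.foreach { case (a, at) =>
    if (t - 5 <= at && at <= t) results += ((a, name))
  }
}

onA("a1", 1); onA("a2", 2); onB("b1", 5); onA("a3", 5)
// results now contains (a1,b1), (a2,b1), (a3,b1); dropping bufB would lose (a3,b1)
```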
I think you are right @wuchong. I'll remove that condition.
OTOH, this is a processing time join which cannot guarantee strict results anyway ;-)
Thanks for the review @wuchong.
```scala
val config = tableEnv.getConfig

val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
```
We should use `DataStreamRetractionRules.isAccRetract(input)` to check whether the input will produce updates.
`isAccRetract` only checks how updates are encoded, but not whether there are updates.
The current approach is correct, IMO.
The following SQL `select a, sum(b), a+1 from t1 group by a` will be optimized into the following nodes:
DataStreamCalc (AccRetract, producesUpdates=false)
DataStreamGroupAggregate (AccRetract, producesUpdates=true)
DataStreamScan (Acc, producesUpdates=false)
The DataStreamCalc is append only, but is in AccRetract mode, which means the output contains retractions.
I think we want to check whether the input contains retractions, right?
`UpdateCheckUtils.isAppendOnly` recursively checks if any downstream operator produces updates. As soon as any downstream operator produces updates, the given operator has to be able to handle them.
Updates can be encoded as retractions or as key-wise updates. Retraction updates produce two messages. Non-retraction updates produce a single message and require a key to which they relate (`CRow.change == true` -> insert or update per key, `CRow.change == false` -> delete on key). Right now, only UpsertTableSinks use non-retraction/keyed updates, but other operators such as unbounded joins will use it as well.
So even if `AccRetract` is false, the input might produce updates, but those updates are differently encoded, i.e., in a single message. The window stream join is not able to handle updates (it ignores the `CRow.change` flag). Therefore, we must ensure that the inputs do not produce updates.
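A minimal sketch of the recursive check described above (a hypothetical plan-node model, not the actual `UpdateCheckUtils` code): a node is append-only only if neither it nor any node below it produces updates, regardless of how those updates are encoded.

```scala
// Hypothetical plan-node model mirroring the described semantics of
// UpdateCheckUtils.isAppendOnly: the property propagates up the plan.
final case class Node(producesUpdates: Boolean, inputs: List[Node] = Nil)

def isAppendOnly(node: Node): Boolean =
  !node.producesUpdates && node.inputs.forall(isAppendOnly)
```

With this model, a Calc on top of a GroupAggregate is not append-only even if the Calc itself produces no updates, matching the example discussed above.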
Thank you for the explanation, that makes sense to me. But I find that `DataStreamOverAggregate` and `DataStreamGroupWindowAggregate` use `DataStreamRetractionRules.isAccRetract`; is that a misuse?
Yes, I think you are right. These checks should also check for updates and not retraction mode.
Maybe it makes sense to integrate the whole append-only/updates check into the decoration rules. Same for the inference of unique keys (the other method in `UpdateCheckUtils`).
Hi @hongyuhong and @wuchong, I opened a new PR which extends this PR. @hongyuhong can you close the PRs #3715 and this one? Thank you, Fabian
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
General
Documentation
Tests & Build: `mvn clean verify` has been executed successfully locally or a Travis build has passed