[FLINK-6483] [table] Support time materialization #3862
fhueske
left a comment
Thanks for the PR @twalthr.
Looks pretty good. I left a couple of comments, mainly regarding the null check in the generated code and support for time fields as grouping columns and aggregate function arguments (MIN(rowtime) might make sense in a tumbling window, for instance).
What do you think?
Fabian
    if (isEventTime) {
      val resultCode =
        s"""
          |boolean $nullTerm = $contextTerm.timestamp() == null;
I think we should throw an exception if the timestamp is null.
The query only accesses the timestamp if it explicitly asks for event-time. If the timestamp is not set, the query should fail, IMO.
As an additional check, we should add a check to the StreamTableEnvironment (and the TableSourceTable) that verifies that event-time is enabled in the StreamExecutionEnvironment.
Maybe it even makes sense to do this check once at the beginning of a query, to avoid repeating it whenever we materialize the time. I'd consider this a future improvement.
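The fail-fast behaviour suggested in this thread could look roughly like the sketch below. It is plain Scala rather than generated code: `SketchContext` is a hypothetical stand-in for the context object behind `$contextTerm`, and the exception type and message are assumptions, not what the PR emits.

```scala
object RowtimeMaterialization {
  // Hypothetical stand-in for the context that the generated code reads the timestamp from.
  final case class SketchContext(timestamp: java.lang.Long)

  // Fail the query instead of propagating a null flag when no timestamp is set.
  def materializeRowtime(ctx: SketchContext): Long = {
    val ts = ctx.timestamp
    if (ts == null) {
      throw new IllegalStateException(
        "Rowtime timestamp is null. Is event time enabled and a timestamp assigner defined?")
    }
    ts.longValue()
  }
}
```

With this shape, a query that asks for event time on a stream without timestamps fails on the first record instead of silently producing null time values.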
    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)

    val result = t
      .groupBy('rowtime)
Can we support this case as well?
For example .groupBy('rowtime + 0.milli) and .groupBy('rowtime.floor(TimeIntervalUnit.HOUR)) work correctly, because the expressions are pushed into a Calc which evaluates them. So for .groupBy('rowtime), we would need to inject a Calc with the materialization. How much effort would this be?

    val result = t
      .groupBy('long)
      .select('rowtime.count)
Not sure if it makes sense to have this restriction here. .select(('rowtime + 0.milli).count) would work correctly.
    }

    @Test
    def testWindowStartEnd(): Unit = {
This only tests the window end.
    }

    @Test
    def testWindowStartEndSql(): Unit = {
This only tests the window end.

    class TableFunc extends TableFunction[String] {
      def eval(time1: Long, time2: Timestamp): Unit = {
        time1.toString + time2.toString
should be collect(time1.toString + time2.toString)
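To illustrate why the bare expression has no effect: in a method returning `Unit`, the concatenated string is simply discarded, so the function emits no rows. The sketch below uses a hypothetical `SketchTableFunction` base class as a stand-in for Flink's `TableFunction`; only the idea that rows are emitted through `collect` is taken from the comment above.

```scala
import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer

object CollectSketch {
  // Hypothetical stand-in for TableFunction: rows reach the output only via collect().
  abstract class SketchTableFunction[T] {
    val emitted: ArrayBuffer[T] = ArrayBuffer.empty[T]
    protected def collect(row: T): Unit = {
      emitted += row
      ()
    }
  }

  class TableFunc extends SketchTableFunction[String] {
    def eval(time1: Long, time2: Timestamp): Unit = {
      // Without collect(...), this string would be computed and then thrown away.
      collect(time1.toString + time2.toString)
    }
  }
}
```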
    import java.math.BigDecimal
    import java.sql.Timestamp

    import org.apache.flink.api.scala._
Several unused imports
      }
    }

    class TableFunc extends TableFunction[String] {
Can be removed because the code uses RelTimeIndicatorConverterTest.TableFunc.
        correlate.getJoinType)
    }

    private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
Return a (Option[LogicalProject], LogicalAggregate) to cover the case when a LogicalProject needs to be prepended to materialize a grouping key or agg function argument (see comments below).
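A rough sketch of the suggested return shape, using toy case classes instead of Calcite's `LogicalProject` and `LogicalAggregate`. All names here (`Field`, `Scan`, `Project`, `Aggregate`, the `materialized` set, the extra `inputFields` parameter) are hypothetical and only model the idea: when a grouping key or aggregate argument is a time indicator, a projection that materializes it is prepended, otherwise the aggregate is returned unchanged.

```scala
object AggregateConversionSketch {
  // Toy stand-ins for relational nodes; these are not Calcite classes.
  final case class Field(name: String, isTimeIndicator: Boolean)
  sealed trait Node
  final case class Scan(fields: Vector[Field]) extends Node
  final case class Project(input: Node, materialized: Set[String]) extends Node
  final case class Aggregate(input: Node, groupKeys: Vector[String], aggArgs: Vector[String]) extends Node

  // Returns an optional projection to prepend, plus the (possibly re-parented) aggregate.
  def convertAggregate(agg: Aggregate, inputFields: Vector[Field]): (Option[Project], Aggregate) = {
    val used = (agg.groupKeys ++ agg.aggArgs).toSet
    // Time indicator fields referenced as grouping key or aggregate argument.
    val toMaterialize = inputFields.collect {
      case f if f.isTimeIndicator && used(f.name) => f.name
    }.toSet
    if (toMaterialize.isEmpty) {
      (None, agg)
    } else {
      val project = Project(agg.input, toMaterialize)
      (Some(project), agg.copy(input = project))
    }
  }
}
```

Under these assumptions, `.groupBy('rowtime)` and `'rowtime.count` would both take the `Some(project)` branch, which corresponds to the Calc injection discussed for the grouping-key case.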
Thanks for the update @twalthr!
This PR adds support for time materialization. It also fixes several bugs related to time handling in the Table API & SQL.