[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field. #4710
LGTM overall +1. One question: since we now cast |
twalthr
left a comment
Thanks for the PR @fhueske. Looks very good! I had just minor comments. One last thing: Could you also add a combined TableSource that implements batch and stream table source + defined time attributes?
docs/dev/table/streaming.md
| #### During DataStream-to-Table Conversion | ||
|
|
||
| The event time attribute is defined with the `.rowtime` property during schema definition. | ||
| The event time attribute is defined with the `.rowtime` property during schema definition. Timestamps and watermarks must have been assigned in the `DataStream` that is converted. |
Add a link to dev/event_timestamps_watermarks again?
| case idx => | ||
| // regular attribute. Access attribute in input data type. | ||
| generateInputAccess(input1, input1Term, idx) | ||
| // get type of result field |
add a comment that this is needed for TableSource?
| // Change output type to rowtime indicator | ||
| if (FlinkTypeFactory.isRowtimeIndicatorType(outType) && | ||
| (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) { | ||
| // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally |
ROWTIME_INDICATOR
| tableSource match { | ||
| case s: StreamTableSource[_] => | ||
| StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory) | ||
| case b: BatchTableSource[_] => |
Replace b with _ to remove IDE warning.
| val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] | ||
| scan.tableSource match { | ||
| // projection pushdown is not supported for sources that provide time indicators | ||
| case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false |
Can you create a follow-up issue for this?
Yes will do, but it is not easy to solve IMO.
| val original = TableEnvironment.getFieldIndices(tableSource) | ||
|
|
||
| // append rowtime marker | ||
| val withRowtime = if (rowtime.isDefined) { |
remove unused rowtime
| * Defines a name of the event-time attribute that represents Flink's | ||
| * event-time. Null if no rowtime should be available. | ||
| * Defines a name of the event-time attribute that represents Flink's event-time, i.e., an | ||
| * attribute that is aligned with the watermarks of the table. |
that is aligned with the watermarks of the underlying DataStream?
| * The method should return null if no rowtime attribute is defined. | ||
| * | ||
| * @return The name of the field that represents the event-time field and which is aligned | ||
| * with the watermarks of the table. The field must be present in the schema of the |
Same here.
| trait DefinedRowtimeAttribute { | ||
|
|
||
| /** | ||
| * Defines a name of the event-time attribute that represents Flink's |
Maybe also update the docs of the class?
| } | ||
|
|
||
| @Test | ||
| def testProjectableRowTimeTableSource(): Unit = { |
Does filter push down work? With rowtime and proctime?
Yes, that should not be a problem. Projection push-down is not possible because the schema of the table is partially constructed inside of the Table API (e.g., appending the proctime attribute).
|
@haohui We only cast ROWTIME / PROCTIME directly to LONG during runtime, the special types are needed during pre-flight phase and validation. We could not come up with a better solution that ensures that watermarks stay aligned with the rowtime. |
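The split described in this comment (special time indicator types while planning and validating, a plain LONG at runtime) could be modelled roughly as follows. This is only a sketch: `RowtimeCast`, `SqlType` and its members, `plannedType`, and `runtimeType` are hypothetical names made up for illustration and are not Flink classes, and the fallback to the input type for other fields is an assumption.

```scala
// Hypothetical, simplified model; these are NOT Flink's type classes.
object RowtimeCast {
  sealed trait SqlType
  case object LongT extends SqlType
  case object TimestampT extends SqlType
  case object StringT extends SqlType
  case object RowtimeIndicatorT extends SqlType

  /** Type seen during planning and validation: the indicator type is only
    * accepted when the input field is a LONG or a TIMESTAMP.
    * Falling back to the input type otherwise is an assumption of this sketch. */
  def plannedType(requested: SqlType, input: SqlType): SqlType =
    (requested, input) match {
      case (RowtimeIndicatorT, LongT | TimestampT) => RowtimeIndicatorT
      case _                                       => input
    }

  /** Type used at runtime: the indicator is treated as a plain LONG. */
  def runtimeType(planned: SqlType): SqlType = planned match {
    case RowtimeIndicatorT => LongT
    case other             => other
  }
}
```

In this model the indicator exists only so that validation can track which field carries event time; once the plan is executed it is handled like any other LONG value.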
|
Thanks for the review @twalthr. @haohui: This PR preserves the current logic that time attributes are exposed as |
|
Looks good to me. +1 |
|
Thanks @wuchong |
What is the purpose of the change
Changes the contract of the DefinedRowtimeAttribute interface. The rowtime attribute is no longer appended to the schema of the row but marks an existing field in the input that will be handled as event time attribute. The specified field must be of type Long or Timestamp. The watermarks of DataStream must be aligned to the specified field.
Brief change log
TableSources that implement DefinedRowtimeAttribute or DefinedProctimeAttribute
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation
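To make the contract change described under "What is the purpose of the change" concrete, here is a minimal, self-contained sketch. It does not use the real Flink classes: `FieldType`, `ClickSource`, `RowtimeContract`, and `resolveRowtime` are hypothetical names, and the trait below only mirrors the shape of `DefinedRowtimeAttribute` as quoted in the review (a `getRowtimeAttribute` method that returns a field name or null). The checks encode the stated rules: the field must already exist in the schema and must be of type Long or Timestamp.

```scala
// Hypothetical stand-ins for illustration only; not the real Flink API.
sealed trait FieldType
case object LongType extends FieldType
case object TimestampType extends FieldType
case object StringType extends FieldType

// Mirrors the shape of the interface discussed in this PR.
trait DefinedRowtimeAttribute {
  /** Name of an existing field to use as event time, or null if none is defined. */
  def getRowtimeAttribute: String
}

// A toy source that marks its existing "ts" field as the rowtime attribute.
class ClickSource extends DefinedRowtimeAttribute {
  val schema: Seq[(String, FieldType)] =
    Seq("user" -> StringType, "url" -> StringType, "ts" -> LongType)

  override def getRowtimeAttribute: String = "ts"
}

object RowtimeContract {
  /** Resolves the rowtime field to its index in the schema.
    * Right(None) means no rowtime attribute is defined; Left carries an error. */
  def resolveRowtime(
      schema: Seq[(String, FieldType)],
      source: DefinedRowtimeAttribute): Either[String, Option[Int]] =
    Option(source.getRowtimeAttribute) match {
      case None => Right(None)
      case Some(name) =>
        schema.indexWhere(_._1 == name) match {
          case -1 => Left(s"Rowtime field '$name' is not part of the schema.")
          case idx =>
            schema(idx)._2 match {
              case LongType | TimestampType => Right(Some(idx))
              case other => Left(s"Rowtime field '$name' has unsupported type $other.")
            }
        }
    }
}
```

Note that the schema keeps its original three fields: the rowtime attribute points at `ts` instead of adding a fourth column. Aligning the watermarks with that field remains the responsibility of the source and is not modelled here.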