-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-33419] Port PROCTIME/ROWTIME functions to the new inference stack #23634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| TestSpec.forStrategy( | ||
| "ROWTIME type strategy on proctime indicator", | ||
| SpecificInputTypeStrategies.windowTimeIndicator( | ||
| TimestampKind.ROWTIME)) | ||
| .calledWithArgumentTypes(timeIndicatorType(TimestampKind.PROCTIME)) | ||
| .expectErrorMessage( | ||
| "A proctime window cannot provide a rowtime attribute."), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can/should we add a test to cover the other direction? E.g. using the proctime strategy on a rowtime indicator? (Such a test passes. I'll admit that I don't quite understand how to mix and match the two.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's what I meant by "timeindicators are tricky".
The thing is the validation in the API layer is not perfect. It has already lots of false positives at different locations. By false positives I mean we can not fully trust if a column is or is not a proctime. That's some general background.
Now to the ROWTIME/PROCTIME. Rowtime strictly requires Watermarks to be working. Watermarks are only present if you do a rowtime window. PROCTIME on the other hand tells the time when a record was processed. This theoretically can always be returned. It could be returned both for rowtime and proctime windows.
I can add a test for that.
|
|
||
| if (!LogicalTypeChecks.canBeTimeAttributeType(type) && !type.is(LogicalTypeRoot.BIGINT)) { | ||
| return callContext.fail( | ||
| throwOnFailure, "Reference to a rowtime or proctime window required."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test case which covers this branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added tests for that
| TimeIndicatorTypeInfo.ROWTIME_INDICATOR | ||
| case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP => | ||
| // batch time window | ||
| Types.SQL_TIMESTAMP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test need to return an SQL_TIMESTAMP?
TestSpec.forStrategy(
"ROWTIME type strategy on long in batch mode",
SpecificInputTypeStrategies.windowTimeIndicator(
TimestampKind.ROWTIME))
.calledWithArgumentTypes(DataTypes.BIGINT())
.expectArgumentTypes(DataTypes.BIGINT()));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test you posted covers the input strategy not output. However you're right, it should return TIMESTAMP(3) (SQL_TIMESTAMP is a legacy type). I updated the output strategy for ROWTIME
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to check my understanding, we do not need to update the output strategy for proctime since its type is constant, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we do need to update it for PROCTIME, because we always need to return PROCTIME time indicator.
I thought having timestampKind == TimestampKind.PROCTIME && !LogicalTypeChecks.isTimeAttribute(type) for input is enough, but it passes also for ROWTIME time indicator.
I'll update the output type strategy for PROCTIME
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Glad I asked, and good job finding the corner/edge case.
0e1e0a9 to
71a13af
Compare
jnh5y
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comments have been addressed! LGTM!
What is the purpose of the change
Ports
PROCTIME&ROWTIMEfunctions to the new inference stack.Verifying this change
Added tests for the input type strategy
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation