New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-18119][table-runtime-blink] Retract old records in time range … #12680
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit a71ca38 (Tue Jun 16 11:22:27 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
Thanks for your contribution, could you add a unit test to this issue? This can help ensuring we don't introduce this bug again in the future. |
@KurtYoung Added unit tests and verified the tests fail without fix. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since I'm not super familiar with over aggregate, cc @wuchong to give another review.
.../org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
Show resolved
Hide resolved
...org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
Show resolved
Hide resolved
...org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
Outdated
Show resolved
Hide resolved
.../org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@protos37 Thanks for you contribution, the changes LGTM generally, only left some minor comments.
And let's wait for @wuchong 's final review.
...ava/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work @protos37 , nice catch! The changes looks good to me in general.
But I would like to go further with this PR. Actually, I don't think we need state ttl (The TableConfig.setIdleStateRetentionTime
) for the bounded over aggregates. A bounded over aggregate is just like a processing/event-time interval join or window aggregation, the state size is bounded and stable. The operator should expire state automantically without lossing correctness. We also didn't introduce state ttl for interval join and window aggregation. Therefore, I think we can remove the state ttl logic in ProcTimeRangeBoundedPrecedingFunction
and ProcTimeRowsBoundedPrecedingFunction
, that means they shouldn't extend KeyedProcessFunctionWithCleanupState
. What do you think?
.../org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
Outdated
Show resolved
Hide resolved
@wuchong Considering the cases you've mentioned it totally makes sense to me to remove the state ttl from time range bounded over aggregations. Thank you for opinion and I'd be glad to work on that. |
// update timestamp and register timer if needed | ||
Long curCleanupTimestamp = cleanupTsState.value(); | ||
if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) { | ||
// we don't delete existing timer since it may delete timer for data processing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may cause some performance problem if we register a timer for each record, because each timer is an entry in this state. A better solution might be to use AbstractStreamOperator
provides InternalTimerService
which can register timer by namespace. We can separate the namespace between cleanup and data processing.
Besides, it would also be better if we can make the cleanup timestamp in a range instead of a point, e.g. if the current cleanup timer is in (timestamp + precedingOffset, precedingOffset + precedingOffset * 1.5)
(similar to CleanupState#registerProcessingCleanupTimer
) , then we don't need to register a new one. This can avoid to remove/register for each record and be friendly to statebackend.
This might be a big refactoring. Thus I'm fine to add TODO comment here and create a following issue to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About having cleanup timestamp as range, if I understood correctly, it seems to be about tradeoff between immediate state reduction and timer related overhead. While we don't have specific criteria like maxRetentionTime
, how can we choose the appropriate generosity for cleanup? Is it okay to go for 1.5 times as you mentioned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This is a tradeoff to avoid too many timers. But the 1.5 times is up to discuss.
@wuchong Applied ranged timer timestamp. For the timer namespace it seems to be a big refactoring as you said so left it as TODO. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updating. LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, we need to update OverWindowHarnessTest
because we changed the state TTL behavior, otherwise, the tests are failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to merge when build is passed.
It seems that the e2e test has failed by timeout. Is there anything I can do? |
You can rebase to master and push force to trigger the build again. |
…bounded preceding functions
…n time range bounded preceding functions
…ounded preceding functions
…for time range bounded over aggregation This changes the state expiration behavior for RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction. In the previous version, we use TableConfig.setIdleStateRetentionTime to cleanup state when it is idle for some time. However, a bounded over aggregation is just like a processing/event-time interval join or window aggregation, the state size should be bounded and stable. The operator should expire state automatically based on watermark and processing time without losing correctness. This closes apache#12680
a10056b
to
e4010de
Compare
Rebased and force pushed to branch... Hope it didn't break anything you were doing? |
Passed in my branch: https://dev.azure.com/imjark/Flink/_build/results?buildId=177&view=results |
Will merge it. |
…for time range bounded over aggregation This changes the state expiration behavior for RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction. In the previous version, we use TableConfig.setIdleStateRetentionTime to cleanup state when it is idle for some time. However, a bounded over aggregation is just like a processing/event-time interval join or window aggregation, the state size should be bounded and stable. The operator should expire state automatically based on watermark and processing time without losing correctness. This closes #12680
…for time range bounded over aggregation This changes the state expiration behavior for RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction. In the previous version, we use TableConfig.setIdleStateRetentionTime to cleanup state when it is idle for some time. However, a bounded over aggregation is just like a processing/event-time interval join or window aggregation, the state size should be bounded and stable. The operator should expire state automatically based on watermark and processing time without losing correctness. This closes apache#12680
…bounded preceding functions
What is the purpose of the change
This fixes a bug in time range bounded preceding functions that the old records that is no longer required are retracted only if a new record with the same key comes in. This prevents unlimitedly growing state especially when the keyspace mutates over time.
Brief change log
Verifying this change
This change is already covered by existing tests, such as
OverWindowITCase
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation