Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(1.19 backport) [FLINK-35886][task] Fix watermark idleness timeout accounting when subtask is backpressured/blocked #25223

Merged
merged 12 commits into from
Aug 21, 2024

Conversation

pnowojski
Copy link
Contributor

this is a backport of #25167

Operators have to be serializable and they will also need an access
to Clock to construct ProgressBlockingRelativeClock. Because we
also want to be able to provide for testing purposes ManualClock
we have to find a way how Operators could obtain a Clock instance.
Exposing Clock from ProcessingTimeService sounds like a good place
as it also will provide a since source of processing time for
potential users (for example ProgressBlockingRelativeClock and
firing timers)
Comment on lines +2358 to +2361
<!-- New method has been added to the interface, but this interface shouldn't be implemented by users, only used/called, so this doesn't break API compatibility. -->
<exclude>org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier$Context</exclude>
<!-- Base interface has been extracted from the existing Clock class. This interface shouldn't affect user code. -->
<exclude>org.apache.flink.util.clock.Clock</exclude>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pay attention to this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to return System clock from the new method (WatermarkGeneratorSupplier.Context#getInputActivityClock) to avoid breaking existing implementations.
I think it's not impossible that there are custom implementations of this interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This context is provided by Flink to operators/functions, so I don't see any legitimate way how user should be interested in implementing it's own context. This could only happen if someone forks Flink and extends it somehow in this area AFAIU. I will merge it as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this WatermarkGeneratorSupplier.Context#getInputActivityClock, turns out to be a problem and user code actually implements on its own WatermarkGeneratorSupplier.Context interface. For example our Kafka connector does this here: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L438-L439

Due to this reason, I don't see an easy way how to backport this bug fix to release 1.19 and 1.20 and I would suggest to only fix the bug in 2.0. Especially given that there doesn't seem to be a large interest in this bug fix (it hasn't been reported before). I will revert my changes for 1.19 and 1.20

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 20, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@rkhachatryan rkhachatryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for compatibility concern, but I'd be fine also with merging as is.

Comment on lines +2358 to +2361
<!-- New method has been added to the interface, but this interface shouldn't be implemented by users, only used/called, so this doesn't break API compatibility. -->
<exclude>org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier$Context</exclude>
<!-- Base interface has been extracted from the existing Clock class. This interface shouldn't affect user code. -->
<exclude>org.apache.flink.util.clock.Clock</exclude>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to return System clock from the new method (WatermarkGeneratorSupplier.Context#getInputActivityClock) to avoid breaking existing implementations.
I think it's not impossible that there are custom implementations of this interface.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants