Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10531][e2e] Fix unstable TTL end-to-end test. #7036

Closed
wants to merge 1 commit into from

Conversation

kl0u
Copy link
Contributor

@kl0u kl0u commented Nov 6, 2018

As describe on the JIRA, the problem is that the clock on the machine on Travis seems to have jumped backwards.

This resulted in a mismatch between the elements discarded as expired by Flink's internal TTL mechanism, and the "user-code" in the test that computes the expired elements based on the timestamp of the latest update.

I repeat the explanation from the JIRA here for reference:


The way the test works is the following. For a given state:

  1. it keeps the state with TTL
  2. and keeps all the updates to that state irrespective of their timestamp (e.g. all the elements added in a ListState)

To verify correctness:

  1. it fetches the state from Flink, as cleaned up by the internal TTL mechanism
  2. it reconstructs the expected state from the stored updates by taking the timestamp
    of the latest update (ts) discarding elements with timestamp ts-ttl

As you can see from the stacktrace in the error from Travis, the latest update has timestamp ts=1538918066021
while there are elements in the list with timestamps greater than ts (e.g. 1538918066136). This means that the internal
clock on that machine went backwards, so Flink's TTL may have removed elements that appear in the expected state
of the test, as it takes as current timestamp the ts=1538918066021.

The fix is simply to assume that (for the test), processing time increases monotonically and ignore "updates from the past".

@kl0u
Copy link
Contributor Author

kl0u commented Nov 6, 2018

R @azagrebin

@zentol
Copy link
Contributor

zentol commented Nov 7, 2018

performUpdate() is using System.currentTimeMillis which is not guaranteed to be monotonous. Could this be the underlying cause?

@StefanRRichter
Copy link
Contributor

I wonder if it would not make more sense to use a monotonous time provider for a single machine, such as System.nanoTime(), or a wrapper around time millis that prevents falling back in time, or some test implementation that the test can control, for the test instead of ignoring elements and potentially missing out on errors. TtlTimeProvider is already an interface, and TtlTimeProvider DEFAULT = System::currentTimeMillis; is used in all places. Instead this could be the first case where we want to make the time provider configurable via a factor. The simplest case could just replace it when encountering a certain config entry or property.

Another option would be that if we detect such time changes we exit the test with a certain exit code and rerun it again. What do you think?

@azagrebin
Copy link
Contributor

azagrebin commented Nov 8, 2018

I agree that configuring custom processing time provider on the API side is more robust approach for tests. The change might be a bit more involving. I have created issues to consider it: FLINK-10830, FLINK-10831.

One more option is to check if currentTimeMillis jumped back in TtlVerifyUpdateFunction.performUpdate relatively to the latest consumed value. If jump happened then just wait until currentTimeMillis returns the next value greater than the latest consumed value. This way the update never needs to be rejected for this reason and test restarted.

There is still potential subtle problem (could be not really practical) if:

  • timestampBeforeUpdate gets increased value
  • Flink gets jumped back currentTimeMillis
  • lag happens and
  • timestampAfterUpdate gets again increased value.

In this case, the time queried by Flink is completely out of test control and without plugging the time there is no way to sync it in Flink and test.

@kl0u
Copy link
Contributor Author

kl0u commented Nov 8, 2018

@azagrebin I commented on https://issues.apache.org/jira/browse/FLINK-10830 about my concerns of allowing the user to specify his/her own processing time provider.

@kl0u
Copy link
Contributor Author

kl0u commented Nov 8, 2018

Actually, now @StefanRRichter mentioned it, we have the TtlTimeProvider and the ProcessingTimeService. Both of them are used internally by Flink to let it know what is the "current" processing time, but each of them has its own implementation (which happens to be currentTimeMillis for both).

I think that these two should be unified and the TtlTimeProvider should call the ProcessingTimeService::getCurrentProcessingTime(). This will allow to have a "single point of truth".

What do you think @StefanRRichter and @azagrebin

@kl0u kl0u force-pushed the ttl-fix branch 2 times, most recently from 63a35df to 8a018f0 Compare November 14, 2018 16:40
@kl0u
Copy link
Contributor Author

kl0u commented Nov 14, 2018

@azagrebin Please have another look and let me know what you think!

Copy link
Contributor

@azagrebin azagrebin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kl0u! Looks good, I have left couple of comments to consider before merge. Especially, I think we should generate UUIDs for Serializable classes.

* A stub implementation of the {@link StateBackend} that allows the use of
* a custom {@link TtlTimeProvider}.
*/
private static final class StubStateBackend implements StateBackend {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also generate UUID for StubStateBackend, MonotonicTTLTimeProvider, TtlVerifyUpdateFunction, UpdateStat, AggregateFunction, TtlUpdateContext, TtlVerificationContext, ValueWithTs and ValueWithTs.Serializer.

I would suggest to create StateBackendWrapperAdaptor in main code along with StateBackend as well which would always wrap wrappedBackend and forward all StateBackend interface methods. Here we would need to override only relevant methods. The adaptor could be reused in future for similar cases like we have here. If StateBackend interface gets more methods, they would need default forwarding only in StateBackendWrapperAdaptor and other extending classes can stay untouched.
Otherwise, I would suggest to move this class at least to separate file.
I leave these last ideas up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move the class to a separate file but I will not touch the production code. This Jira is about test stability. Refactoring the production code is a lot more involved and definitely deserves another JIRA and more discussion. Especially for such a change.


public long getTimestampAfterUpdate() {
return timestampAfterUpdate;
public long getTimestamp() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems we can remove public

@kl0u
Copy link
Contributor Author

kl0u commented Nov 16, 2018

@azagrebin I integrated your comments. I will merge as soon as Travis gives green.

Copy link
Contributor

@azagrebin azagrebin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kl0u ! LGTM 👍

@kl0u kl0u deleted the ttl-fix branch November 26, 2018 13:12
tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants