[FLINK-4616] Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism #3031
Conversation
Force-pushed the commit "Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism" from bb9a549 to b79b580.
I think the changes I propose eliminate the possibility of starting from checkpoints created before my code changes (I mean checkpoints from versions later than 1.1), because the consumer now saves ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> (partition + offset + watermark). Please advise me how to repair backward compatibility. I have an idea of how to implement it, or perhaps it is necessary to implement it a different way. I would be grateful for help.
Thank you for the contribution, @MayerRoman. Just want to let you know that I've noticed this PR, and I think the issue is definitely something we want to fix. I'll allocate some time this week to review the PR.
Thank you, Tzu-Li Tai.
@MayerRoman I've done a first pass review, I'll definitely do a second detailed pass once we sort the following problems out:
- I think it'll make sense to have a new specific state class that wraps a topic-partition's offset and watermark as checkpointed state.
- This feature is expected to be fixed for 1.3.0, which means savepoints taken during 1.2 and 1.1 will be considered old-version checkpoints. Therefore, I think the original Tuple2<KafkaTopicPartition, Long> state will first be passed through the old restoreState() method. Then, in initializeState(), we can try to bridge the state type change from the old one to the new one. I'm still confirming this though, as this method call pattern for old-version state might only be the case for 1.1 -> 1.2, and not for 1.2 -> 1.3.
- There's an important thing missing in this change: after a restore with watermarks, when starting to run the source, you'll need to check whether a watermark should first be emitted from the source, considering the already existing watermarks across its subscribed partitions. I don't think this PR has addressed this yet.
Let me know what you think about the above :-)
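On the second point, here is a minimal sketch of what that bridging could look like. It reuses the restoreToOffsetAndWatermark map and the offsetsAndWatermarksStateForCheckpoint state from this PR, but the legacyRestoredOffsets field is hypothetical and the exact signature of the legacy restoreState() hook may differ between versions:

```java
// Sketch only -- methods inside the consumer class; legacyRestoredOffsets is a hypothetical field.

@Override
public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
	// Legacy restore path: pre-change state shape (partition -> offset, no watermark).
	// Stash it until initializeState() runs.
	this.legacyRestoredOffsets = restoredOffsets;
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	// ... register offsetsAndWatermarksStateForCheckpoint as the PR already does ...

	if (legacyRestoredOffsets != null) {
		// Bridge the old shape into the new one; no watermark was checkpointed before,
		// so fall back to Long.MIN_VALUE ("no watermark emitted yet").
		for (Map.Entry<KafkaTopicPartition, Long> entry : legacyRestoredOffsets.entrySet()) {
			restoreToOffsetAndWatermark.put(
					entry.getKey(), Tuple2.of(entry.getValue(), Long.MIN_VALUE));
		}
	}
}
```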
private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
I think we should switch to have a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility for any future changes to the checkpointed state.
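For illustration, a minimal sketch of such a class; the name KafkaTopicPartitionOffsetAndWatermark, its fields, and its package placement are assumptions, not part of the PR:

```java
import java.io.Serializable;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/**
 * Hypothetical checkpointed state holder for one partition: its offset plus the last
 * watermark emitted for it, replacing Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>.
 */
public class KafkaTopicPartitionOffsetAndWatermark implements Serializable {

	private static final long serialVersionUID = 1L;

	private final KafkaTopicPartition partition;
	private final long offset;
	private final long watermarkTimestamp;

	public KafkaTopicPartitionOffsetAndWatermark(
			KafkaTopicPartition partition, long offset, long watermarkTimestamp) {
		this.partition = partition;
		this.offset = offset;
		this.watermarkTimestamp = watermarkTimestamp;
	}

	public KafkaTopicPartition getPartition() {
		return partition;
	}

	public long getOffset() {
		return offset;
	}

	public long getWatermarkTimestamp() {
		return watermarkTimestamp;
	}
}
```

A dedicated class like this can later gain fields without changing a Tuple arity, which helps keep the checkpointed state evolvable going forward.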
		Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));

for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
	offsetsAndWatermarksStateForCheckpoint.add(
			Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
Having a specific checkpoint state object will also be helpful for code readability in situations like this one (it's quite tricky to quickly understand what the key / value refers to, as well as some of the f0, f1 calls in other parts of the PR). I know the previous code used f0 and f1 also, but I think it's a good opportunity to improve that.
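To illustrate the readability point, assuming the dedicated class sketched above and a restore collection that carries it directly (all names hypothetical), the snapshot loop could read roughly like this:

```java
for (KafkaTopicPartitionOffsetAndWatermark restored : restoredOffsetsAndWatermarks) {
	// partition, offset, and watermark are named explicitly instead of nested f0 / f1 accesses
	offsetsAndWatermarksStateForCheckpoint.add(restored);
}
```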
case NO_TIMESTAMPS_WATERMARKS: {

	for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
Excessive empty line above this line.
default:
	// cannot happen, add this as a guard for the future
	throw new RuntimeException();
Would be good to have a reason message here.
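For example, something along these lines (the variable holding the mode is an assumption):

```java
default:
	// cannot happen, add this as a guard for the future
	throw new RuntimeException("Unexpected timestamp/watermark mode: " + timestampWatermarkMode);
```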
default:
	// cannot happen, add this as a guard for the future
	throw new RuntimeException();
Would be good to have a reason message here.
	return partitionWatermark;
}

void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
The other methods seem to be public (although they can actually be package-private). Should we stay consistent with that here?
A re-clarification about backwards compatibility for the state type change: at the same time, there was a recent discussion about letting the window operators also checkpoint watermarks. So perhaps we might not need to let the Kafka sources checkpoint watermarks in the end, if the window operators already take care of restoring the previous event time. @aljoscha, can you perhaps help clarify this?
I think this PR should also include a test for the added feature.
I was occupied with another issue, and now I am coming back to this. First, I want to ask a question that perhaps removes all the others. Tzu-Li Tai, did I understand correctly that if the discussion about letting the window operators checkpoint watermarks leads to a decision to implement this functionality in the window operators, the need to preserve watermark state in the Kafka consumer will disappear?
Hi @MayerRoman! Thank you for coming back to this issue. I had a quick chat offline with @aljoscha about whether or not it'll be reasonable to add this. Both your approach in this PR and letting window operators checkpoint watermarks would solve the issue of late elements after restore. However, we thought that user function code should be free of the responsibility of checkpointing watermarks, and that user code should simply leave that to the streaming internals (checkpointed by window operators, and perhaps also by source operators). So, essentially, the Kafka consumer should not need to checkpoint watermarks, and we can close this PR and the JIRA ticket. Very sorry for the late discussion on this, and for having you work on it already. Let me know what you think and whether or not you agree :-)
Hello, Tzu-Li Tai! Don't worry about the work already done; I gained good experience in the process.
Thanks. Can you please close this PR then? :-D I'll close the JIRA.
Ok |
[FLINK-4616] Kafka consumer doesn't store last emitted watermarks per partition in state.