[FLINK-24064][connector/common] HybridSource restore from savepoint #17111
Conversation
@AHeise @stevenzwu please take a look at the deserialization change in general. I'm planning some more cleanup work on this PR tomorrow, but would also like this to go into the 1.14 release.
@@ -92,13 +90,13 @@
     private final List<SourceListEntry> sources;
-    // sources are populated per subtask at switch time
-    private final Map<Integer, Source> switchedSources;
+    private final HybridSourceSplitSerializer.SwitchedSources switchedSources;
Why is SwitchedSources nested inside HybridSourceSplitSerializer?
Did I miss something? I didn't see switchedSources used anywhere.
Yes, please remove. switchedSources acted like a shared cache, which is no longer necessary. (Not sure how I missed that in the initial review; I guess I was too focused on the API.) This should now just be a field in the enumerator/reader that caches the sources.
That came in after moving away from the fixed source sequence that both the enumerator and serializer originally had access to. They still needed access to the underlying serializer, and therefore to the source that provided that serializer. Now that the serializers are decoupled, this hacky construct is no longer needed. I just missed that in the refactor; thanks @stevenzwu for catching it.
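The deferred-deserialization idea under discussion can be sketched as follows. This is a minimal, hypothetical illustration, not Flink's actual classes: `LazySplit` and all names below are made up. The wrapper split carries the underlying split as raw bytes plus a serializer version, and only deserializes once the caller knows which source (and hence which serializer) the split belongs to, so the serializer itself no longer needs access to the sources.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch: the wrapper keeps the underlying split as bytes
// and defers decoding until the matching deserializer is supplied.
class LazySplit {
    final int sourceIndex;
    final byte[] wrappedSplitBytes;
    final int wrappedSplitSerializerVersion;
    private Object wrappedSplit; // decoded on demand, then cached

    LazySplit(int sourceIndex, byte[] bytes, int version) {
        this.sourceIndex = sourceIndex;
        this.wrappedSplitBytes = bytes;
        this.wrappedSplitSerializerVersion = version;
    }

    // The enumerator/reader supplies the deserializer once the matching
    // source is known, instead of the serializer holding all sources.
    Object unwrap(Function<byte[], Object> deserializer) {
        if (wrappedSplit == null) {
            wrappedSplit = deserializer.apply(wrappedSplitBytes);
        }
        return wrappedSplit;
    }
}

public class LazySplitDemo {
    public static void main(String[] args) {
        byte[] raw = "split-0".getBytes(StandardCharsets.UTF_8);
        LazySplit split = new LazySplit(1, raw, 0);
        String decoded =
                (String) split.unwrap(b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(decoded); // prints "split-0"
    }
}
```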
Thanks for providing this fix! I left some comments, see below.
...test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
        this.switchedSources = switchedSources;
        this.cachedSerializers = new HashMap<>();
    }

    public HybridSourceEnumeratorStateSerializer() {}
Much cleaner now!
                return source.getSplitSerializer();
            }));

    /** Sources that participated in switching with cached serializers. */
    public static class SwitchedSources implements Serializable {
I don't see why this is a nested class here.
...src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
        return wrappedStateBytes;
    }

    public int wrappedStateSerializerVersion() {
nit: rename wrappedStateSerializerVersion to getWrappedStateSerializerVersion, to be consistent with Flink style.
-        out.writeInt(enumStateBytes.length);
-        out.write(enumStateBytes);
+        out.writeInt(enumState.wrappedStateSerializerVersion());
+        out.writeInt(enumState.getWrappedState().length);
An integer would limit the state size to 2 GB. Not sure if we need to worry about that or not. It can happen if the historical storage (like HDFS or Iceberg) has many files/splits for the bootstrap scan.
Each Iceberg split contains data files, delete files (for upsert), and a schema string. Each data file also contains stats for every column, so if the table is wide (many columns), each split may go over 10 KB.
This discussion is probably outside the scope of this PR.
Please note that the limit is not the integer used to represent the size, but rather the byte[] array itself, which cannot go beyond that. We do not have 64-bit arrays in Java: https://www.nayuki.io/page/large-arrays-proposal-for-java
Yes, understood. It is the choice of byte[], which is then a limitation of the SimpleVersionedSerializer API.
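The framing being discussed can be sketched in a few lines. This is an illustrative stand-alone example (the `FramingDemo` class and `frame` helper are hypothetical, not Flink code): a 4-byte int version and a 4-byte int length prefix precede the payload. As noted above, the practical ceiling is the `byte[]` itself, which is capped at `Integer.MAX_VALUE` elements regardless of the int prefix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Length-prefixed framing in the style used by SimpleVersionedSerializer
// call sites: version, payload length, then the payload bytes.
public class FramingDemo {
    static byte[] frame(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(version);        // 4 bytes
        out.writeInt(payload.length); // 4 bytes; payload itself <= 2^31 - 1
        out.write(payload);
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame(1, new byte[] {42, 43});
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        System.out.println(in.readInt()); // version: 1
        System.out.println(in.readInt()); // payload length: 2
    }
}
```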
I also wonder whether we would hit other issues with such large state serialized in the coordinator. Can IcebergSource limit the number of splits it keeps in the checkpoint and only add more once some have been processed?
@tweise I have thought about adding that optimization (limiting the number of splits) for streaming reads in the future. For a bounded job we can't; on the other hand, a bounded job may not need checkpointing enabled.
@tweise do we need a MiniCluster unit test for the savepoint trigger and restore?
Force-pushed fd39ae6 to f4b4b59
    }

    public SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) {
        return cachedSerializers.computeIfAbsent(
Why do we need to cache the SplitSerializer? It seems unnecessary to me.
To avoid creating a new serializer instance per split; instead, one is created per coordinator/operator (matching how it works for the top-level source).
I see. Originally I was imagining the singleton pattern from file source. Then this caching is not necessary.
@Override
public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
return FileSourceSplitSerializer.INSTANCE;
}
I guess it depends on the implementation. Some source implementations may construct a new object in this method, and hence this caching might be beneficial.
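The caching pattern under discussion can be sketched with `Map.computeIfAbsent`. This is a hypothetical stand-alone example (the `SerializerCacheDemo` class, the `String` stand-in for a serializer, and the construction counter are illustrative only): for sources that construct a new serializer on every `getSplitSerializer()` call, the cache ensures the cost is paid once per coordinator/operator rather than once per split.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical sketch: cache one serializer per source index so repeated
// lookups for the same source reuse a single instance.
public class SerializerCacheDemo {
    static int constructions = 0;

    public static void main(String[] args) {
        Map<Integer, String> cachedSerializers = new HashMap<>();
        // Stand-in for source.getSplitSerializer(), which may allocate
        // a fresh object on every call in some source implementations.
        IntFunction<String> create = idx -> {
            constructions++;
            return "serializer-" + idx;
        };
        // Two lookups for the same source index; the second hits the cache.
        cachedSerializers.computeIfAbsent(0, i -> create.apply(i));
        cachedSerializers.computeIfAbsent(0, i -> create.apply(i));
        System.out.println(constructions); // 1
    }
}
```

For sources that already expose a singleton serializer (like `FileSourceSplitSerializer.INSTANCE` above), the cache is redundant but harmless.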
I'm going to look into adding that to
@@ -21,18 +21,25 @@
 /** The state of hybrid source enumerator. */
 public class HybridSourceEnumeratorState {
     private final int currentSourceIndex;
-    private final Object wrappedState;
+    private byte[] wrappedStateBytes;
+    private final int wrappedStateSerializerVersion;
Suggested change:
-    private final int wrappedStateSerializerVersion;
+    private final int serializerVersion;
Considered that as well, but I prefer the verbose name to make clear that this is the serializer version for the underlying state vs. that of HybridSourceEnumeratorState.
 import java.util.List;
 import java.util.Objects;

 /** Source split that wraps the actual split type. */
 public class HybridSourceSplit implements SourceSplit {

-    private final SourceSplit wrappedSplit;
+    private final byte[] wrappedSplitBytes;
+    private final int wrappedSplitSerializerVersion;
Suggested change:
-    private final int wrappedSplitSerializerVersion;
+    private final int serializerVersion;
Considered that as well, but I prefer the verbose name to make clear that this is the serializer version for the underlying state vs. that of HybridSourceSplit.
@@ -57,38 +69,64 @@ public boolean equals(Object o) {
             return false;
         }
         HybridSourceSplit that = (HybridSourceSplit) o;
-        return sourceIndex == that.sourceIndex && wrappedSplit.equals(that.wrappedSplit);
+        return sourceIndex == that.sourceIndex
+                && Arrays.equals(wrappedSplitBytes, that.wrappedSplitBytes);
Don't we need the splitId in equals?
Not needed, because splitId is already part of wrappedSplitBytes.
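A side note on the `Arrays.equals` usage in the diff below: two independently materialized `byte[]` instances with identical content are never `==` or `equals`-equal in Java, since arrays only support identity comparison, so content comparison must go through `java.util.Arrays`. A small illustrative example (the demo class is hypothetical):

```java
import java.util.Arrays;

// Arrays.equals compares content; Object.equals on arrays compares identity.
public class BytesEqualsDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        System.out.println(a.equals(b));         // false: identity comparison
        System.out.println(Arrays.equals(a, b)); // true: element-wise comparison
    }
}
```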
     }

     @Override
     public String toString() {
         return "HybridSourceSplit{"
                 + "realSplit="
-                + wrappedSplit
+                + wrappedSplitBytes
                 + ", sourceIndex="
                 + sourceIndex
Add the splitId field.
Done. I also removed wrappedSplitBytes because it doesn't provide meaningful information.
LGTM after @SteNicholas's comments are addressed.
Force-pushed 74087ed to 5690d85
@stevenzwu @AHeise @SteNicholas thanks for the review!
What is the purpose of the change
Restore from savepoint fails because the underlying splits are deserialized before the underlying enumerator has been restored (details in JIRA). With this change, deserialization is deferred and made explicit in the HybridSource enumerator/reader.
Verifying this change
Existing tests don't cover restore from savepoint (the ITCase performs recovery from initial state). Deserialization of the HybridSource split and the enumerator checkpoint is covered by unit tests. The changes were verified with an internal deployment. Planning to add a unit test that just deserializes HybridSourceSplit before merging.