New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes #2544
Conversation
@StefanRRichter Maybe interesting for you to review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good 👍
…te sizes This prevents failures on eventually consistent S3, where the operations for keys (=entries in the parent directory/bucket) are not guaranteed to be immediately consistent (visible) after a blob was written.
7ce2de7
to
dc12d0b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except for my two minor comments, everything looks good to me.
@@ -301,9 +301,16 @@ public StreamStateHandle closeAndGetHandle() throws IOException { | |||
} | |||
else { | |||
flush(); | |||
|
|||
long size = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if returning -1 as size on exception is ideal. Currently, this value should one be used in the calculation of meta data, but one might be tempted to use it e.g. to preallocate a byte[] to read the file into, so this should at least be documented in StateObject
. Furthermore, we make the assumption that the stream position is also equal to the final file size. Not entirely sure if this holds for all streams and file systems, but I guess this is the best we can do without asking the file system for meta data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream position should be okay to determine the state size. All instances I checked were accurate there.
Also, given that the size is more informational and should not be relied upon, it should be all the less critical.
As
*/ | ||
long getStateSize() throws Exception; | ||
long getStateSize() throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with the change in StreamStateHandle
, even throwing IOException becomes obsolete now for all existing implementations. We might remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should do that.
Manually merged in 95e9004 |
This prevents failures on eventually consistent S3, where the operations for keys (=entries in the parent directory/bucket) are not guaranteed to be immediately consistent (visible) after a blob was written.
Not relying on any operation on keys (= requesting
FileStatus
) should mitigate the problem.This also changes the exception signature from
getStateSize()
fromException
toIOException
, which fits more natural with the exception signatures of some other I/O methods.Related issue: We may still want to have retries on
FileStatus
operations on S3, for other parts of the system (like FileOutputFormats)