New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata and introduce Global Cleanup Hooks #3522
Conversation
…data and use global cleanup hooks This closes apache#3522
2bd8172
to
460d480
Compare
CI tests pass (except one profile which does not complete within the limit of 50 minutes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really good changes @StephanEwen. The design and implementation are sound and well tested. Furthermore, the new code is thoroughly documented and difficult code sections are explained. +1 for merging :-)
catch (Exception e) { | ||
// catch all exception, to not let errors in cleanup hooks fail the checkpoint | ||
LOG.warn("Failed to create the cleanup hook for checkpoint " + checkpointId + | ||
". Generic cleanup path will be executed..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e
might be interesting to log since it could contain information what went wrong.
"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, | ||
throwable); | ||
} | ||
catch (Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code reformatting
catch (Exception e) { | ||
// we catch all exceptions here to make sure we go through the generic path as a | ||
// fallback in case where cleanup fails | ||
LOG.warn("Fast cleanup hook f"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e
is being swallowed and the log message seems to be truncated.
} | ||
catch (Exception e) { | ||
if (firstException == null) { | ||
firstException = e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add all following exceptions as suppressed exceptions.
CheckpointMetadataOutputStream createCheckpointStateOutputStream() throws IOException; | ||
|
||
/** | ||
* Gets the location (as a string pointer) where the metadata factory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sentence does not seem to be complete
filesystem = FileSystem.get(checkpointDataUri); | ||
} catch (IOException e) { | ||
// should never happen | ||
throw new FlinkRuntimeException("cannot access file system for URI " + checkpointDataUri); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe adding e
as cause?
"the available configurations."; | ||
LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " + | ||
"problem or by the fact that the file system is not accessible from the " + | ||
"client. Reason:{}", reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could fuse reason
and the logging statement into a single string.
} | ||
|
||
this.basePath = basePath; | ||
this.fs = fs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkNotNull
checks could be inserted here.
* rather than in files */ | ||
public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; | ||
/** Config key for the maximum state size that is stored with the metadata */ | ||
public static final ConfigOption<Integer> MEMORY_THRESHOLD_CONF = ConfigOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether we want to group all relevant state options into a dedicated StateOptions
class or keep them in the respective class where they are used. I fear that it will be quite hard for the user to find all available options.
* capabilities to spill to disk. Checkpoints are serialized and the serialized data is | ||
* transferred | ||
* This state backend holds the working state in the memory (JVM heap) of the TaskManagers. | ||
* The state backend checkpoints state directly the JobManager's memory (hence the backend's name), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
preposition seems to be missing here
} | ||
|
||
@Override | ||
public StreamStateHandle resolveCheckpointLocation(String pointer) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to introduce an actual class for the concept of pointers to improve type safety and readability. Through subclasses, it can also be easier to reason about what kinds of pointer a backend accepts or rejects, compared to think about how a string was parsed and failed.
|
||
@Nullable | ||
@Override | ||
public String getMetadataPersistenceLocation() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The meta data location might be a concept that justifies introducing a class. In particular, it seems that there might be different location concepts in the future, so simply encoding all this in a string is questionable.
// check if we have a shortcut hook to dispose the state | ||
final StateDisposeHook disposeHook = this.disposeHook; | ||
if (disposeHook != null) { | ||
// fast path! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is now a bit more lengthy than it could be; i would suggest to factor out the two paths (fast and standard) in 2 private methods. The textual comments are hinting this and could become obsolete once the code is split up into two named methods.
// check if we have a shortcut hook to dispose the state | ||
final StateDisposeHook disposeHook = this.disposeHook; | ||
if (disposeHook != null) { | ||
// fast path! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a second look, do you think it could even make sense to have the previous behaviour also encapsulated as a StateDisposalHook
and get rid of the if completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, that should be the case in the long run.
|
||
/** | ||
* A {@link CheckpointMetadataOutputStream} that writes into a file and | ||
* returns a {@link StreamStateHandle} upon closing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment here is outdated and should link to StreamHandleAndPointer
.
|
||
@Override | ||
public void close() { | ||
if (!closed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either the volatile
on closed
is not required or this does not actually give the intended thread safety and should be replaced by AtomicBoolean
.
|
||
try { | ||
out.close(); | ||
fileSystem.delete(path, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this delete is intended to be inside the try
or should be moved to a finally
clause
/** | ||
* A combination of a {@code StreamStateHandle} and an external pointer (in the form of a String). | ||
*/ | ||
final class StreamHandleAndPointer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not entirely sure why we need this "AndPointer" part. The pointer in the end is a string and the interpretation of the string is generic and up to the backend that is confronted with it. So right now, the string is always a path, which is already contained in the plain StreamStateHandle
that happens to be a FileStateHandle
for the current backends. So similar to expecting a certain string format, the backend could as well expect certain subclasses of StreamStateHandle
, like FileStateHandle
(or something for a different backend), and resolve the "pointer" from there. Or maybe I am missing something fundamental here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of the pointer is to have a state-backend independent string that indicates a checkpoint location. I picked a string, because the location is ultimately passed as a string, either from the user command flink run -s <savepoint path>
or from ZooKeeper' "last completed checkpoint" entry.
Introducing the StreamHandleAndPointer
here spares the code from having to reason about state backend specific file handles. We can change that at some point, but for the breadth of changes already in this PR, this helped reduce the scope a bit.
I would like to merge this for 1.3 with a slightly reduced scope. I would include the refactoring to make the State Backends also responsible for storing the Metadata. I would NOT merge the started work about cleanup hooks. The reason for that is that after some digging through this, I found we actually have to treat different file systems quite differently. For example for HDFS and posix-style file systems, we want to dispose the checkpoint-exclusive state via a |
Here is a summary of why I picked a String initially for the metadata persistence location:
|
I agree that we can change the Strings to some proper types later. Besides what I mentioned before, I just thought that even if a string is the underlying data type for the moment, having it wrapped in a named |
Core Changes
Store Metadata in State Backend
The state backend is now responsible for storing the checkpoint metadata. There is no implicit assumption that the checkpoint metadata is stored in a file systems any more.
All checkpoint directory / savepoint directory specific config settings are now part of the state backends. The Checkpoint Coordinator simply calls the relevant methods on the state backends to store metadata.
Similar as the
CheckpointStreamFactory
for storing checkpoint state, there is now aCheckpointMetadataStreamFactory
for the metadata.State backends are not required to be able to persist metadata - only ifor HA setups and when externalized checkpoints are requested.
All checkpoints with persisted metadata are addressable via a "pointer", which is state-backend specific. For File-system based statebackends (all statebackends in Flink currently), this pointer is the file path.
Global cleanup hooks
State backends can implement an extended interface to create global cleanup hooks. For example for a file system, a global cleanup hook simply recursively deletes the checkpoint directory, which is for most file systems much faster than issuing a delete call per file.
The
MemoryStateBackend
andFsStateBackend
use fast cleanup hooks, theRocksDBStateBackend
should get then in a followup (see below).Application-defined State Backends pick up additional values from the configuration
We need to keep supporting the scenario of setting a state backends in the user program, but configuring parameters like checkpoint directory in the cluster config. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration.
Altered user-facing Behavior
Externalized checkpoints store all files (state and metadata) strictly in the same directory now. (Savepoints were contained in a single directory already before)
Both savepoint commands and configuration parameters now require qualified URIs as well (i.e.,
file:///path/do/savepoint
, whereas before the configs and command line also excepted/path/do/savepoint
.Because not having qualified URIs is error-prone anyways (auto fallback to local file system) I am actually in favor of doing this change.
Tests
This adds a lot of tests, which can due to the changed design be done completely on the state backends,
without instantiating a CheckpointCoordinator.
Followups
Implement the global hooks for the RocksDB state backend. Because the RocksDB state backend internally delegates to a "storage backend", this need to few extra tricks and was not done in this already large pull request.
The HA checkpoint store needs not store checkpoint metadata itself any more, it can simply store the pointer.
This abstraction allows state backends to over periodic cleanup hooks that can search for lost state (files that are not referenced any more), which can happen when TaskManager / JobManager processes fail during finalizing a checkpoint or when handing over state between JobManager / TaskManager.