
Conversation

sihuazhou (Contributor)

What is the purpose of the change

This PR fixes FLINK-7873. The current recovery strategy always reads checkpoint data from the remote file system (HDFS). This costs a lot of bandwidth when the state is large (e.g. 1 TB). Worse, if a job recovers again and again, it can eat up all the network bandwidth and badly hurt the cluster. So I propose caching the checkpoint data locally and reading it from the local cache whenever possible; we only read from the remote storage if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of network traffic and recovers faster.

Brief change log

  • Add CheckpointCacheManager to manage the local checkpoint data of each TaskManager
  • Add CheckpointCache to manage the local checkpoint data of each Task
  • Add CachedCheckpointStreamFactory to write checkpoint data to both DFS and the local disk
  • Add CachedStreamStateHandle to read checkpoint data from local or remote storage (see the sketch after this list)
  • Here is a doc with details: local_recovery.docx
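A minimal sketch of the local-first read path (hypothetical class and field names, not the PR's actual code): the handle first tries the locally cached copy and only falls back to the remote (DFS) handle if the local read fails.

import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.StreamStateHandle;

/**
 * Sketch only: a handle that prefers the locally cached copy of the checkpoint
 * data and falls back to the remote (DFS) handle if the local read fails.
 */
public class LocalFirstStreamStateHandle implements StreamStateHandle {

	private static final long serialVersionUID = 1L;

	private final StreamStateHandle localHandle;   // may be null if nothing was cached locally
	private final StreamStateHandle remoteHandle;  // authoritative copy on DFS (e.g. HDFS)

	public LocalFirstStreamStateHandle(StreamStateHandle localHandle, StreamStateHandle remoteHandle) {
		this.localHandle = localHandle;
		this.remoteHandle = remoteHandle;
	}

	@Override
	public FSDataInputStream openInputStream() throws IOException {
		if (localHandle != null) {
			try {
				// reading locally saves network bandwidth during recovery
				return localHandle.openInputStream();
			} catch (IOException e) {
				// the cache is best-effort only; fall through to the remote copy
			}
		}
		return remoteHandle.openInputStream();
	}

	@Override
	public long getStateSize() {
		return remoteHandle.getStateSize();
	}

	@Override
	public void discardState() throws Exception {
		// the checkpoint only owns the remote copy; the local cache has its own lifecycle
		remoteHandle.discardState();
	}
}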

Verifying this change

This change added tests and can be verified as follows:

  • Add tests in CheckpointCacheManagerTest.java, CheckpointCacheTest.java, CachedCheckpointStreamFactoryTest.java, SharedCacheRegistryTest.java.
  • Compile this PR and deploy it on a cluster, then trigger failures randomly. (I tested this on a YARN cluster with a naive scheduler mechanism that allocates slots only according to state.)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

Documentation

  • Does this pull request introduce a new feature? (Yes)
  • Doc link: local recovery

Add note and unit test for CheckpointCacheManager.

add more tests for local recovery.

add more tests for local recovery.

add comment for SharedCacheRegistry.
fix tests.

fix migrate bug.
@sihuazhou (Contributor Author)

@StefanRRichter Could you please have a look at this?

@StefanRRichter (Contributor) left a comment


Thank you very much for this big contribution. I think this is overall a nice approach that works for the intended purpose. There are certain similarities to what I had planned for implementing this and started in PR #4745. In particular, your CheckpointCacheManager is basically similar to what I had planned to implement in TaskExecutorLocalStateStoresManager, and CheckpointCache corresponds to TaskLocalStateStore. Also, wrapping the output stream into a decorator that duplicates the writes to a local file matches my intent for the implementation of certain checkpoints that are based on a single file per backend. We could somehow map this onto said PR.

However, one thing that is problematic about this work as a general approach to local state caching is that your implementation is heavily based on the idea that for every (file-based) remote state handle there is a (file-based) cached state handle counterpart. Let me explain my concern in more detail.

Some state backends can leverage certain specialties of their underlying data structures that allow for different/better ways to checkpoint and recover locally than just mimicking what we do with the remote state. Two examples that I find hard to build on top of your approach are:

  • An asynchronous heap state backend that keeps the in-memory copy-on-write snapshot (CopyOnWriteStateTableSnapshot) as local state and can recover super fast from the already existing objects in memory instead of doing any reading from disk.
  • RocksDB incremental snapshots that build on top of RocksDB's native checkpoints. In the process of doing Flink incremental checkpoints, RocksDB already prepares a directory that holds the complete state. There is no need to write all those files again to another output stream. Instead, we can just keep the directory alive in the cache manager, but this would again break the 1:1 relationship from remote to local state. As another important side effect, RocksDB's native mechanism on local files uses file system hard links to do all the reference counting, which makes SharedCacheRegistry completely obsolete and keeps things simpler.

What I would propose instead is breaking the 1:1 relationship that maps each remote state handle to a local state handle and working with checkpoint ids. Like this, we can store local state in whatever form we like and can identify the local objects that we must read, release, etc., and we can also avoid introducing yet another type of state handle implementation that is known to the SavepointSerializer. In general, I would actually prefer to reduce the number of those in the future rather than introducing more, which always couples the JobManager more tightly to the backends. I would suggest sending the remote handles and the checkpoint id, and letting the task manager figure out if there is local state for the given checkpoint id and substitute the remote handle if possible.
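A minimal sketch of this checkpoint-id based bookkeeping (hypothetical class, not part of this PR or of #4745): local state, in whatever form a backend keeps it, is registered and looked up by checkpoint id, so no extra state handle type needs to be known to the JobManager.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sketch: local state keyed by checkpoint id. "S" stands for whatever a backend
 * keeps locally (a directory of files, in-memory snapshot objects, ...).
 */
public class LocalStateByCheckpointId<S> {

	private final Map<Long, S> localStateByCheckpointId = new ConcurrentHashMap<>();

	/** Called by the task after a successful checkpoint. */
	public void store(long checkpointId, S localSnapshot) {
		localStateByCheckpointId.put(checkpointId, localSnapshot);
	}

	/** Called on restore: the task manager substitutes local state if it has any. */
	public Optional<S> lookup(long checkpointId) {
		return Optional.ofNullable(localStateByCheckpointId.get(checkpointId));
	}

	/** Called when a newer checkpoint is confirmed and older local copies can be released. */
	public void pruneOlderThan(long confirmedCheckpointId) {
		localStateByCheckpointId.keySet().removeIf(id -> id < confirmedCheckpointId);
	}
}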

I would also suggest making local recovery configurable, because it can introduce additional costs to write another state copy locally. Some users might value fast checkpoints and saved space over faster recovery; at least they would have a choice.
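For the configurability aspect, something along these lines would do; the option key below is made up purely for illustration and is not defined by Flink or by this PR.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class LocalRecoveryConfigSketch {

	// Hypothetical option key, for illustration only.
	public static final ConfigOption<Boolean> LOCAL_RECOVERY_ENABLED =
		ConfigOptions.key("state.checkpoints.local-recovery").defaultValue(false);

	/** Only create the local cache (and pay the extra write) if the user opted in. */
	public static boolean isLocalRecoveryEnabled(Configuration config) {
		return config.getBoolean(LOCAL_RECOVERY_ENABLED);
	}
}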

Another general problem (that currently also exists for other cases like the IOManager) is that, in case of JVM crashes or machine failures, the local state in the temp directories is never cleaned up and can pile up to the point that the disk space is exhausted, making jobs constantly fail on a machine that otherwise looks healthy. Solving this is orthogonal to this PR, but the effects of this shortcoming can be amplified by also hosting local state there.

One point that I like is that you use a TTL as an additional heuristic for when to release state. I think (at least for some intermediate step) this can be a good addition.

Disclaimer: So far I have mostly reviewed the PR from an architectural point of view, to evaluate whether we can use it as a basis for an implementation of local recovery, and did not dive into all implementation details.

final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager(
	new ScheduledThreadPoolExecutor(1),
	Executors.directExecutor(),
	taskManagerServicesConfiguration.getTmpDirPaths()[0]);
Contributor

I find this problematic because it does not consider all the configured tmp directories. While most users probably have only a single tmp directory configured, this can be problematic if somebody makes use of multiple directories (e.g. to utilize multiple smaller disks). We should also be sensitive to this case.
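A small sketch of what respecting all configured directories could look like (hypothetical helper, not code from this PR): spread the local cache files round-robin over every entry of getTmpDirPaths() instead of always using index 0.

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: pick a tmp directory round-robin so all configured disks are used. */
public class RoundRobinDirectoryPicker {

	private final String[] tmpDirPaths;
	private final AtomicInteger nextIndex = new AtomicInteger(0);

	public RoundRobinDirectoryPicker(String[] tmpDirPaths) {
		this.tmpDirPaths = tmpDirPaths;
	}

	public File nextDirectory() {
		// mask keeps the index non-negative even after integer overflow
		int idx = (nextIndex.getAndIncrement() & Integer.MAX_VALUE) % tmpDirPaths.length;
		return new File(tmpDirPaths[idx]);
	}
}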

Contributor Author

You're right. I was even thinking we should add a dedicated configuration option for where local recovery stores its data.

	byte[] internalData = byteStreamStateHandle.getData();
	dos.writeInt(internalData.length);
	dos.write(byteStreamStateHandle.getData());
} else if (stateHandle instanceof CachedStreamStateHandle) {
Contributor

This means we are actually introducing significant new code into the job manager, which even impacts the serialization format. I think this should not strictly be required if we map local state to checkpoint ids.

Contributor Author

@sihuazhou Nov 28, 2017

@StefanRRichter Thanks very much for reviewing my code, and thanks very much for explaining your opinion in such detail. I am very happy that my approach is similar to what you have in mind in some places. There are a few things I want to explain a bit:

  1. About the 1:1 relationship between remote handle and local handle: in fact, I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, each backend generates an IncrementalKeyedStateHandle for every incremental checkpoint, but IncrementalKeyedStateHandle is a composite handle; it contains a collection of sub state handles that store the data (meta & sst & misc). In this case the sub StateHandle is the smallest storage unit, each of them has a 1:1 relationship with a local state handle, and IncrementalKeyedStateHandle has a 1:N relationship with local state handles. (Currently, CheckpointStateOutputStream.closeAndGet() returns a remote handle, which I view as the smallest storage unit.) For incremental checkpoints this can indeed be optimized: we can provide a fast path that puts a cache entry into the checkpoint cache, so it does not need to write the data locally while transmitting it to the remote end. I didn't do that because I wanted to provide a unified way that meets all backends' requirements, and I didn't want to change the backend code too much.

  2. The local handle does not have to be a local file. (I reorganized the change to make this logic clearer: I made CachedOutputStream an interface and provided an FsCachedOutputStream based on it.) Because the cache entry uses a StreamStateHandle to refer to the local checkpoint data, the local handle can also be stored in memory (ByteStreamStateHandle) or on another storage medium (this may apply to the CopyOnWriteStateTableSnapshot case described above). We can implement CachedOutputStream to meet different requirements. Currently, an FsCachedOutputStream is provided that stores the data in a local file or in memory (when the data size is small enough).

  3. About the SharedCacheRegistry: although we can provide a special solution for RocksDB incremental checkpoints, in which case SharedCacheRegistry becomes completely obsolete, if we want to support other types of incremental checkpoints (incremental checkpoints that are not based on RocksDB) in the future, we may still need it. It is a general solution for incremental checkpoints, just like the SharedStateRegistry in the CheckpointCoordinator.

IMO mapping local state to the checkpoint id can also work, but I have some minor questions about that:

  1. Can we provide a unified local state mechanism that meets all of the current state backend requirements (of course, RocksDB can be optimized)?
  2. Since the local state is mapped according to the checkpoint id, the key range detection needs to be performed locally again, which is a bit repetitive. Can this be avoided with the work on the JM side?

I've expressed my ideas, but I think you are more experienced than me in this area and your approach is probably better than mine. So if you have any planned issues, I would like to work on your plan instead of this PR, but I would be very happy if you think this PR has something useful in it.

Contributor Author

Yes, this can be avoided by mapping local state to checkpoint ids. In fact, it can also be avoided when mapping local state to a handle id, but that needs some additional code.


@Override
public long getPos() throws IOException {
	return remoteOut != null ? remoteOut.getPos() : -1L;
Contributor

This can be a bit dangerous. Some code may rely on certain offsets in the file, so both streams, cachedOut and remoteOut, should have aligned offsets before we start. This should at least be checked in a precondition, because it could lead to subtle, hard-to-debug bugs.

Contributor

What I mean is that any seek to an offset that we obtained via getPos() could fail on the cached stream if the offsets are not aligned.
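A sketch of the suggested precondition (hypothetical helper; it only assumes that both streams expose getPos() like Flink's FSDataOutputStream): verify up front that the remote and the cached stream sit at the same offset, so positions obtained via getPos() stay valid for the cached copy.

import java.io.IOException;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.util.Preconditions;

public final class StreamAlignmentCheck {

	/** Fails fast if the remote and the cached stream do not start at the same offset. */
	public static void checkAligned(FSDataOutputStream remoteOut, FSDataOutputStream cacheOut)
			throws IOException {
		long remotePos = remoteOut.getPos();
		long cachedPos = cacheOut.getPos();
		Preconditions.checkState(
			remotePos == cachedPos,
			"Remote and cached checkpoint streams are not aligned: remote=%s, cached=%s",
			remotePos, cachedPos);
	}

	private StreamAlignmentCheck() {
	}
}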

Contributor Author

Thanks for pointing this out, I am addressing it.

@Override
public void flush() throws IOException {
	if (cacheOut != null) {
		cacheOut.flush();
Contributor

I find this a bit inconsistent: why not also catch exceptions from cacheOut here and discard the stream? In general, I would suggest writing this class in a way that failures on cacheOut do not result in a checkpoint failure (or you can make this configurable). As long as writing to the primary (remote) stream works, I would consider the checkpoint ok; caching is more of a bonus.
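A sketch of the suggested behavior (using plain OutputStream instead of Flink's checkpoint stream classes to keep it self-contained, and field names that only mirror the ones discussed here): a failure of the cache stream only discards the cache, it never fails the checkpoint.

import java.io.IOException;
import java.io.OutputStream;

public class CacheTolerantDuplicatingStream extends OutputStream {

	private final OutputStream remoteOut;  // primary stream, failures here fail the checkpoint
	private OutputStream cacheOut;         // best-effort local copy, may be dropped at any time

	public CacheTolerantDuplicatingStream(OutputStream remoteOut, OutputStream cacheOut) {
		this.remoteOut = remoteOut;
		this.cacheOut = cacheOut;
	}

	@Override
	public void write(int b) throws IOException {
		remoteOut.write(b);
		if (cacheOut != null) {
			try {
				cacheOut.write(b);
			} catch (IOException e) {
				discardCache();
			}
		}
	}

	@Override
	public void flush() throws IOException {
		remoteOut.flush();
		if (cacheOut != null) {
			try {
				cacheOut.flush();
			} catch (IOException e) {
				discardCache();
			}
		}
	}

	private void discardCache() {
		try {
			cacheOut.close();
		} catch (IOException ignored) {
			// the cache copy is already considered lost
		} finally {
			cacheOut = null;
		}
	}
}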

Contributor Author

You're right. That is what I intended, but I missed it here; I will wrap the cacheOut operation in a try-catch block.
