Recover from brutal shutdown (checkpointing) (Take 2) #608
chubei started this conversation in Feature Requests
## Environment

Our checkpointing solution is based on a data structure `Environment`, which is built upon lmdb; we borrow lmdb's terminology here. `Environment` is defined as follows, ignoring most error handling. Its most important method is `rewind`: with its help, we can implement the simple checkpointing algorithm below.
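A minimal sketch of that interface, assuming lmdb-style storage underneath; the method names `rewind`, `history_range`, and `drop_history_before` are from this discussion, while the signatures and the `Checkpoint` placeholder are assumptions:

```rust
/// Placeholder for the `Checkpoint` type; see #516 for the actual definition.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Checkpoint(pub u64);

/// Minimal sketch of the `Environment` interface.
pub trait Environment {
    /// The range of checkpoints this environment keeps, as
    /// (oldest rewindable checkpoint, latest checkpoint).
    fn history_range(&self) -> (Checkpoint, Checkpoint);

    /// Restore all persisted state to `checkpoint`.
    fn rewind(&mut self, checkpoint: Checkpoint);

    /// Drop all history before `checkpoint`, releasing space.
    fn drop_history_before(&mut self, checkpoint: Checkpoint);
}
```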
## Checkpointing

We use the `Checkpoint` type defined in the last discussion about checkpointing (see #516) to monomorphize the `Environment` type.

Every node in the DAG has an associated `Environment`, and nodes persist their data to it.

Upon pipeline start, we read every node's `history_range` and find the largest checkpoint that all nodes can rewind to. We rewind all nodes to this checkpoint and start the sources from it.
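A minimal sketch of this startup algorithm, assuming the `Environment` interface above and that `Checkpoint` is totally ordered (`recover` is a hypothetical name):

```rust
/// Sketch: rewind every node to the latest checkpoint they can all
/// reach, and return it so the sources can resume from there.
fn recover<E: Environment>(nodes: &mut [E]) -> Checkpoint {
    let target = nodes
        .iter()
        .map(|node| node.history_range().1) // latest checkpoint per node
        .min()
        .expect("the DAG has at least one node");
    for node in nodes.iter_mut() {
        node.rewind(target);
    }
    target
}
```

Taking the minimum of the latest checkpoints works because every node retains history at least back to the last merged checkpoint, so the minimum stays within every node's rewindable range.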
## Merger

As the pipeline runs, the space the environment occupies grows, because it has to remember all of the history. The `Merger` is responsible for dropping old history and releasing space.

The `Merger` runs in a separate thread and periodically queries the history range of every environment. It finds the largest checkpoint that all nodes can rewind to and calls `drop_history_before` with this checkpoint on every environment.
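A sketch of the `Merger` loop under the same assumed interface; the merge interval, the locking scheme, and `spawn_merger` are all assumptions:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Sketch: periodically compute the latest checkpoint every node can
/// rewind to and drop all history before it.
fn spawn_merger<E: Environment + Send + 'static>(envs: Vec<Arc<Mutex<E>>>) {
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(60)); // interval is an assumption
        let target = envs
            .iter()
            .map(|env| env.lock().unwrap().history_range().1)
            .min()
            .expect("the DAG has at least one node");
        for env in &envs {
            env.lock().unwrap().drop_history_before(target);
        }
    });
}
```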
## Implementing Managed Database

A managed database is implemented as one primary database, one snapshot database, and several incremental databases:

- The primary database always holds the latest state.
- The snapshot database is initially empty and is only updated on `drop_history_before`.
- Each incremental database is the difference between one checkpoint and its immediate successor.
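To make the layout concrete, here is a sketch with an in-memory `BTreeMap` standing in for each lmdb database; all names and types here are assumptions:

```rust
use std::collections::BTreeMap;

type Key = Vec<u8>;
type Value = Vec<u8>;

/// Stand-in for one lmdb database; an ordered map plays the role here.
type Database = BTreeMap<Key, Value>;

/// The operation log covering one (checkpoint, next checkpoint) interval.
enum Operation {
    Put(Key, Value),
    Delete(Key),
}
type IncrementalDatabase = Vec<Operation>;

struct ManagedDatabase {
    /// Always holds the latest state.
    primary: Database,
    /// State at the oldest retained checkpoint; empty at first.
    snapshot: Database,
    /// One log per checkpoint interval, oldest first.
    incremental: Vec<IncrementalDatabase>,
}
```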
## Initialization

Upon initialization, the primary database and the snapshot database are both empty, and there are no incremental databases.
## Create Transaction

Upon transaction creation, an incremental database covering the range from the latest checkpoint to the next one is created. Subsequent writes operate on this incremental database, as sketched below.
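In terms of the sketch above (`create_transaction` is a hypothetical name):

```rust
impl ManagedDatabase {
    /// Sketch: open a fresh incremental database covering the interval
    /// from the latest checkpoint to the next one.
    fn create_transaction(&mut self) {
        self.incremental.push(Vec::new());
    }
}
```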
## Reads

Reads always read from the primary database.
## Writes

Upon writing, `put`s and `delete`s operate on the primary database, and the operation log is recorded in the incremental database.

There are two ways to implement the incremental database: one is to simply serialize all the operations; the other is to summarize the operations and store only the net difference. For example, for the operations `Put 1 1`, `Put 2 2` and `Delete 1 1`, the first way would store `[Put 1 1, Put 2 2, Delete 1 1]`, while the second would store only `[Put 2 2]`. The exact workings of the second way are not totally clear yet.
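A sketch of the first approach, continuing the `ManagedDatabase` sketch (`current_log` is a hypothetical helper):

```rust
impl ManagedDatabase {
    /// Sketch: apply the write to the primary database and log it in
    /// the newest incremental database.
    fn put(&mut self, key: Key, value: Value) {
        self.primary.insert(key.clone(), value.clone());
        self.current_log().push(Operation::Put(key, value));
    }

    fn delete(&mut self, key: Key) {
        self.primary.remove(&key);
        self.current_log().push(Operation::Delete(key));
    }

    fn current_log(&mut self) -> &mut IncrementalDatabase {
        self.incremental
            .last_mut()
            .expect("create_transaction must be called first")
    }
}
```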
## Query History Range

The history range is just [snapshot database, primary database].
## Rewinding

Rewinding happens in three steps: reset the primary database to the snapshot database, replay the incremental databases up to the target checkpoint onto the primary database, and remove the incremental databases past the target checkpoint.

This assumes our incremental database doesn't support going backwards in time. If it does, we can instead apply the operations in reverse to the primary database and remove each incremental database once it has been applied.
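A sketch of the forward-only variant, with the target checkpoint expressed as the number of incremental intervals to keep (`keep` is a hypothetical parameter):

```rust
impl ManagedDatabase {
    /// Sketch: rewind by rebuilding the primary database from the
    /// snapshot, assuming incremental databases only replay forwards.
    fn rewind(&mut self, keep: usize) {
        // 1. Reset the primary database to the snapshot.
        self.primary = self.snapshot.clone();
        // 2. Replay the incremental databases up to the target checkpoint.
        for ops in &self.incremental[..keep] {
            for op in ops {
                match op {
                    Operation::Put(k, v) => {
                        self.primary.insert(k.clone(), v.clone());
                    }
                    Operation::Delete(k) => {
                        self.primary.remove(k);
                    }
                }
            }
        }
        // 3. Drop the incremental databases past the target checkpoint.
        self.incremental.truncate(keep);
    }
}
```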
## Merging

Upon merging, we apply all the operations in each incremental database to the snapshot database and remove that incremental database, repeating until we reach the target checkpoint.
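The corresponding merge, sketched with the same hypothetical indexing (`n` is the number of oldest intervals to fold in):

```rust
impl ManagedDatabase {
    /// Sketch of the merge behind `drop_history_before`: fold the `n`
    /// oldest incremental databases into the snapshot, then discard them.
    fn merge(&mut self, n: usize) {
        for ops in self.incremental.drain(..n) {
            for op in ops {
                match op {
                    Operation::Put(k, v) => {
                        self.snapshot.insert(k, v);
                    }
                    Operation::Delete(k) => {
                        self.snapshot.remove(&k);
                    }
                }
            }
        }
    }
}
```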