diff --git a/lib/wallaroo/core/checkpoint/checkpoint_initiator.pony b/lib/wallaroo/core/checkpoint/checkpoint_initiator.pony index 0a40751f87..04ce62e158 100644 --- a/lib/wallaroo/core/checkpoint/checkpoint_initiator.pony +++ b/lib/wallaroo/core/checkpoint/checkpoint_initiator.pony @@ -305,6 +305,8 @@ actor CheckpointInitiator is Initializable ifdef "resilience" then _clear_pending_checkpoints() _current_checkpoint_id = _current_checkpoint_id + 1 + _save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id, + _last_rollback_id) ifdef "checkpoint_trace" then try @@ -509,9 +511,10 @@ actor CheckpointInitiator is Initializable @printf[I32]("CheckpointInitiator: Checkpoint %s is complete!\n". cstring(), st.id.string().cstring()) end - _save_checkpoint_id(st.id, _last_rollback_id) _last_complete_checkpoint_id = st.id - _propagate_checkpoint_complete(st.id) + _save_checkpoint_id(_current_checkpoint_id, + _last_complete_checkpoint_id, _last_rollback_id) + _propagate_checkpoint_complete(_last_complete_checkpoint_id) try let msg = ChannelMsgEncoder.commit_checkpoint_id(st.id, _last_rollback_id, _worker_name, _auth)? @@ -568,7 +571,8 @@ actor CheckpointInitiator is Initializable if (_primary_worker == _worker_name) then let rollback_id = _last_rollback_id + 1 _last_rollback_id = rollback_id - _save_checkpoint_id(_last_complete_checkpoint_id, rollback_id) + _save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id, + _last_rollback_id) promise(_last_rollback_id) else try @@ -604,6 +608,8 @@ actor CheckpointInitiator is Initializable _last_complete_checkpoint_id) if _current_checkpoint_id < _last_complete_checkpoint_id then _current_checkpoint_id = _last_complete_checkpoint_id + _save_checkpoint_id(_current_checkpoint_id, + _last_complete_checkpoint_id, rollback_id) end let barrier_promise = Promise[BarrierToken] barrier_promise.next[None]({(t: BarrierToken) => @@ -631,7 +637,8 @@ actor CheckpointInitiator is Initializable be rollback_complete(rollback_id: RollbackId) => ifdef "resilience" then - _save_checkpoint_id(_last_complete_checkpoint_id, rollback_id) + _save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id, + rollback_id) end be commit_checkpoint_id(checkpoint_id: CheckpointId, rollback_id: RollbackId, @@ -645,22 +652,25 @@ actor CheckpointInitiator is Initializable "not the primary for checkpoints. Ignoring.\n").cstring()) end - fun ref _commit_checkpoint_id(checkpoint_id: CheckpointId, + fun ref _commit_checkpoint_id(current_checkpoint_id: CheckpointId, rollback_id: RollbackId) => ifdef "resilience" then - _current_checkpoint_id = checkpoint_id - _last_complete_checkpoint_id = checkpoint_id - _save_checkpoint_id(checkpoint_id, rollback_id) + _current_checkpoint_id = current_checkpoint_id + _last_complete_checkpoint_id = current_checkpoint_id + _save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id, + rollback_id) end - fun ref _save_checkpoint_id(checkpoint_id: CheckpointId, - rollback_id: RollbackId) + fun ref _save_checkpoint_id(current_checkpoint_id: CheckpointId, + last_complete_checkpoint_id: CheckpointId, rollback_id: RollbackId) => try ifdef "checkpoint_trace" then - @printf[I32]("Saving CheckpointId %s and RollbackId %s\n".cstring(), - checkpoint_id.string().cstring(), rollback_id.string().cstring()) + @printf[I32]("Saving current CheckpointId %s last complete CheckpointId %s RollbackId %s\n".cstring(), + current_checkpoint_id.string().cstring(), + last_complete_checkpoint_id.string().cstring(), + rollback_id.string().cstring()) end let filepath = FilePath(_auth, _checkpoint_id_file)? // TODO: We'll need to rotate this file since it will grow. @@ -669,7 +679,8 @@ actor CheckpointInitiator is Initializable _do_local_file_io) file.seek_end(0) - _wb.u64_be(checkpoint_id) + _wb.u64_be(current_checkpoint_id) + _wb.u64_be(last_complete_checkpoint_id) _wb.u64_be(rollback_id) // TODO: We can't be sure we actually wrote all this out given the // way this code works. @@ -683,10 +694,11 @@ actor CheckpointInitiator is Initializable fun ref _load_latest_checkpoint_id() => ifdef "resilience" then - (let checkpoint_id, let rollback_id) = - LatestCheckpointId.read(_auth, _checkpoint_id_file) - _current_checkpoint_id = checkpoint_id - _last_complete_checkpoint_id = checkpoint_id + (let current_checkpoint_id, + let last_complete_checkpoint_id, + let rollback_id) = LatestCheckpointId.read(_auth, _checkpoint_id_file) + _current_checkpoint_id = current_checkpoint_id + _last_complete_checkpoint_id = last_complete_checkpoint_id _last_rollback_id = rollback_id end @@ -697,28 +709,33 @@ actor CheckpointInitiator is Initializable primitive LatestCheckpointId fun read(auth: AmbientAuth, checkpoint_id_file: String): - (CheckpointId, RollbackId) + (CheckpointId, CheckpointId, RollbackId) => try let filepath = FilePath(auth, checkpoint_id_file)? if filepath.exists() then let file = File(filepath) file.seek_end(0) - file.seek(-16) + file.seek(-24) let r = Reader - r.append(file.read(16)) - let checkpoint_id = r.u64_be()? + r.append(file.read(24)) + let current_checkpoint_id = r.u64_be()? + let last_complete_checkpoint_id = r.u64_be()? let rollback_id = r.u64_be()? - (checkpoint_id, rollback_id) + ifdef "checkpoint_trace" then + @printf[I32]("Found checkpoint ids in recovery file: current %lu last successful %lu rollback %lu\n".cstring(), + current_checkpoint_id, last_complete_checkpoint_id, rollback_id) + end + (current_checkpoint_id, last_complete_checkpoint_id, rollback_id) else @printf[I32]("No latest checkpoint id in recovery file.\n".cstring()) Fail() - (0, 0) + (0, 0, 0) end else @printf[I32]("Error reading checkpoint id recovery file!".cstring()) Fail() - (0, 0) + (0, 0, 0) end class _InitiateCheckpoint is TimerNotify diff --git a/lib/wallaroo/startup.pony b/lib/wallaroo/startup.pony index 8ba88512b8..704f4b2d07 100644 --- a/lib/wallaroo/startup.pony +++ b/lib/wallaroo/startup.pony @@ -180,9 +180,10 @@ actor Startup _disposables.set(joining_listener) elseif _is_recovering then ifdef "resilience" then - (let checkpoint_id, let rollback_id) = + (let current_checkpoint_id, let last_complete_checkpoint_id, + let rollback_id) = LatestCheckpointId.read(auth, _checkpoint_ids_file) - _initialize(checkpoint_id) + _initialize(last_complete_checkpoint_id) else _initialize() end