Skip to content

Commit

Permalink
Expand checkpoint id logging (#3060)
Browse files Browse the repository at this point in the history
* Add to logging of local checkpoint state info

Prior to this commit, we stored only 2 pieces of info
for the local checkpoint state:
1. last committed checkpoint #
2. rollback checkpoint #

This commit adds a 3rd integer: the current checkpoint #.
Its purpose is to avoid reusing checkpoint #s, even if the
initializer worker crashes.

* Add to logging of local checkpoint state info 2
  • Loading branch information
Scott Lystig Fritchie committed Nov 19, 2019
1 parent 9680d93 commit fcea2e3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
65 changes: 41 additions & 24 deletions lib/wallaroo/core/checkpoint/checkpoint_initiator.pony
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ actor CheckpointInitiator is Initializable
ifdef "resilience" then
_clear_pending_checkpoints()
_current_checkpoint_id = _current_checkpoint_id + 1
_save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id,
_last_rollback_id)

ifdef "checkpoint_trace" then
try
Expand Down Expand Up @@ -509,9 +511,10 @@ actor CheckpointInitiator is Initializable
@printf[I32]("CheckpointInitiator: Checkpoint %s is complete!\n".
cstring(), st.id.string().cstring())
end
_save_checkpoint_id(st.id, _last_rollback_id)
_last_complete_checkpoint_id = st.id
_propagate_checkpoint_complete(st.id)
_save_checkpoint_id(_current_checkpoint_id,
_last_complete_checkpoint_id, _last_rollback_id)
_propagate_checkpoint_complete(_last_complete_checkpoint_id)
try
let msg = ChannelMsgEncoder.commit_checkpoint_id(st.id,
_last_rollback_id, _worker_name, _auth)?
Expand Down Expand Up @@ -568,7 +571,8 @@ actor CheckpointInitiator is Initializable
if (_primary_worker == _worker_name) then
let rollback_id = _last_rollback_id + 1
_last_rollback_id = rollback_id
_save_checkpoint_id(_last_complete_checkpoint_id, rollback_id)
_save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id,
_last_rollback_id)
promise(_last_rollback_id)
else
try
Expand Down Expand Up @@ -604,6 +608,8 @@ actor CheckpointInitiator is Initializable
_last_complete_checkpoint_id)
if _current_checkpoint_id < _last_complete_checkpoint_id then
_current_checkpoint_id = _last_complete_checkpoint_id
_save_checkpoint_id(_current_checkpoint_id,
_last_complete_checkpoint_id, rollback_id)
end
let barrier_promise = Promise[BarrierToken]
barrier_promise.next[None]({(t: BarrierToken) =>
Expand Down Expand Up @@ -631,7 +637,8 @@ actor CheckpointInitiator is Initializable

be rollback_complete(rollback_id: RollbackId) =>
ifdef "resilience" then
_save_checkpoint_id(_last_complete_checkpoint_id, rollback_id)
_save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id,
rollback_id)
end

be commit_checkpoint_id(checkpoint_id: CheckpointId, rollback_id: RollbackId,
Expand All @@ -645,22 +652,25 @@ actor CheckpointInitiator is Initializable
"not the primary for checkpoints. Ignoring.\n").cstring())
end

fun ref _commit_checkpoint_id(checkpoint_id: CheckpointId,
fun ref _commit_checkpoint_id(current_checkpoint_id: CheckpointId,
rollback_id: RollbackId)
=>
ifdef "resilience" then
_current_checkpoint_id = checkpoint_id
_last_complete_checkpoint_id = checkpoint_id
_save_checkpoint_id(checkpoint_id, rollback_id)
_current_checkpoint_id = current_checkpoint_id
_last_complete_checkpoint_id = current_checkpoint_id
_save_checkpoint_id(_current_checkpoint_id, _last_complete_checkpoint_id,
rollback_id)
end

fun ref _save_checkpoint_id(checkpoint_id: CheckpointId,
rollback_id: RollbackId)
fun ref _save_checkpoint_id(current_checkpoint_id: CheckpointId,
last_complete_checkpoint_id: CheckpointId, rollback_id: RollbackId)
=>
try
ifdef "checkpoint_trace" then
@printf[I32]("Saving CheckpointId %s and RollbackId %s\n".cstring(),
checkpoint_id.string().cstring(), rollback_id.string().cstring())
@printf[I32]("Saving current CheckpointId %s last complete CheckpointId %s RollbackId %s\n".cstring(),
current_checkpoint_id.string().cstring(),
last_complete_checkpoint_id.string().cstring(),
rollback_id.string().cstring())
end
let filepath = FilePath(_auth, _checkpoint_id_file)?
// TODO: We'll need to rotate this file since it will grow.
Expand All @@ -669,7 +679,8 @@ actor CheckpointInitiator is Initializable
_do_local_file_io)
file.seek_end(0)

_wb.u64_be(checkpoint_id)
_wb.u64_be(current_checkpoint_id)
_wb.u64_be(last_complete_checkpoint_id)
_wb.u64_be(rollback_id)
// TODO: We can't be sure we actually wrote all this out given the
// way this code works.
Expand All @@ -683,10 +694,11 @@ actor CheckpointInitiator is Initializable

fun ref _load_latest_checkpoint_id() =>
ifdef "resilience" then
(let checkpoint_id, let rollback_id) =
LatestCheckpointId.read(_auth, _checkpoint_id_file)
_current_checkpoint_id = checkpoint_id
_last_complete_checkpoint_id = checkpoint_id
(let current_checkpoint_id,
let last_complete_checkpoint_id,
let rollback_id) = LatestCheckpointId.read(_auth, _checkpoint_id_file)
_current_checkpoint_id = current_checkpoint_id
_last_complete_checkpoint_id = last_complete_checkpoint_id
_last_rollback_id = rollback_id
end

Expand All @@ -697,28 +709,33 @@ actor CheckpointInitiator is Initializable

primitive LatestCheckpointId
fun read(auth: AmbientAuth, checkpoint_id_file: String):
(CheckpointId, RollbackId)
(CheckpointId, CheckpointId, RollbackId)
=>
try
let filepath = FilePath(auth, checkpoint_id_file)?
if filepath.exists() then
let file = File(filepath)
file.seek_end(0)
file.seek(-16)
file.seek(-24)
let r = Reader
r.append(file.read(16))
let checkpoint_id = r.u64_be()?
r.append(file.read(24))
let current_checkpoint_id = r.u64_be()?
let last_complete_checkpoint_id = r.u64_be()?
let rollback_id = r.u64_be()?
(checkpoint_id, rollback_id)
ifdef "checkpoint_trace" then
@printf[I32]("Found checkpoint ids in recovery file: current %lu last successful %lu rollback %lu\n".cstring(),
current_checkpoint_id, last_complete_checkpoint_id, rollback_id)
end
(current_checkpoint_id, last_complete_checkpoint_id, rollback_id)
else
@printf[I32]("No latest checkpoint id in recovery file.\n".cstring())
Fail()
(0, 0)
(0, 0, 0)
end
else
@printf[I32]("Error reading checkpoint id recovery file!".cstring())
Fail()
(0, 0)
(0, 0, 0)
end

class _InitiateCheckpoint is TimerNotify
Expand Down
5 changes: 3 additions & 2 deletions lib/wallaroo/startup.pony
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,10 @@ actor Startup
_disposables.set(joining_listener)
elseif _is_recovering then
ifdef "resilience" then
(let checkpoint_id, let rollback_id) =
(let current_checkpoint_id, let last_complete_checkpoint_id,
let rollback_id) =
LatestCheckpointId.read(auth, _checkpoint_ids_file)
_initialize(checkpoint_id)
_initialize(last_complete_checkpoint_id)
else
_initialize()
end
Expand Down

0 comments on commit fcea2e3

Please sign in to comment.