
Recovery state will be incorrect if any worker crashes during the initial epoch #181

Closed · davidselassie opened this issue Dec 14, 2022 · 0 comments · Fixed by #193
Labels: type:bug Something isn't working

@davidselassie (Contributor)
Bug Description

During dataflow execution, we write out a progress message whenever a worker completes an epoch. We use these written epochs to re-derive the progress of the entire cluster. Unfortunately, on resume we do not know beforehand how many workers' progress messages we're waiting for. So if the cluster crashes after worker 1 has written progress but worker 2 has not, then upon resume it will look like we're resuming a cluster of size 1, and worker 2's data will be silently skipped.

We need some way of synchronously waiting for all workers to confirm they've written a marker of their existence to the recovery store before each execution (resume or not).
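A minimal sketch of that barrier idea, assuming a SQLite-backed recovery store; the `workers` table and `startup_barrier` helper are invented for illustration and are not Bytewax's actual API:

```python
import sqlite3
import time


def startup_barrier(db_path, worker_index, worker_count, timeout=30.0):
    """Hypothetical: record this worker's existence, then block until every
    expected worker has done the same before (re)starting execution."""
    con = sqlite3.connect(db_path)
    con.execute(
        "CREATE TABLE IF NOT EXISTS workers (worker_index INTEGER PRIMARY KEY)"
    )
    # Synchronously write this worker's existence marker.
    con.execute(
        "INSERT OR IGNORE INTO workers (worker_index) VALUES (?)",
        (worker_index,),
    )
    con.commit()
    # Wait until the full cluster has checked in; only then is it safe to
    # begin execution, since resume can now tell how many workers to expect.
    deadline = time.monotonic() + timeout
    seen = 0
    while time.monotonic() < deadline:
        (seen,) = con.execute("SELECT COUNT(*) FROM workers").fetchone()
        if seen >= worker_count:
            return
        time.sleep(0.1)
    raise TimeoutError(f"only {seen} of {worker_count} workers checked in")
```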

Python version (python -V)

Python 3.10.6

Bytewax version (pip list | grep bytewax)

0.13.1

Relevant log output

No response

Steps to Reproduce

I don't have any example code yet, but reproducing this should theoretically be possible.
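In the meantime, here's a hypothetical simulation of the failure at the recovery-store level; the `progress` table layout and the naive resume queries are invented for illustration and are not Bytewax's actual schema:

```python
import sqlite3

# Simulate a 2-worker cluster's progress store: worker 0 completes epoch 0
# and records it, but worker 1 crashes before writing anything.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE progress (worker_index INTEGER, epoch INTEGER)")
con.execute("INSERT INTO progress VALUES (0, 0)")
# (No row for worker 1: it crashed during the initial epoch.)

# Naive resume logic derives the cluster from whatever rows exist. With no
# record of how many workers there were, this looks like a healthy 1-worker
# cluster, and worker 1's unfinished work is silently skipped.
(workers_seen,) = con.execute(
    "SELECT COUNT(DISTINCT worker_index) FROM progress"
).fetchone()
(resume_epoch,) = con.execute("SELECT MIN(epoch) + 1 FROM progress").fetchone()
print(f"resuming a {workers_seen}-worker cluster from epoch {resume_epoch}")
# -> resuming a 1-worker cluster from epoch 1
```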

@github-actions bot added the `needs triage` (New issue, needs triage) label Dec 14, 2022
@whoahbot added the `type:bug` (Something isn't working) label and removed the `needs triage` label Dec 14, 2022
@davidselassie davidselassie self-assigned this Jan 3, 2023
davidselassie added a commit that referenced this issue Jan 25, 2023
The progress recovery data model used to have an issue: we never knew
how many workers were part of a cluster. This caused us to "lose"
workers if we crashed before any progress data was written by a
worker; during resume, it just looked like the cluster was smaller and
nothing was missing. Adds a new kind of progress recovery message,
written as each cluster starts up, containing the total worker count
so we can tell when we're missing a later progress message.

Fixes #181

Related to this, we also need to disambiguate between workers across
clusters. During recovery we need to know which "worker 3" a progress
message like "worker 3 moved to epoch 30" applies to. To do this we add
an `Execution` ID, which is sort of like "epochs, but for the whole
cluster". This is a `u64` that increments on each resume; all progress
recovery data is labeled with it so we can ignore data from older
clusters that might come out of order. Adds tables to the SQLite
recovery DB to support this.

This gets us one more step closer to rescaling.

Rewrites the `Progress` operator to work on frontiers again... Turns
out that if a worker never generates input, it'll never generate state
data, which means it never generates progress info, because progress
was based off of the `Tick`s flowing through that part of the dataflow
graph. This makes storing the `BorderEpoch` unnecessary gymnastics, so
it is removed from the progress data model.
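A rough sketch of the data model this commit describes, with invented SQLite table and column names (the real schema isn't shown in this issue): each start-up bumps an execution ID and records the cluster size, and progress rows are labeled with their execution so stale rows from older clusters are ignored on resume:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    -- One row per cluster start-up: the execution id and total worker count.
    CREATE TABLE executions (ex INTEGER PRIMARY KEY, worker_count INTEGER);
    -- Progress rows are labeled with the execution that wrote them.
    CREATE TABLE progress (ex INTEGER, worker_index INTEGER, epoch INTEGER);
    """
)


def start_execution(con, worker_count):
    """Bump the execution id on each (re)start and record the cluster size."""
    (last,) = con.execute("SELECT COALESCE(MAX(ex), -1) FROM executions").fetchone()
    ex = last + 1
    con.execute("INSERT INTO executions VALUES (?, ?)", (ex, worker_count))
    con.commit()
    return ex


def resume_epoch(con, default_epoch=0):
    """Derive the cluster resume epoch from the most recent execution only."""
    row = con.execute(
        "SELECT ex, worker_count FROM executions ORDER BY ex DESC LIMIT 1"
    ).fetchone()
    if row is None:
        return default_epoch  # First ever execution; nothing to resume from.
    ex, worker_count = row
    seen, min_epoch = con.execute(
        "SELECT COUNT(DISTINCT worker_index), MIN(epoch) FROM progress WHERE ex = ?",
        (ex,),
    ).fetchone()
    if seen < worker_count:
        # Some worker never wrote progress, so it never finished its first
        # epoch; resume from the default epoch instead of silently pretending
        # the cluster was smaller. (One possible policy, for illustration.)
        return default_epoch
    return min_epoch + 1
```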
oli-kitty pushed a commit that referenced this issue Feb 6, 2023