Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce fate #137

Merged
merged 1 commit into from Sep 27, 2022
Merged

Introduce fate #137

merged 1 commit into from Sep 27, 2022

Conversation

davidselassie
Copy link
Contributor

Previously I made the questionable API design of having
StatefulLogic::snapshot return a StateUpdate. This meant that the
snapshotting process affected the behavior of the stateful operator,
since it could return Reset to signal up to StatefulUnary to
discard the logic.

This unwinds that tangle by introducing a new method with a perhaps
overly poetic name StatefulLogic::fate which should return what
StatefulUnary should do with the logic when it's done processing via
a LogicFate enum. There are three options:

  1. Retain it until a new item for this key comes in.
  2. Retain it and awaken it after a timeout.
  3. Discard it.

fate is attempting to encapsulate the problem of StatefulUnary is
the owner of the logics, so they can't drop themselves. And since
awakening timeouts are part of that process, they're in there too.

This is nice because it simplifies the return value of exec to just
output. The awaken delay is handled in fate.

It also uses this pattern within Windower::fate with WindowerFate
for the same kind of problem: the WindowStatefulLogic is the owner,
and we need to communicate back when it's safe to discard a
Windower. This fixes the bug of never discarding window state if a
key is never seen again: we now discard that state whenever all
windows for a key are closed.

A few other small changes:

  • Standardises on the language of "awaken" a logic. Still uses
    Timely's "activate" for a Timely operator, though. Renames
    StatefulLogic::exec to StatefulLogic::awake_with to make more
    explicit when it is called. Renames WindowLogic::exec to
    WindowLogic::with_next to make explicit when it's called.

  • All stateful operator tests should test recovery as part of testing
    the logic. We should do this to excercise the serde round-trip. I
    found three? bugs via this where recovery would panic because the
    deserialization wasn't to the correct type. As part of this, I added
    explicit type annotations to all StateBytes::ser and
    StateBytes::de calls so you can compare them.

  • Clarifies more comments in the giant chunk of code for
    StatefulUnary::stateful_unary. I was also able to optimize it a
    little and get rid of two of our temporary buffers. I think it makes
    the process slightly clearer.

@davidselassie
Copy link
Contributor Author

davidselassie commented Sep 20, 2022

This is going to definitely clash with #115 and #127 so I'm happy to wait until those are merged to figure this out.

Although I believe this fixes a bug where any dataflow would panic on recovery, so perhaps worth doing now?

@davidselassie
Copy link
Contributor Author

davidselassie commented Sep 20, 2022

I'm actually going to poke at this a little further because I think persisting awake times per key is pretty closely tied to this. You can review if you want, but I'm going to add some new commits.

Actually actually, this is in the right direction, and I need to stop making +3000 line PRs so I do want this reviewed and merged and can keep iterating on it.

@davidselassie davidselassie marked this pull request as ready for review September 21, 2022 19:03
# Recover
run_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

# But it remembers the first two items in the first window.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. So helpful

def test_fold_window(recovery_config):
flow = Dataflow()

# Remember that clocks are built per-key so the `TestingClock` in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a smell but let's table that for now 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of changed in #139.

Not the per-key thing, but it'll be less confusing? Although I'm having some trouble with the TestingClockConfig in another PR, so the smell does remain...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured out what was confusing me. It's that we were recovering and using a half-consumed generator. #144 has the tests out of this working.

src/recovery/mod.rs Outdated Show resolved Hide resolved
@@ -778,7 +797,7 @@ pub(crate) fn build_state_loading_dataflow<A: Allocate>(

match update {
StateUpdate::Upsert(state) => resume_state.insert(key, state),
StateUpdate::Reset => resume_state.remove(&key),
StateUpdate::Discard => resume_state.remove(&key),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When i read this, I keep expecting the logic to be something like "Is there an updated state? great, add it to the stateful collection. Otherwise, discard it". Otherwise, I'm thinking there would be a 3rd possibility of no state update? I think it's the Update that's confusing me. Like it's either a State::Upsert(state) or a State::Discard but maybe the word State is too loaded to stand alone

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the writing side, we could eventually add a StateUpdate::Unchanged return type and then not do the write, but that would be an optimization. The trade-off is that now each logic doesn't just need to remember its state, it also needs to remember the last state it wrote (or some proxy for it) so it can look at itself when asked for a snapshot and actually determine there was no state change relative to the last snapshot. That really increases the bug surface area here as you're introducing the concept of epochs (in a minor way) into each stateful logic, whereas the more basic "just snapshot me!" approach lets the logic writer ignore that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW on the reading side (since this linked snippet is the loading during recovery), that situation is already handled: if we didn't write the update, then there wasn't an update, so don't do anything.

.map(|t| t - watermark)
.min()
fn fate(&self) -> WindowerFate {
if let Some(next_close) = self.close_times.values().cloned().min() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did i think these were sorted already? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you're remembering that we use BTreeMap elsewhere? We can't here because we need to order by value, not key. We could cache the min close time, but I don't know that it would help that much.

Copy link
Contributor

@blakestier blakestier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments around names to highlight places that felt a little sticky for me, but I think this is a great change and I appreciate the tests, the untangling, and the dramatic introduction of fate

davidselassie added a commit that referenced this pull request Sep 26, 2022
Makes all types for recovery serialization and deserialization
explicit and fixes them to match each other for each recoverable
operator.

This will fix panics during recovery loading.

This was also fixed in #137
but I'm breaking it out here separately because I keep discoverying
smaller recovery bugs and am fixing them separately.
This was referenced Sep 26, 2022
Previously I made the questionable API design of having
`StatefulLogic::snapshot` return a `StateUpdate`. This meant that the
snapshotting process affected the behavior of the stateful operator,
since it could return `Reset` to signal up to `StatefulUnary` to
discard the logic.

This unwinds that tangle by introducing a new method with a perhaps
overly poetic name `StatefulLogic::fate` which should return what
`StatefulUnary` should do with the logic when it's done processing via
a `LogicFate` enum: either `Retain` it or `Discard` it.

`fate` is attempting to encapsulate the problem of `StatefulUnary` is
the owner of the logics, so they can't drop themselves.

This is nice because it simplifies the return value of `exec` to just
output. The awaken delay is handled in `fate`.

The other part of the logic return value was "time to next
awake". Which is now encapsulated in `StatefulLogic::next_awake`.

It also breaks apart the results of `Windower` due to the same kind of
problem: the `WindowStatefulLogic` is the owner, and we need to
communicate back when it's safe to discard a `Windower`. Adds
`Windower::is_empty` and `Windower::next_close` to handle this. This
fixes the bug of never discarding window state if a key is never seen
again: we now discard that state whenever all windows for a key are
closed.

A few other small changes:

- Standardises on the language of "awaken" a logic. Still uses
  Timely's "activate" for a Timely operator, though. Renames
  `StatefulLogic::exec` to `StatefulLogic::on_awake` to make more
  explicit when it is called. Renames `WindowLogic::exec` to
  `WindowLogic::with_next` to make explicit when it's called.

- Clarifies more comments in the giant chunk of code for
  `StatefulUnary::stateful_unary`. I was also able to optimize it a
  little and get rid of two of our temporary buffers. I think it makes
  the process slightly clearer.
@davidselassie
Copy link
Contributor Author

Rebased this.

The changes from the #143 and #144 are thus no longer in this PR.

Broke out LogicFate::AwakeAfter into its own function StatefulLogic::next_awake so that the "do I keep this around" question is totally separate from the "when do I wake up next" question.

In debugging that I think we're going to have to take an overhaul of how system time works: currently because there are interactions between the times returned by the clock in the window operators and the system time used by the Timely scheduler, we can't really deterministically unit test system time still. This is tough because the behavior is determined by when some code runs / windows close. I think this looks like using the Clock in StatefulUnary for run time and assuming the Timely scheduler activation delays will match up, but I'm not sure.

@davidselassie davidselassie merged commit 8da9636 into main Sep 27, 2022
@davidselassie davidselassie deleted the serde-separate branch September 27, 2022 23:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants