
Renaming of stateful operators #5

Merged
merged 4 commits into from Feb 10, 2022

Conversation

davidselassie
Contributor


Here's a prototype of a set of slightly different, and hopefully more
uniform, stateful operators. I took a long look at how we used
`exchange`, `accumulate`, `state_machine`, and `aggregate` in our
existing examples and also what kind of interfaces the built-in Python
types give us.

I'm excited that things fell into a shape that has some nice parallel
construction and I think that means these will hopefully be easier to
understand.

`exchange` is banished. My current thinking is that it's a "scheduling
implementation detail" that shouldn't be user visible. I don't like
that it was possible to write a dataflow that worked with a single
worker, then broke with multiple workers. Every time we previously
used `exchange`, we really wanted a "group by" operation, so these
changes make this first class. (I understand the concept of exchange
needs to be there, but I think it's something that should only be
exposed when you're writing Timely operators and actively coming up
with a way to orchestrate state.)

If we want to support both mutable and immutable types as state
aggregators, we have to have our function interface return updated
state and not modify in place. I don't think we should force users to
do things like

```python
class Counter:
    def __init__(self):
        self.c = 0
```

just to count things, so we keep immutables working. Thankfully, Python
has the `operator` package which lets you have access to `+` as a
function and all the operators return updated values. This makes it
possible to write in the functional style necessary without much
boilerplate. But you can still write out lambdas or helper functions
too.
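As a plain-Python illustration (not the library's API), here is what that return-updated-value style looks like with the `operator` module standing in for a lambda:

```python
import operator
from functools import reduce

# operator.add is just `+` as a function: it returns a new value
# instead of mutating its arguments, so it works with immutables.
assert operator.add(2, 3) == 5

# Folding a stream of counts, starting from an initial aggregator of 0.
counts = [1, 1, 1, 1]
total = reduce(operator.add, counts, 0)
assert total == 4
```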

Many of our stateful examples are of the form "collect some items and
stop at some point". Since this is a dataflow, the stream could be
endless, so we need a way to signal that the aggregator is complete.
Noting that the epoch is complete is the most common "is complete"
case, so we can include that for convenience, but the general case
exists too. Since they're similarly shaped problems, the interfaces
should look similar.
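A plain-Python sketch of that general case, a fold with an explicit "is complete" predicate. The names here (`fold_until`, `build`, `folder`, `is_complete`) are illustrative only, not the actual operator signatures:

```python
def fold_until(values, build, folder, is_complete):
    """Fold values into an aggregator; emit it once complete."""
    aggregator = build()
    for v in values:
        aggregator = folder(aggregator, v)
        if is_complete(aggregator):
            return aggregator
    return None  # Stream ended before the aggregator was complete.

# Collect items until we have three of them.
out = fold_until(
    iter(range(100)),
    list,                           # build an empty aggregator
    lambda acc, v: acc + [v],       # return an updated aggregator
    lambda acc: len(acc) >= 3,      # designate "complete"
)
assert out == [0, 1, 2]
```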

So what happened?

I want to put forth `key_fold`, `key_fold_epoch`,
`key_fold_epoch_local`, and `key_transition` as the state operators. I
wrote out usage docstrings and updated examples, but tl;dr:

- They all start with `key` because they require `(key, value)` as
  input and group by key and exchange automatically. They output
  `(key, aggregator)` too.

- The ones with `fold` do a "fold": build an initial aggregator, then
  incrementally add new values as you see them, then return the entire
  aggregator.

- The ones with `epoch` automatically stop folding at the end of each
  epoch since that's such a common use case. Otherwise you provide a
  function to designate "complete".

- `local` means only aggregate within a worker. We haven't found a use
  case for this yet that isn't a performance optimisation (and one
  that maybe could be done automatically?).

- `transition` isn't a "fold" in that it's always emitting events (not
  just when the aggregation is complete).

- Input comes in undefined order within an epoch, but in epoch order
  if multiple epochs.
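The fold/transition distinction can be modeled in a few lines of plain Python. These are toy reference models of the two shapes, not the real operator implementations:

```python
def fold(values, build, folder):
    """Emit a single aggregator only after seeing all values."""
    acc = build()
    for v in values:
        acc = folder(acc, v)
    return acc

def transition(values, build, stepper):
    """Emit one event per input value, carrying state forward."""
    state = build()
    events = []
    for v in values:
        state, event = stepper(state, v)
        events.append(event)
    return events

values = [1, 2, 3]
assert fold(values, int, lambda a, v: a + v) == 6
# A running sum: transition emits an event for every single input.
assert transition(values, int, lambda s, v: (s + v, s + v)) == [1, 3, 6]
```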

If you feel like you know the shape of the old versions, then:
`key_fold_epoch ~= aggregate`, `key_transition ~= state_machine`,
`key_fold` is `state_machine` but shaped like `aggregate`, and
`key_fold_epoch_local` is `accumulate` but shaped like `aggregate`.

I've updated the examples too. I wanted to see how concise you
could get using the built-in types and how the library might
feel in that dimension. But I understand that some of these examples
might be a bit too terse. We don't have to necessarily go that way in
the docs, but it's cool to see that it can work and you can count like:

```python
flow.key_fold_epoch(lambda: 0, operator.add)
```

Let me know what you think!
@whoahbot
Contributor

whoahbot commented Feb 8, 2022

From our conversation, I think we were pondering a few different names:

key_fold => fold
key_fold_epoch => fold_with_epoch
transition => stateful_map

@@ -81,6 +89,18 @@ impl From<Py<PyAny>> for TdPyAny {
}
}

/// Allows you to debug print Python objects using their repr.
Contributor


nice!

src/lib.rs Outdated
aggregator: &mut TdPyAny,
key: &TdPyAny,
value: TdPyAny,
) -> (bool, TdPyIterator) {
) -> (bool, Vec<TdPyAny>) {
Contributor


We chatted about this, but noting here that we may want to consider matching the signature of key_transition.

-> IntoIterator<Item = TdPyAny>

src/lib.rs Outdated
build_new_aggregator,
reducer,
});
/// **Key Fold Epoch** lets you combine all items for a key within
Contributor


Thank you for adding all of these docstrings!

src/lib.rs Outdated
self.steps.push(Step::Aggregate {
build_new_aggregator,
reducer,
/// **Key Transition** lets you modify some persistent state for a
Contributor


Based on this description I will indeed salute stateful_map or map_state.

@davidselassie
Contributor Author

Some updates: stateful_map. Great name.

But once I changed that, then it felt less weird to have the other stateful operators be "reduce" rather than "fold" and not need to take a builder. None of our other examples actually used the builder in a meaningful way, and I think it helps motivate the whole "why do I need a 1 in (word, 1)" thing. So I changed my mind.
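The builder-vs-seed distinction is the same one `functools.reduce` has: a fold takes an explicit initial value, while a reduce seeds the aggregator from the first value. A plain-Python illustration (not the library's API):

```python
import operator
from functools import reduce

words = ["a", "b", "a", "a"]

# Fold-style: an explicit initial aggregator (the builder's job),
# with each item pre-shaped into something addable, e.g. a 1.
ones = [1 for _ in words]
assert reduce(operator.add, ones, 0) == 4

# Reduce-style: the first value seeds the aggregator, so no builder
# (and no initial value) is needed.
assert reduce(operator.add, ones) == 4
```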

If you don't like it or still want to call it "fold" that can happen too.

Comment on lines 34 to 36
flow.map(json.loads)
flow.map(server_name)
flow.exchange(hash)
flow.accumulate(lambda: collections.defaultdict(int), count_edits)
# {"server_name": "server.name", ...}
flow.map(group_by_server)
Contributor


Is this group_by_server missing?

Contributor Author


Oops. That's supposed to be the initial_count from above. Fixed.

@awmatheson
Contributor

awmatheson commented Feb 9, 2022

I really like the new names, thanks for putting this together!

Since the exchange is not explicit anymore, will data be exchanged only if it is hashable? Or only if it is of the format (key, value)?

@davidselassie
Contributor Author

In order to use any of these operators the input stream has to be of the form (key, value). Every input pair will be exchanged automatically and thus key needs to be hashable. If it's not, you'll get an exception about it being unhashable; value can be anything, though. If you forget to form the input as (key, value) you'll get an error about tuple unpacking. There's no way to "not exchange". We can work on making the errors a little more explanatory eventually. Does that answer your question?
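The two failure modes described above can be seen in plain Python (this sketch is illustrative; it is not the library's actual error handling):

```python
# Keys must be hashable because routing is done by hash of the key.
assert isinstance(hash("server.name"), int)

# An unhashable key (e.g. a list, which is mutable) raises TypeError.
try:
    hash(["not", "hashable"])
except TypeError as e:
    assert "unhashable" in str(e)

# Input that isn't shaped as a (key, value) pair fails to unpack.
try:
    key, value = ("only-one-element",)
except ValueError as e:
    assert "unpack" in str(e)
```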

@davidselassie davidselassie merged commit a5b376f into main Feb 10, 2022
@davidselassie davidselassie deleted the state3 branch February 10, 2022 00:37