New execution interface #19

Merged
merged 25 commits on Feb 24, 2022

Conversation

davidselassie
Contributor

As you can tell by the branch name, this started out small and then
snowballed! Let me know how you think these examples feel and we can
discuss if this is the right course for our API. I'm open to new
ideas, but I also think this helps guide users along to better use
cases.

Backstory
=========

_Previously, on Bytewax:_

`Executor.build_and_run()` wasn't great because a single function was
used both for starting up a dataflow you want to execute locally and
for manually assembling a cluster of processes.

The distinction between "singleton" and "partitioned" input iterators,
and making the input part of the Dataflow definition, wasn't great
because it tied the behavior of the dataflow to the behavior of the
input.

The capture operator wasn't great because it didn't give you the
control you needed to return data from multiple workers or background
processes.

I think all these things are symptoms of trying to handle all use
cases from all points in the stack, rather than building primitives
and abstraction layers.

Changes
=======

See the docstrings and tests for examples!

Execution
---------

Let's make it explicit that there are different contexts in which you
might run a dataflow. This removes `Executor.build_and_run` and adds
four entry points in a "stack" of complexity:

- `run_sync()` which takes a dataflow and some input, runs it
  synchronously as a single worker in the existing Python thread, and
  returns the output to that thread. This is what you'd use in tests
  and simple notebook work.

- `run_cluster()` which takes a dataflow and some input, starts a
  local cluster of processes, runs it, waits for the cluster to finish
  work, then collects the results, and returns the output to that
  thread. This is what you'd use in a notebook if you need parallelism
  or higher throughput.

- `main_cluster()` which starts up a cluster of local processes,
  coordinates the addresses and process IDs between them, runs a
  dataflow on it, and waits for it to finish. This has a partitioned
  "input builder" and an "output builder" (discussed below). This is
  what you'd use if you want to write a standalone script or example
  that does some higher throughput processing.

- `main_proc()` which lets you manually craft a single process for use
  in a cluster. This is what you'd use when crafting your k8s cluster.

`main_cluster()` is built upon `main_proc()` but adds in some process
pool management.

`run_cluster()` is built upon `main_cluster()` but adds in the IPC
necessary to get your data back to the calling process super
conveniently.
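
To make the stack concrete, here's a rough sketch of how I picture
calling into the top two layers. The exact signatures and keyword names
(e.g. `proc_count`) are illustrative guesses, not the final API:

    from bytewax import Dataflow, run_sync, run_cluster

    def double(x):
        return x * 2

    flow = Dataflow()
    flow.map(double)
    flow.capture()

    inp = [0, 1, 2, 3]

    # Single worker in the current thread; output comes straight back.
    out = run_sync(flow, inp)

    # Local cluster of processes; output is shipped back here over IPC.
    # proc_count is a made-up keyword just for illustration.
    out = run_cluster(flow, inp, proc_count=2)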

Input
-----

Since you can express "singleton" input as a "partitioned" input, the
latter is the more fundamental concept, so that's what the lowest
level `main_proc()` and `main_cluster()` functions take: an input
builder function which will be called once on each worker and returns
the input that worker should work on.

The higher level execution contexts that you'd want to use from a
notebook or a single thread (`run_sync()` and `run_cluster()`) then
handle partitioning for you and provide a nice interface where you can
send in a whole Python list and not need to think more about it.
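
For example, a hand-written partitioned input builder might look
roughly like this sketch (the worker index/count arguments and the
events.log file are assumptions for illustration, not the settled
signature):

    # Hypothetical partitioned input builder: called once per worker and
    # returns the input that worker should work on.
    def input_builder(worker_index, worker_count):
        with open("events.log") as f:
            for i, line in enumerate(f):
                # Round-robin the lines so each worker gets its own slice.
                if i % worker_count == worker_index:
                    yield line.rstrip("\n")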

Output
------

_Make yours, like mine._

Since the above scheme for input feels good, let's try copying the
approach for output: At the lowest level there's a "partitioned output
builder" which returns a callback that each worker thread can use to
write the output it sees.

The higher level execution contexts can then make an output builder
function that collects data to send back to the main process for you.
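
As a sketch (again with guessed argument names), a partitioned output
builder could be as simple as each worker opening its own file and
returning a write callback:

    # Hypothetical partitioned output builder: called once per worker and
    # returns a callback invoked for each item reaching a capture step.
    def output_builder(worker_index, worker_count):
        out_file = open(f"out_part_{worker_index}.txt", "w")

        def write_item(item):
            out_file.write(f"{item}\n")

        return write_item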

This change means that the capture operator doesn't need to take any
functions; it just marks what parts of the dataflow are output. I
think this will change slightly with the introduction of branching
dataflows (something like marking different captures with a tag
maybe?).

Arg Parsers
-----------

Since the above execution entry points are all Python functions, add
some convenience methods which parse arguments from the command line or
env vars. This will make it easier to craft your "main" function in a
cluster or standalone script context.
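
As a sketch of what these save you from, here's a hand-rolled
`argparse` version of a standalone cluster "main". The `main_cluster()`
keyword names are guesses based on the description above, not the
actual convenience parser:

    # Illustrative only: roughly the boilerplate the convenience parsers
    # are meant to replace. Keyword names for main_cluster() are guesses.
    import argparse

    from bytewax import Dataflow, main_cluster

    def input_builder(worker_index, worker_count):
        return (i for i in range(100) if i % worker_count == worker_index)

    def output_builder(worker_index, worker_count):
        return print

    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("-w", dest="workers_per_proc", type=int, default=1)
        parser.add_argument("-n", dest="procs", type=int, default=1)
        args = parser.parse_args()

        flow = Dataflow()
        flow.map(str)
        flow.capture()

        main_cluster(
            flow,
            input_builder,
            output_builder,
            proc_count=args.procs,
            workers_per_proc=args.workers_per_proc,
        )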

Updates all the examples to use these parsers so we can go back to
using `-w2` and `-n2` on the command line. Some of the examples show
off using `main_proc()`, though, and so need different command line
arguments.
@davidselassie
Contributor Author

Hmm. I guess tests are failing. I tried to add a dep on a new library, multiprocess, because it supports sending lambdas between processes (which I guess vanilla Python doesn't!). I thought adding it to the pyproject.toml would be enough, but I guess not? My tests are currently passing locally.
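
(Context for that, reflecting my understanding rather than anything
verified in this repo: stdlib pickle refuses to serialize lambdas,
while dill, which I believe multiprocess uses under the hood, handles
them.)

    import pickle

    import dill  # what multiprocess uses for serialization, I believe

    try:
        pickle.dumps(lambda x: x * 2)
    except Exception as e:
        print("stdlib pickle can't do it:", e)

    # dill can round-trip the lambda just fine.
    payload = dill.dumps(lambda x: x * 2)
    print(dill.loads(payload)(21))  # 42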

@davidselassie
Contributor Author

Gosh, I found some more pickling dark magic: it seems like you can't use isinstance on dataclasses sent through pickling because they are dynamically generated. It's something related to https://stackoverflow.com/questions/620844/why-do-i-get-unexpected-behavior-in-python-isinstance-after-pickling but I'm not exactly sure what's up.

flow.map(double)
flow.map(minus_one)
flow.map(stringy)
flow.inspect(peek)
flow.capture()
Contributor

There is a strong dependency now between flow.capture() and the shape of dataflows like this one. I wonder if we can make this more explicit when running dataflows this way.

If you remove this line, you don't see any output from running the dataflow, which makes sense, but may be confusing at first.

Contributor Author

Recap of my understanding of our discussion for posterity:

Our old pattern of using inspect(print) as "output" is a bad habit in the new era of formal output. Output needs to live in a layer above the dataflow itself so that it can adapt to the different execution contexts; otherwise it breaks our rule of "the same dataflow should have the same behavior". We should update our examples with more context to show proper, more nuanced usage.

Capture also takes on more of a role in non-linear dataflows. You can't just output "the last step" since there isn't necessarily a well-defined "last step". So we won't be able to do this automatically in general.

I do think it's worthwhile to raise an exception if a dataflow is missing any capture steps. Perhaps there are use cases where that makes sense, but my spidey sense is that it's an indication that you're running a dataflow for its side effects and are possibly playing fast and loose with execution order or worker identity. We can be conservative now and remove that exception later if we find valid use cases.

@whoahbot
Contributor

Great work! I really like the new shape of input/output.

I was brainstorming over the weekend about whether there is a set of descriptive names for main_proc, run_cluster, etc. that would help describe the different modes of running dataflow processes and workers, but I haven't really come up with anything convincing.

@davidselassie
Contributor Author

davidselassie commented Feb 23, 2022

Putting our decision tree here for posterity. We did some brainstorming today on what the shape of the API should look like and the decisions you need to make to pick your execution type. This isn't any final naming or anything.

graph TD
    S1(Wake Up) -.Eat Coffee.-> S2(Start)

    S2 --> P(In Process)
    --> P2(No Process Coordination)
    --> P3(No IO Distribution)
    --> P4[run]

    S2 --> M(Multiprocess)

    M --> MA(Automatic Process Coordination)
    
    MA --> MAA1(Automatic IO Distribution)
    --> MAA2[run_cluster]

    MA --> MAM1(Manual IO Distribution)
    --> MAM2[spawn_cluster]

    M --> M1(Manual Process Coordination)
    --> M2(Manual IO Distribution)
    --> M3[cluster_main]

@davidselassie
Contributor Author

Thanks for all the discussion today. Updated with some new names. But, yes, let's wait and see if there are any insights from building more demos and examples in the next few days.
