New execution interface #19

Merged
merged 25 commits into main from args-helper on Feb 24, 2022
Commits (25)
f3449c6
New execution interface
davidselassie Feb 14, 2022
6b13e68
Fix dependencies for bytewax package.
whoahbot Feb 17, 2022
7949cd2
Don't forbid pypi index.
whoahbot Feb 17, 2022
67f60a8
Try installing wheels directly.
whoahbot Feb 17, 2022
0874b58
Adding installing multiprocess manually in test step
miccioest Feb 17, 2022
aea55ae
Removing force-reinstall flag
miccioest Feb 17, 2022
16b8600
Applying linux changes to macos and windows
miccioest Feb 17, 2022
838db10
Try installing local whl file without no-index
miccioest Feb 18, 2022
5fe7e9b
Preventing compiling all python versions on each job
miccioest Feb 18, 2022
425cd16
total_worker_count -> worker_count
davidselassie Feb 18, 2022
fbd7a30
Rust serde needs to use dill library to match sending data to cluster…
davidselassie Feb 18, 2022
197aecf
Reifys cluster input so generators will work; doesn't support infinit…
davidselassie Feb 18, 2022
55f80a3
Makes all the examples run again
davidselassie Feb 18, 2022
7a816ca
Renames to brainstorming names
davidselassie Feb 23, 2022
ea022fa
Merge branch 'main' into args-helper
davidselassie Feb 23, 2022
e6ce32f
Allows and tests multiple capture
davidselassie Feb 23, 2022
c4fd79c
Reformat all
davidselassie Feb 23, 2022
34c80bd
Internal run
davidselassie Feb 23, 2022
18c6d36
Error if dataflow is missing capture
davidselassie Feb 23, 2022
a717ca4
Updates readme
davidselassie Feb 23, 2022
cf2b0af
Changelog with new names
davidselassie Feb 23, 2022
130a47f
Examples all use capture now
davidselassie Feb 23, 2022
dfb0b92
run_cluster exits with capture error rather than hanging
davidselassie Feb 24, 2022
8e118e0
Fixes typos
davidselassie Feb 24, 2022
d9300be
Renames run_ to _run.
davidselassie Feb 24, 2022
Files changed
9 changes: 6 additions & 3 deletions .github/workflows/CI.yml
@@ -23,7 +23,8 @@ jobs:
args: --release -o dist
- name: Run tests
run: |
pip install bytewax --no-index --find-links dist --force-reinstall
pip install multiprocess
pip install bytewax --no-index --find-links dist
pip install --upgrade pytest==7.0.0
python -m pytest pytests/
- name: Upload wheels
@@ -49,7 +50,8 @@ jobs:
args: --release --no-sdist -o dist
- name: Run tests
run: |
pip install bytewax --no-index --find-links dist --force-reinstall
pip install multiprocess
pip install bytewax --no-index --find-links dist
pip install --upgrade pytest==7.0.0
python -m pytest pytests/
- name: Upload wheels
@@ -75,7 +77,8 @@ jobs:
args: --release --no-sdist -o dist --universal2
- name: Run tests
run: |
pip install bytewax --no-index --find-links dist --force-reinstall
pip install multiprocess
pip install bytewax --no-index --find-links dist
pip install --upgrade pytest==7.0.0
python -m pytest pytests/
- name: Upload wheels
32 changes: 32 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,32 @@
# Bytewax Changelog

## 0.8.0

- Capture operator no longer takes arguments. Items that flow through
those points in the dataflow graph will be processed by the output
  handlers set up by each execution entry point.

- `Executor.build_and_run()` is replaced with four entry points for
specific use cases:

  - `run_sync()` for synchronous execution in the current process. It
returns all captured items to the calling process for you. Use
this for prototyping in notebooks and basic tests.

- `run_cluster()` for synchronous execution on a temporary
machine-local cluster. It returns all captured items to the
calling process for you. Use this for notebook analysis where you
need parallelism.

- `main_cluster()` for starting a machine-local cluster with more
control over input and output. Use this for standalone scripts
where you might need partitioned input and output.

- `main_proc()` for starting a process that will participate in a
cluster you are coordinating manually. Use this when starting a
Kubernetes cluster.

- Adds `bytewax.parse` module to help with reading command line
arguments and environment variables for the above entrypoints.

- Adds `manual_cluster` example.
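
As a quick orientation for the new API, here is a minimal sketch of `run_sync()` together with the argument-less `capture()` described in the changelog above. The `(epoch, item)` input tuples and the returned list of captured pairs are assumptions drawn from the examples in this PR, not a documented contract.

```python
from bytewax import Dataflow, run_sync

# capture() now takes no arguments; it marks the points in the graph
# whose items the entry point hands back to the calling process.
flow = Dataflow()
flow.map(lambda x: x * 2)
flow.capture()

if __name__ == "__main__":
    # Input is assumed to be an iterable of (epoch, item) pairs, matching
    # the examples elsewhere in this PR; the return shape is illustrative.
    out = run_sync(flow, [(0, 1), (0, 2), (1, 3)])
    print(sorted(out))  # expected roughly [(0, 2), (0, 4), (1, 6)]
```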
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "bytewax"
version = "0.7.0"
version = "0.8.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7 changes: 3 additions & 4 deletions benches/benchmarks/wordcount_bytewax.py
@@ -1,7 +1,7 @@
import re
import operator

from bytewax import Executor, inp, processes
from bytewax import inp, run_sync, Dataflow


def tokenize(x):
@@ -13,11 +13,10 @@ def initial_count(word):
return word, 1


ec = Executor()
flow = ec.Dataflow(inp.single_batch(open("benches/benchmarks/collected-works.txt")))
flow = Dataflow()
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_epoch(operator.add)

if __name__ == "__main__":
processes.start_local(ec, number_of_processes=6)
run_sync(flow, inp.single_batch(open("benches/benchmarks/collected-works.txt")))
8 changes: 3 additions & 5 deletions examples/anomaly_detector.py
@@ -1,7 +1,6 @@
import random

import bytewax
from bytewax import inp
from bytewax import Dataflow, inp, parse, run_cluster


def random_datapoints():
@@ -54,13 +53,12 @@ def inspector(metric__value_mu_sigma_anomalous):
)


ec = bytewax.Executor()
flow = ec.Dataflow(inp.fully_ordered(random_datapoints()))
flow = Dataflow()
# ("metric", value)
flow.stateful_map(lambda: ZTestDetector(2.0), ZTestDetector.push)
# ("metric", (value, mu, sigma, is_anomalous))
flow.inspect(inspector)


if __name__ == "__main__":
ec.build_and_run()
run_cluster(flow, inp.fully_ordered(random_datapoints()), **parse.cluster_args())
9 changes: 4 additions & 5 deletions examples/basic.py
@@ -1,4 +1,4 @@
import bytewax
from bytewax import Dataflow, parse, run_cluster


def inp():
@@ -22,13 +22,12 @@ def peek(x):
print(f"peekin at {x}")


ec = bytewax.Executor()
flow = ec.Dataflow(inp())
flow = Dataflow()
flow.map(double)
flow.map(minus_one)
flow.map(stringy)
flow.inspect(peek)
flow.inspect(print)


if __name__ == "__main__":
ec.build_and_run()
run_cluster(flow, inp(), **parse.cluster_args())
65 changes: 38 additions & 27 deletions examples/events_to_parquet.py
@@ -2,17 +2,40 @@
import time
from datetime import datetime

import bytewax

import pandas

import pyarrow.parquet as parquet
from bytewax import inp
from bytewax import Dataflow, inp, main_cluster, parse
from pandas import DataFrame
from pyarrow import Table

from utils import fake_events

# Collect 5 second tumbling windows of data and write them out as
# Parquet datasets. `fake_events` will generate events for multiple
# days around today. Each worker will generate independent fake
# events.
def input_builder(worker_index, total_worker_count):
return inp.tumbling_epoch(5.0, fake_events.generate_web_events())


# Arrow assigns a UUID to each worker / window's file so they won't
# clobber each other. They are further automatically placed in the
# correct directory structure based on date and path.
def write_parquet(epoch__events_df):
"""Write events as partitioned Parquet in `$PWD/parquet_demo_out/`"""
epoch, events_df = epoch__events_df
table = Table.from_pandas(events_df)
parquet.write_to_dataset(
table,
root_path="parquet_demo_out",
partition_cols=["year", "month", "day", "page_url_path"],
)


# Each worker writes using the same code; no further partitioning is
# needed because of the UUID described above.
def output_builder(worker_index, total_worker_count):
return write_parquet


def add_date_columns(event):
timestamp = datetime.fromisoformat(event["event_timestamp"])
@@ -30,36 +53,24 @@ def append_event(events_df, event_df):
return pandas.concat([events_df, event_df])


def write_parquet(path__events_df):
"""Write events as partitioned Parquet in `$PWD/parquet_demo_out/`"""
path, events_df = path__events_df
table = Table.from_pandas(events_df)
parquet.write_to_dataset(
table,
root_path="parquet_demo_out",
partition_cols=["year", "month", "day", "page_url_path"],
)
def drop_page(page__events_df):
page, events_df = page__events_df
return events_df


ec = bytewax.Executor()
# Collect 5 second tumbling windows of data and write them out as
# Parquet datasets. Arrow assigns a UUID to each worker / window's
# file so they won't clobber each other. They are further
# automatically placed in the correct directory structure based on
# date and path. `fake_events` will generate events for multiple days
# around today.
flow = ec.Dataflow(inp.tumbling_epoch(5.0, fake_events.generate_web_events()))
flow = Dataflow()
flow.map(json.loads)
# {"page_url_path": "/path", "event_timestamp": "2022-01-02 03:04:05", ...}
flow.map(add_date_columns)
# {"page_url_path": "/path", "year": 2022, "month": 1, "day": 5, ... }
flow.map(group_by_page)
# ("/path", DataFrame([{"page_url_path": "/path", "year": 2022, "month": 1, "day": 5, ... })])
# ("/path", DataFrame([{"page_url_path": "/path", "year": 2022, "month": 1, "day": 5, ... }]))
flow.reduce_epoch_local(append_event)
# ("/path", DataFrame([{"page_url_path": "/path", ...}, ...])
flow.map(write_parquet)
# None
# ("/path", DataFrame([{"page_url_path": "/path", ...}, ...]))
flow.map(drop_page)
# DataFrame([{"page_url_path": "/path", ...}, ...])
flow.capture()


if __name__ == "__main__":
ec.build_and_run()
main_cluster(flow, input_builder, output_builder, **parse.cluster_args())
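
The `main_cluster()` call above follows a builder pattern: each worker asks `input_builder(worker_index, total_worker_count)` for its share of input and `output_builder(worker_index, total_worker_count)` for a callback that handles captured items. The following stripped-down sketch shows just that shape; the trivial builder bodies are illustrative assumptions, not part of this PR.

```python
from bytewax import Dataflow, main_cluster, parse


def input_builder(worker_index, total_worker_count):
    # Each worker yields its own (epoch, item) pairs into the dataflow.
    yield 0, f"hello from worker {worker_index}"


def output_builder(worker_index, total_worker_count):
    # Return a callback that is run for every captured (epoch, item)
    # pair seen on this worker.
    return print


flow = Dataflow()
flow.map(str.upper)
flow.capture()


if __name__ == "__main__":
    main_cluster(flow, input_builder, output_builder, **parse.cluster_args())
```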
67 changes: 67 additions & 0 deletions examples/manual_cluster.py
@@ -0,0 +1,67 @@
from itertools import chain
from pathlib import Path

from bytewax import Dataflow, main_proc, parse


read_dir = Path("./examples/sample_data/cluster/")
write_dir = Path("./cluster_out/")


def input_builder(worker_index, total_worker_count):
# List all the input partitions in the reading directory.
all_partitions = read_dir.glob("*.txt")
# Then have this worker only read every `n` files so each worker
# will read a disjoint set.
this_worker_partitions = [
path
for i, path in enumerate(all_partitions)
if i % total_worker_count == worker_index
]
# Open all the ones that this worker should read.
files = [open(path) for path in this_worker_partitions]
# Now send them into the dataflow on this worker.
for line in chain(*files):
yield 0, line.strip()


def output_builder(worker_index, total_worker_count):
write_dir.mkdir(exist_ok=True)
# Open a file that just this worker will write to.
write_to = open(write_dir / f"{worker_index}.out", "w")
# Build a function that can be called for each captured output.
def write(epoch_item):
epoch, item = epoch_item
write_to.write(f"{epoch} {item}\n")

# Return it so Bytewax will run it whenever an item is seen by a
# capture operator.
return write


flow = Dataflow()
flow.map(str.upper)
flow.capture()


if __name__ == "__main__":
# Run these two commands in separate terminals:

# $ python ./examples/manual_cluster.py -p0 -a localhost:2101 -a localhost:2102
# $ python ./examples/manual_cluster.py -p1 -a localhost:2101 -a localhost:2102

# They'll collectively read the files in
# ./examples/sample_data/cluster/*.txt which have lines like
# `one1`.

# They will then both finish and you'll see ./cluster_out/0.out
# and ./cluster_out/1.out with the data that each process in the
# cluster wrote with the lines uppercased.

    # You could imagine reading from / writing to separate Kafka
# partitions, S3 blobs, etc.

# When using `main_proc()` you have to coordinate ensuring each
# process knows the address of all other processes in the cluster
# and their unique process ID.
main_proc(flow, input_builder, output_builder, **parse.main_args())
9 changes: 5 additions & 4 deletions examples/pagerank.py
@@ -1,7 +1,7 @@
import collections
import operator

import bytewax
from bytewax import Dataflow, parse, run_cluster


FIRST_ITERATION = 0
@@ -37,8 +37,7 @@ def sum_to_weight(node_sum):
return node, updated_weight


ec = bytewax.Executor()
flow = ec.Dataflow(read_edges("examples/sample_data/graph.txt"))
flow = Dataflow()
# (parent, {child}) per edge
flow.reduce_epoch(operator.or_)
# (parent, children) per parent
@@ -72,4 +71,6 @@ def sum_to_weight(node_sum):


if __name__ == "__main__":
ec.build_and_run()
run_cluster(
flow, read_edges("examples/sample_data/graph.txt"), **parse.cluster_args()
)
6 changes: 6 additions & 0 deletions examples/sample_data/cluster/partition-1.txt
@@ -0,0 +1,6 @@
one1
one2
one3
one4
one5
one6
6 changes: 6 additions & 0 deletions examples/sample_data/cluster/partition-2.txt
@@ -0,0 +1,6 @@
two1
two2
two3
two4
two5
two6
6 changes: 6 additions & 0 deletions examples/sample_data/cluster/partition-3.txt
@@ -0,0 +1,6 @@
three1
three2
three3
three4
three5
three6
6 changes: 6 additions & 0 deletions examples/sample_data/cluster/partition-4.txt
@@ -0,0 +1,6 @@
four1
four2
four3
four4
four5
four6
6 changes: 6 additions & 0 deletions examples/sample_data/cluster/partition-5.txt
@@ -0,0 +1,6 @@
five1
five2
five3
five4
five5
five6
7 changes: 3 additions & 4 deletions examples/search_session.py
@@ -2,7 +2,7 @@
from dataclasses import dataclass
from typing import List

import bytewax
from bytewax import Dataflow, parse, run_cluster


@dataclass
@@ -95,8 +95,7 @@ def calc_ctr(search_session):
return 0.0


ec = bytewax.Executor()
flow = ec.Dataflow(IMAGINE_THESE_EVENTS_STREAM_FROM_CLIENTS)
flow = Dataflow()
# event
flow.map(group_by_user)
# (user, [event])
@@ -115,4 +114,4 @@ def calc_ctr(search_session):


if __name__ == "__main__":
ec.build_and_run()
run_cluster(flow, IMAGINE_THESE_EVENTS_STREAM_FROM_CLIENTS, **parse.cluster_args())