Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't take the GIL when iterating over items #418

Merged
merged 3 commits on Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/benches.yml
@@ -0,0 +1,33 @@
name: benchmarks

on:
  push:
    branches:
      - "main"
  pull_request:
  # `workflow_dispatch` allows CodSpeed to trigger backtest
  # performance analysis in order to generate initial data.
  workflow_dispatch:

jobs:
  benchmarks:
    runs-on: ubuntu-latest
    steps:
      # v4 runs on node20; checkout@v3 (node16) is deprecated by GitHub.
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      # Build a release-mode wheel so the benchmarks measure optimized code.
      - uses: PyO3/maturin-action@v1
        with:
          command: build
          args: --release -o dist -i python-3.11
          sccache: true
      - name: Install dependencies
        # Install the freshly built wheel (with dev extras) over any cached copy.
        run: |
          WHEEL_FILE=$(ls ./dist/*.whl)
          pip install $WHEEL_FILE'[dev]' -v --force-reinstall
      - name: Run benchmarks
        uses: CodSpeedHQ/action@v2
        with:
          token: ${{ secrets.CODSPEED_TOKEN }}
          # Only the benchmark tests; pyproject.toml skips them by default.
          run: pytest --benchmark-enable --benchmark-only
3 changes: 2 additions & 1 deletion pyproject.toml
Expand Up @@ -52,6 +52,7 @@ kafka = [
test = [
"myst-docutils==0.17.0",
"pytest==7.1.0",
"pytest-benchmark>=3.4",
"sybil==6.0.3",
]

Expand All @@ -66,7 +67,7 @@ long_description = "file: README.md"
long_description_content_type = "text/markdown"

[tool.pytest.ini_options]
addopts = "-v -p no:doctest"
addopts = "-v -p no:doctest --benchmark-skip"
testpaths = [
"docs",
"pytests",
Expand Down
84 changes: 84 additions & 0 deletions pytests/benchmarks/test_windowing.py
@@ -0,0 +1,84 @@
from datetime import datetime, timedelta, timezone
from typing import Generator, List, Optional

import bytewax.operators as op
import bytewax.operators.window as w
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition
from bytewax.testing import cluster_main, run_main

BATCH_SIZE = 100_000
BATCH_COUNT = 10


class _NumberSource(StatefulSourcePartition):
    """Partition that emits a single batch of `i` event timestamps.

    After the one batch is consumed, `next_batch` raises StopIteration,
    which ends the partition's input.
    """

    def __init__(self, i: int) -> None:
        # Number of timestamps to emit in the single batch.
        self.i = i
        # One-shot generator holding the only batch this partition yields.
        self.records = self._record_gen()

    def close(self) -> None:
        # Nothing to release.
        pass

    def next_awake(self) -> Optional[datetime]:
        # Always ready; never ask the runtime to delay polling.
        return None

    def _record_gen(self) -> Generator[List[datetime], None, None]:
        # NOTE: `datetime.now` is called once per element on purpose,
        # matching the per-element evaluation of the original.
        batch: List[datetime] = []
        for offset in range(self.i):
            batch.append(datetime.now(tz=timezone.utc) + timedelta(seconds=offset))
        yield batch

    def next_batch(self, *args, **kwargs) -> List[datetime]:
        # Second call raises StopIteration, signalling end-of-input.
        return next(self.records)

    def snapshot(self) -> None:
        # Benchmark source is effectively stateless; nothing to checkpoint.
        return None


class NumbersInput(FixedPartitionedSource):
    """Fixed-partition source with exactly one partition of `i` timestamps."""

    def __init__(self, i: int) -> None:
        # Batch size handed to every partition built from this source.
        self.i = i

    def list_parts(self) -> List[str]:
        # A single fixed partition keeps the whole load on one worker.
        return ["single"]

    def build_part(self, *args, **kwargs) -> _NumberSource:
        # Resume state is ignored; each benchmark run starts fresh.
        return _NumberSource(self.i)


# Event-time clock: each record is its own timestamp, with no extra
# system-time wait before closing a window.
clock_config = w.EventClockConfig(
    dt_getter=lambda x: x,
    wait_for_system_duration=timedelta(seconds=0),
)
# One-minute tumbling windows aligned to the start of 2022 (UTC).
window = w.TumblingWindow(
    align_to=datetime(2022, 1, 1, tzinfo=timezone.utc), length=timedelta(minutes=1)
)

flow = Dataflow("bench")
# Assemble the pipeline with direct operator calls and named intermediate
# streams instead of `.then` chaining; the wiring is identical.
inp = op.input("in", flow, NumbersInput(BATCH_SIZE))
# Replicate every timestamp BATCH_COUNT times to inflate the workload.
fanned = op.flat_map("flat-map", inp, lambda x: (x for _ in range(BATCH_COUNT)))
# Route everything onto a single key so one windowing state is exercised.
keyed = op.key_on("key-on", fanned, lambda _: "x")
# Fold each window down to None: this benchmarks the windowing machinery,
# not the folding function.
folded = w.fold_window(
    "fold-window",
    keyed,
    clock_config,
    window,
    lambda: None,
    lambda s, _: s,
)
# Discard all results so nothing is actually printed during the benchmark.
filtered = op.filter("filter_all", folded, lambda _: False)
op.output("stdout", filtered, StdOutSink())


def test_run_main(benchmark):
    """Benchmark the dataflow through the single-process `run_main` runner."""
    flow_args = (flow,)
    benchmark.pedantic(run_main, args=flow_args)


def test_cluster_main(benchmark):
    """Benchmark the same dataflow through the `cluster_main` entry point."""
    # Single local process; the address is only bound, never dialed by peers.
    cluster_kwargs = {"addresses": ["localhost:9999"], "proc_id": 0}
    benchmark.pedantic(cluster_main, args=(flow,), kwargs=cluster_kwargs)
133 changes: 68 additions & 65 deletions src/operators/stateful_unary.rs
Expand Up @@ -81,7 +81,7 @@ where
/// logic will not be called again.
///
/// This must return values to be emitted downstream.
fn on_awake(&mut self, next_value: Poll<Option<V>>) -> I;
fn on_awake(&mut self, py: Python, next_value: Poll<Option<V>>) -> I;

/// Called when [`StatefulUnary::stateful_unary`] is deciding if
/// the logic for this key is still relevant.
Expand Down Expand Up @@ -426,46 +426,48 @@ where
let mut output_handle = output_wrapper.activate();
let mut output_session = output_handle.session(&output_cap);

// Drain to re-use allocation.
for (key, next_value) in tmp_awake_logic_with.drain(..) {
// Ok, let's actually run the logic code!
// Pull out or build the logic for the
// current key.
let mut logic = current_logic
.remove(&key)
.unwrap_or_else(|| logic_builder(None));
let output = with_timer!(logic_histogram, labels, logic.on_awake(next_value));
output_session.give_iterator(
output.into_iter().map(|item| (key.clone(), item)),
);

// Figure out if we should discard it.
let fate = logic.fate();
match fate {
LogicFate::Discard => {
// Remove any pending awake times,
// since that's part of the state.
current_next_awake.remove(&key);

// Do not re-insert the
// logic. It'll be dropped.
}
LogicFate::Retain => {
// If we don't discard it, ask
// when to wake up next and
// overwrite that.
if let Some(next_awake) = logic.next_awake() {
current_next_awake.insert(key.clone(), next_awake);
} else {
Python::with_gil(|py| {
// Drain to re-use allocation.
for (key, next_value) in tmp_awake_logic_with.drain(..) {
// Ok, let's actually run the logic code!
// Pull out or build the logic for the
// current key.
let mut logic = current_logic
.remove(&key)
.unwrap_or_else(|| logic_builder(None));
let output = with_timer!(logic_histogram, labels, logic.on_awake(py, next_value));
output_session.give_iterator(
output.into_iter().map(|item| (key.clone(), item)),
);

// Figure out if we should discard it.
let fate = logic.fate();
match fate {
LogicFate::Discard => {
// Remove any pending awake times,
// since that's part of the state.
current_next_awake.remove(&key);

// Do not re-insert the
// logic. It'll be dropped.
}
LogicFate::Retain => {
// If we don't discard it, ask
// when to wake up next and
// overwrite that.
if let Some(next_awake) = logic.next_awake() {
current_next_awake.insert(key.clone(), next_awake);
} else {
current_next_awake.remove(&key);
}

current_logic.insert(key.clone(), logic);
}
};
current_logic.insert(key.clone(), logic);
}
};

awoken_keys_buffer.insert(key);
}
awoken_keys_buffer.insert(key);
}
});

// Determine the fate of each key's logic at
// the end of each epoch. If a key wasn't
Expand All @@ -485,34 +487,35 @@ where
// Go through all keys awoken in this
// epoch. This might involve keys from the
// previous activation.
for state_key in std::mem::take(&mut awoken_keys_buffer) {
// Now snapshot the logic and next
// awake at value, if any.
let change = if let Some(logic) = current_logic.get(&state_key)
{
let logic_state = with_timer!(snapshot_histogram, labels, logic.snapshot());
let next_awake =
current_next_awake.get(&state_key).cloned();
let state = unwrap_any!(Python::with_gil(
|py| -> PyResult<PyObject> {
let state = PyDict::new(py);
state.set_item("logic", logic_state)?;
state.set_item("next_awake", next_awake)?;
Ok(state.into())
}
));
StateChange::Upsert(state.into())
} else {
// It's ok if there's no logic,
// because on that logic's last
// awake it might have had a
// LogicFate::Discard and been
// dropped.
StateChange::Discard
};
let snap = Snapshot(step_id.clone(), state_key, change);
change_session.give(snap);
}
Python::with_gil(|py| {
for state_key in std::mem::take(&mut awoken_keys_buffer) {
// Now snapshot the logic and next
// awake at value, if any.
let change = if let Some(logic) = current_logic.get(&state_key)
{
let logic_state = with_timer!(snapshot_histogram, labels, logic.snapshot());
let next_awake =
current_next_awake.get(&state_key).cloned();
let state = unwrap_any!(|| -> PyResult<PyObject> {
let state = PyDict::new(py);
state.set_item("logic", logic_state)?;
state.set_item("next_awake", next_awake)?;
Ok(state.into())
}()
);
StateChange::Upsert(state.into())
} else {
// It's ok if there's no logic,
// because on that logic's last
// awake it might have had a
// LogicFate::Discard and been
// dropped.
StateChange::Discard
};
let snap = Snapshot(step_id.clone(), state_key, change);
change_session.give(snap);
}
});

if let Some(loads) = loads_inbuf.remove(&epoch) {
for (worker, (key, change)) in loads {
Expand Down
52 changes: 26 additions & 26 deletions src/outputs.rs
Expand Up @@ -304,32 +304,32 @@ where
// need to ensure that writes happen in epoch
// order.
if let Some(part_to_items) = items_inbuf.remove(epoch) {
for (part_key, items) in part_to_items {
let part = parts
.entry(part_key.clone())
// If there's no resume data for
// this partition, lazily create
// it.
.or_insert_with_key(|part_key| {
unwrap_any!(Python::with_gil(|py| sink
.build_part(py, part_key, None)
.reraise("error init StatefulSink")))
});

let batch: Vec<_> =
items.into_iter().map(|(_k, v)| v).collect();
item_inp_count.add(batch.len() as u64, &labels);
with_timer!(
write_batch_histogram,
labels,
unwrap_any!(Python::with_gil(
|py| part.write_batch(py, batch)
))
);

awoken.insert(part_key);
}
}
Python::with_gil(|py| {
for (part_key, items) in part_to_items {
let part = parts
.entry(part_key.clone())
// If there's no resume data for
// this partition, lazily create
// it.
.or_insert_with_key(|part_key| {
unwrap_any!(sink
.build_part(py, part_key, None)
.reraise("error init StatefulSink"))
});

let batch: Vec<_> =
items.into_iter().map(|(_k, v)| v).collect();
item_inp_count.add(batch.len() as u64, &labels);
with_timer!(
write_batch_histogram,
labels,
unwrap_any!(part.write_batch(py, batch))
);

awoken.insert(part_key);
}
});
};
},
|caps, (parts, awoken)| {
let clock_cap = &caps[0];
Expand Down