Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PyO3 #244

Merged
merged 6 commits into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 3 additions & 9 deletions .github/workflows/CI.yml
Expand Up @@ -25,10 +25,6 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.68.2
- name: Install dill
shell: bash
run: |
pip install dill
- name: Rust tests
uses: actions-rs/cargo@v1
with:
Expand Down Expand Up @@ -88,6 +84,8 @@ jobs:

macos_x86:
runs-on: macos-latest
env:
MACOSX_DEPLOYMENT_TARGET: 10.9
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
Expand All @@ -103,10 +101,6 @@ jobs:
uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install system dependency
shell: bash
run: |
pip install dill
- name: Rust tests
uses: actions-rs/cargo@v1
with:
Expand All @@ -116,7 +110,7 @@ jobs:
with:
rust-toolchain: 1.68.2
command: build
args: --release --no-sdist -o dist --interpreter python${{ matrix.python-version }}
args: --release -o dist --interpreter python${{ matrix.python-version }}
- name: Run tests
shell: bash
run: |
Expand Down
48 changes: 14 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions Cargo.toml
Expand Up @@ -7,16 +7,13 @@ edition = "2021"
name = "bytewax"
crate-type = ["cdylib", "rlib"]

[package.metadata.maturin]
python-source = "pysrc"

[dependencies]
axum = { version = "0.5.17" }
bincode = { version = "1.3.3" }
chrono = { version = "0.4", features = [ "serde" ] }
chrono = { version = "0.4", default_features = false, features = [ "serde" ] }
futures = { version = "0.3.21" }
num = { version = "0.4.0" }
pyo3 = { version = "0.17.2", features = ["macros", "chrono"] }
pyo3 = { version = "0.18.3", features = ["macros", "chrono"] }
serde = { version = "1.0.134" }
serde_test = { version = "1.0.134" }
sqlx = { version = "0.6.1", features = [ "runtime-tokio-rustls", "postgres", "sqlite", "chrono" ] }
Expand All @@ -38,7 +35,7 @@ rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi-vendored", "
rdkafka = { version = "0.28.0", features = [ "cmake-build", "gssapi", "ssl" ] }

[dev-dependencies]
pyo3 = { version = "0.17.2", default-features = false, features = ["macros", "chrono"] }
pyo3 = { version = "0.18.3", default-features = false, features = ["macros", "chrono"] }

[features]
extension-module = ["pyo3/extension-module"]
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,5 +1,5 @@
[build-system]
requires = ["maturin>=0.12,<0.13"]
requires = ["maturin>=0.13,<=0.15.1"]
build-backend = "maturin"

[project]
Expand Down Expand Up @@ -53,3 +53,5 @@ testpaths = [
"docs",
]

[tool.maturin]
python-source = "pysrc"
26 changes: 13 additions & 13 deletions src/dataflow/mod.rs
Expand Up @@ -73,7 +73,7 @@ impl Dataflow {
/// step_id (str): Uniquely identifies this step for recovery.
///
/// input (bytewax.inputs.Input): Input definition.
#[pyo3(text_signature = "(self, step_id, input)")]
#[pyo3(signature = (step_id, input))]
fn input(&mut self, step_id: StepId, input: Input) {
self.steps.push(Step::Input { step_id, input });
}
Expand All @@ -93,7 +93,7 @@ impl Dataflow {
/// step_id (str): Uniquely identifies this step for recovery.
///
/// output (bytewax.outputs.Output): Output definition.
#[pyo3(text_signature = "(self, step_id, output)")]
#[pyo3(signature = (step_id, output))]
fn output(&mut self, step_id: StepId, output: Output) {
self.steps.push(Step::Output { step_id, output });
}
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Dataflow {
/// Args:
///
/// predicate: `predicate(item: Any) => should_emit: bool`
#[pyo3(text_signature = "(self, predicate)")]
#[pyo3(signature = (predicate))]
fn filter(&mut self, predicate: TdPyCallable) {
self.steps.push(Step::Filter { predicate });
}
Expand All @@ -148,7 +148,7 @@ impl Dataflow {
/// ...
/// >>> flow.filter_map(validate)
///
#[pyo3(text_signature = "(self, mapper)")]
#[pyo3(signature = (mapper))]
fn filter_map(&mut self, mapper: TdPyCallable) {
self.steps.push(Step::FilterMap { mapper });
}
Expand Down Expand Up @@ -184,7 +184,7 @@ impl Dataflow {
/// Args:
///
/// mapper: `mapper(item: Any) => emit: Iterable[Any]`
#[pyo3(text_signature = "(self, mapper)")]
#[pyo3(signature = (mapper))]
fn flat_map(&mut self, mapper: TdPyCallable) {
self.steps.push(Step::FlatMap { mapper });
}
Expand Down Expand Up @@ -215,7 +215,7 @@ impl Dataflow {
/// Args:
///
/// inspector: `inspector(item: Any) => None`
#[pyo3(text_signature = "(self, inspector)")]
#[pyo3(signature = (inspector))]
fn inspect(&mut self, inspector: TdPyCallable) {
self.steps.push(Step::Inspect { inspector });
}
Expand Down Expand Up @@ -248,7 +248,7 @@ impl Dataflow {
/// Args:
///
/// inspector: `inspector(epoch: int, item: Any) => None`
#[pyo3(text_signature = "(self, inspector)")]
#[pyo3(signature = (inspector))]
fn inspect_epoch(&mut self, inspector: TdPyCallable) {
self.steps.push(Step::InspectEpoch { inspector });
}
Expand Down Expand Up @@ -282,7 +282,7 @@ impl Dataflow {
/// Args:
///
/// mapper: `mapper(item: Any) => updated_item: Any`
#[pyo3(text_signature = "(self, mapper)")]
#[pyo3(signature = (mapper))]
fn map(&mut self, mapper: TdPyCallable) {
self.steps.push(Step::Map { mapper });
}
Expand Down Expand Up @@ -357,7 +357,7 @@ impl Dataflow {
///
/// is_complete: `is_complete(updated_accumulator: Any) =>
/// should_emit: bool`
#[pyo3(text_signature = "(self, step_id, reducer, is_complete)")]
#[pyo3(signature = (step_id, reducer, is_complete))]
fn reduce(&mut self, step_id: StepId, reducer: TdPyCallable, is_complete: TdPyCallable) {
self.steps.push(Step::Reduce {
step_id,
Expand Down Expand Up @@ -438,7 +438,7 @@ impl Dataflow {
/// builder: `builder(key: Any) => initial_accumulator: Any`
///
/// folder: `folder(accumulator: Any, value: Any) => updated_accumulator: Any`
#[pyo3(text_signature = "(self, step_id, clock_config, window_config, builder, folder)")]
#[pyo3(signature = (step_id, clock_config, window_config, builder, folder))]
fn fold_window(
&mut self,
step_id: StepId,
Expand Down Expand Up @@ -530,7 +530,7 @@ impl Dataflow {
///
/// reducer: `reducer(accumulator: Any, value: Any) =>
/// updated_accumulator: Any`
#[pyo3(text_signature = "(self, step_id, clock_config, window_config, reducer)")]
#[pyo3(signature = (step_id, clock_config, window_config, reducer))]
fn reduce_window(
&mut self,
step_id: StepId,
Expand Down Expand Up @@ -570,7 +570,7 @@ impl Dataflow {
///
/// window_config (bytewax.window.WindowConfig): Windower
/// config to use. See `bytewax.window`.
#[pyo3(text_signature = "(self, step_id, clock_config, window_config)")]
#[pyo3(signature = (step_id, clock_config, window_config))]
fn collect_window(
&mut self,
step_id: StepId,
Expand Down Expand Up @@ -657,7 +657,7 @@ impl Dataflow {
///
/// mapper: `mapper(state: Any, value: Any) => (updated_state:
/// Any, updated_value: Any)`
#[pyo3(text_signature = "(self, step_id, builder, mapper)")]
#[pyo3(signature = (step_id, builder, mapper))]
fn stateful_map(&mut self, step_id: StepId, builder: TdPyCallable, mapper: TdPyCallable) {
self.steps.push(Step::StatefulMap {
step_id,
Expand Down
6 changes: 3 additions & 3 deletions src/macros.rs
Expand Up @@ -83,7 +83,7 @@ macro_rules! log_func {
/// add_pymethods!(
/// SlidingWindow,
/// parent: WindowConfig,
/// py_args: (length,),
/// signature: (length),
/// args {
/// length: Duration => Duration::zero()
/// }
Expand All @@ -92,13 +92,13 @@ macro_rules! log_func {
macro_rules! add_pymethods {(
$struct:ident,
parent: $parent:ident,
py_args: $py_args:tt,
signature: $signature:tt,
args { $($arg:ident: $arg_type:ty => $default:expr),* }
) => {
#[pyo3::pymethods]
impl $struct {
#[new]
#[args $py_args ]
#[pyo3(signature=$signature)]
pub(crate) fn py_new($($arg: $arg_type),*) -> (Self, $parent) {
(Self { $($arg),* }, $parent {})
}
Expand Down
7 changes: 3 additions & 4 deletions src/recovery/python.rs
Expand Up @@ -118,7 +118,7 @@ pub(crate) struct NoopRecoveryConfig;
add_pymethods!(
NoopRecoveryConfig,
parent: RecoveryConfig,
py_args: (),
signature: (),
args {}
);

Expand Down Expand Up @@ -187,7 +187,6 @@ impl RecoveryBuilder for NoopRecoveryConfig {
/// Config object. Pass this as the `recovery_config` argument to
/// your execution entry point.
#[pyclass(module="bytewax.recovery", extends=RecoveryConfig)]
#[pyo3(text_signature = "(db_dir)")]
#[derive(Clone)]
pub(crate) struct SqliteRecoveryConfig {
#[pyo3(get)]
Expand All @@ -197,7 +196,7 @@ pub(crate) struct SqliteRecoveryConfig {
add_pymethods!(
SqliteRecoveryConfig,
parent: RecoveryConfig,
py_args: (db_dir),
signature: (db_dir),
args {
db_dir: PathBuf => String::new().into()
}
Expand Down Expand Up @@ -306,7 +305,7 @@ pub(crate) struct KafkaRecoveryConfig {
add_pymethods!(
KafkaRecoveryConfig,
parent: RecoveryConfig,
py_args: (brokers, topic_prefix),
signature: (brokers, topic_prefix),
args {
brokers: Vec<String> => Vec::new(),
topic_prefix: String => String::new()
Expand Down