Skip to content

Commit

Permalink
Merge pull request #29 from bytewax/log
Browse files Browse the repository at this point in the history
Debug operators
  • Loading branch information
blakestier committed Mar 3, 2022
2 parents 3aefe44 + dea1cc1 commit c2ebe99
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 3 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ python-source = "pysrc"

[dependencies]
axum = { version = "0.4.3" }
log = { version = "0.4" }
pyo3 = { version = "0.15.1" }
pyo3-log = { version = "0.5.0" }
rand = { version = "0.8.4" }
scopeguard = { version = "1.1.0" }
serde = { version = "1.0.134" }
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn sleep_release_gil(py: Python, secs: u64) {
#[pymodule]
#[pyo3(name = "bytewax")]
fn mod_tiny_dancer(_py: Python, m: &PyModule) -> PyResult<()> {
pyo3_log::init();

execution::register(_py, m)?;
dataflow::register(_py, m)?;

Expand Down
12 changes: 12 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ macro_rules! with_traceback {
}
};
}

/// Expands to the path of the function the macro is invoked in, as a
/// `&'static str` (for example `my_crate::operators::map`).
///
/// It works by declaring a nested item `f` and asking the compiler for
/// its type name, which is the enclosing function's path followed by
/// `::f`. That marker is then removed.
///
/// Trailing `::{{closure}}` segments are removed as well, so an
/// invocation inside a closure still names the surrounding function
/// rather than the anonymous closure.
#[macro_export]
macro_rules! log_func {
    () => {{
        fn f() {}
        fn type_name_of<T>(_: T) -> &'static str {
            std::any::type_name::<T>()
        }
        let name = type_name_of(f);
        // The exact output of `type_name` is not guaranteed, so strip the
        // `::f` marker only when it is really there instead of slicing
        // off a fixed number of bytes.
        let name = name.strip_suffix("::f").unwrap_or(name);
        name.trim_end_matches("::{{closure}}")
    }};
}
95 changes: 92 additions & 3 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,54 @@ use std::collections::HashMap;

use pyo3::prelude::*;

use crate::log_func;
use crate::pyo3_extensions::{TdPyAny, TdPyCallable, TdPyIterator};
use crate::with_traceback;
use log::debug;
use std::thread;

// These are all shims which map the Timely Rust API into equivalent
// calls to Python functions through PyO3.

pub(crate) fn map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyAny {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))).into())
}

pub(crate) fn flat_map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyIterator {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))?.extract(py)))
}

pub(crate) fn filter(predicate: &TdPyCallable, item: &TdPyAny) -> bool {
debug!(
"{}, predicate:{:?}, item:{:?}",
log_func!(),
predicate,
item
);
Python::with_gil(|py| with_traceback!(py, predicate.call1(py, (item,))?.extract(py)))
}

pub(crate) fn inspect(inspector: &TdPyCallable, item: &TdPyAny) {
debug!(
"{}, inspector:{:?}, item:{:?}",
log_func!(),
inspector,
item
);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (item,)));
});
}

pub(crate) fn inspect_epoch(inspector: &TdPyCallable, epoch: &u64, item: &TdPyAny) {
debug!(
"{}, inspector:{:?}, item:{:?}",
log_func!(),
inspector,
item
);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (*epoch, item)));
});
Expand All @@ -40,6 +63,14 @@ pub(crate) fn reduce(
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
Python::with_gil(|py| {
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, aggregator={:?})",
log_func!(),
reducer,
key,
value,
aggregator
);
let updated_aggregator = match aggregator {
Some(aggregator) => {
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value))).into()
Expand All @@ -54,6 +85,14 @@ pub(crate) fn reduce(
);

*aggregator = Some(updated_aggregator.clone_ref(py));
debug!(
"{}, key:{:?}, is_complete:{:?}(updated_agg={:?} => {}",
log_func!(),
key,
is_complete,
updated_aggregator,
should_emit_and_discard_aggregator
);

if should_emit_and_discard_aggregator {
let emit = (key.clone_ref(py), updated_aggregator).to_object(py).into();
Expand All @@ -70,6 +109,14 @@ pub(crate) fn reduce_epoch(
_key: &TdPyAny,
value: TdPyAny,
) {
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, aggregator:{:?}",
log_func!(),
reducer,
_key,
value,
aggregator
);
Python::with_gil(|py| {
let updated_aggregator = match aggregator {
Some(aggregator) => {
Expand All @@ -78,6 +125,13 @@ pub(crate) fn reduce_epoch(
None => value,
};
*aggregator = Some(updated_aggregator);
debug!(
"{}, reducer:{:?}, key:{:?}, updated_aggregator:{:?}",
log_func!(),
reducer,
_key,
aggregator
);
});
}

Expand All @@ -87,13 +141,32 @@ pub(crate) fn reduce_epoch_local(
all_key_value_in_epoch: &Vec<(TdPyAny, TdPyAny)>,
) {
Python::with_gil(|py| {
let _current = thread::current();
let id = _current.id();
for (key, value) in all_key_value_in_epoch {
aggregators
.entry(key.clone_ref(py))
let aggregator = aggregators.entry(key.clone_ref(py));
debug!(
"thread:{:?}, {}, reducer:{:?}, key:{:?}, value:{:?}, aggregator:{:?}",
id,
log_func!(),
reducer,
key,
value,
aggregator
);
aggregator
.and_modify(|aggregator| {
*aggregator =
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value)))
.into()
.into();
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, updated_aggregator:{:?}",
log_func!(),
reducer,
key,
value,
*aggregator
);
})
.or_insert(value.clone_ref(py));
}
Expand All @@ -106,11 +179,27 @@ pub(crate) fn stateful_map(
key: &TdPyAny,
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
debug!(
"{}, mapper:{:?}, state:{:?}, key:{:?}, value:{:?}",
log_func!(),
state,
mapper,
key,
value
);
Python::with_gil(|py| {
let (updated_state, emit_value): (TdPyAny, TdPyAny) = with_traceback!(
py,
mapper.call1(py, (state.clone_ref(py), value))?.extract(py)
);
debug!(
"{}, mapper:{:?}, updated_state:{:?}, key:{:?}, emit_value:{:?}",
log_func!(),
updated_state,
mapper,
key,
emit_value
);

*state = updated_state;

Expand Down
11 changes: 11 additions & 0 deletions src/pyo3_extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ impl std::fmt::Debug for TdPyAny {
}
}

impl fmt::Debug for TdPyCallable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let s: PyResult<String> = Python::with_gil(|py| {
let self_ = self.0.as_ref(py);
let name: String = self_.getattr("__name__")?.extract()?;
Ok(name)
});
f.write_str(&s.map_err(|_| std::fmt::Error {})?)
}
}

/// Serialize Python objects flowing through Timely that cross
/// process boundaries as pickled bytes.
impl serde::Serialize for TdPyAny {
Expand Down

0 comments on commit c2ebe99

Please sign in to comment.