Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug operators #29

Merged
merged 9 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ python-source = "pysrc"

[dependencies]
axum = { version = "0.4.3" }
log = { version = "0.4" }
pyo3 = { version = "0.15.1" }
pyo3-log = { version = "0.5.0" }
rand = { version = "0.8.4" }
scopeguard = { version = "1.1.0" }
serde = { version = "1.0.134" }
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn sleep_release_gil(py: Python, secs: u64) {
#[pymodule]
#[pyo3(name = "bytewax")]
fn mod_tiny_dancer(_py: Python, m: &PyModule) -> PyResult<()> {
pyo3_log::init();

execution::register(_py, m)?;
dataflow::register(_py, m)?;

Expand Down
12 changes: 12 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ macro_rules! with_traceback {
}
};
}

#[macro_export]
macro_rules! log_func {
() => {{
fn f() {}
fn type_name_of<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}
let name = type_name_of(f);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the goal of this macro is to get the name of the containing function? How does this work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the goal although the rust function isn't as precise as I'd like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in, does this print out the name of this function f() instead of the calling function like map()? Or does that somehow access the calling function? Or is this a kind of hack where f() has a fully qualified name like bytewax::map::f() and so we get access to the enclosing function? I'd like to understand how this works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Candidly, the internet helped me with this, but I believe you can wrap the function in your own code and thereby access the type of the function which resolves to the fully qualified name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense. Hence the slice to -3 to remove f() as the last step.

&name[..name.len() - 3]
}};
}
95 changes: 92 additions & 3 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,54 @@ use std::collections::HashMap;

use pyo3::prelude::*;

use crate::log_func;
use crate::pyo3_extensions::{TdPyAny, TdPyCallable, TdPyIterator};
use crate::with_traceback;
use log::debug;
use std::thread;

// These are all shims which map the Timely Rust API into equivalent
// calls to Python functions through PyO3.

pub(crate) fn map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyAny {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to log out epoch as well in all of these so you can flip a single logging switch and see a complete trace of execution. But that would be a bigger change because it would require changing the Timely operators we use and re-jiggering some of the shim functions, so I don't think we need to do it now. Just a note for posterity!

Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))).into())
}

pub(crate) fn flat_map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyIterator {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))?.extract(py)))
}

pub(crate) fn filter(predicate: &TdPyCallable, item: &TdPyAny) -> bool {
debug!(
"{}, predicate:{:?}, item:{:?}",
log_func!(),
predicate,
item
);
Python::with_gil(|py| with_traceback!(py, predicate.call1(py, (item,))?.extract(py)))
}

pub(crate) fn inspect(inspector: &TdPyCallable, item: &TdPyAny) {
debug!(
"{}, inspector:{:?}, item:{:?}",
log_func!(),
inspector,
item
);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (item,)));
});
}

pub(crate) fn inspect_epoch(inspector: &TdPyCallable, epoch: &u64, item: &TdPyAny) {
debug!(
"{}, inspector:{:?}, item:{:?}",
log_func!(),
inspector,
item
);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (*epoch, item)));
});
Expand All @@ -40,6 +63,15 @@ pub(crate) fn reduce(
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
Python::with_gil(|py| {
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, is_complete:{:?}(agg={:?})",
blakestier marked this conversation as resolved.
Show resolved Hide resolved
log_func!(),
reducer,
key,
value,
is_complete,
aggregator
);
let updated_aggregator = match aggregator {
Some(aggregator) => {
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value))).into()
Expand All @@ -54,6 +86,15 @@ pub(crate) fn reduce(
);

*aggregator = Some(updated_aggregator.clone_ref(py));
debug!(
"{}, reducer:{:?}, key:{:?}, is_complete:{:?}(updated_agg={:?} => {}",
blakestier marked this conversation as resolved.
Show resolved Hide resolved
log_func!(),
reducer,
key,
is_complete,
updated_aggregator,
should_emit_and_discard_aggregator
);

if should_emit_and_discard_aggregator {
let emit = (key.clone_ref(py), updated_aggregator).to_object(py).into();
Expand All @@ -70,6 +111,14 @@ pub(crate) fn reduce_epoch(
_key: &TdPyAny,
value: TdPyAny,
) {
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, aggregator:{:?}",
log_func!(),
reducer,
_key,
value,
aggregator
);
Python::with_gil(|py| {
let updated_aggregator = match aggregator {
Some(aggregator) => {
Expand All @@ -78,6 +127,13 @@ pub(crate) fn reduce_epoch(
None => value,
};
*aggregator = Some(updated_aggregator);
debug!(
"{}, reducer:{:?}, key:{:?}, updated_aggregator:{:?}",
log_func!(),
reducer,
_key,
aggregator
);
});
}

Expand All @@ -87,13 +143,32 @@ pub(crate) fn reduce_epoch_local(
all_key_value_in_epoch: &Vec<(TdPyAny, TdPyAny)>,
) {
Python::with_gil(|py| {
let _current = thread::current();
let id = _current.id();
for (key, value) in all_key_value_in_epoch {
aggregators
.entry(key.clone_ref(py))
let aggregator = aggregators.entry(key.clone_ref(py));
debug!(
"thread:{:?}, {}, reducer:{:?}, key:{:?}, value:{:?}, aggregator:{:?}",
id,
log_func!(),
reducer,
key,
value,
aggregator
);
aggregator
.and_modify(|aggregator| {
*aggregator =
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value)))
.into()
.into();
debug!(
"{}, reducer:{:?}, key:{:?}, value:{:?}, updated_aggregator:{:?}",
log_func!(),
reducer,
key,
value,
*aggregator
);
})
.or_insert(value.clone_ref(py));
}
Expand All @@ -106,11 +181,25 @@ pub(crate) fn stateful_map(
key: &TdPyAny,
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
debug!(
"{}, mapper:{:?}, key:{:?}, value:{:?}",
blakestier marked this conversation as resolved.
Show resolved Hide resolved
log_func!(),
mapper,
key,
value
);
Python::with_gil(|py| {
let (updated_state, emit_value): (TdPyAny, TdPyAny) = with_traceback!(
py,
mapper.call1(py, (state.clone_ref(py), value))?.extract(py)
);
debug!(
"{}, mapper:{:?}, key:{:?}, emit:{:?}",
log_func!(),
mapper,
key,
emit_value
);

*state = updated_state;

Expand Down
11 changes: 11 additions & 0 deletions src/pyo3_extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ impl std::fmt::Debug for TdPyAny {
}
}

impl fmt::Debug for TdPyCallable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let s: PyResult<String> = Python::with_gil(|py| {
let self_ = self.0.as_ref(py);
let name: String = self_.getattr("__name__")?.extract()?;
Ok(name)
});
f.write_str(&s.map_err(|_| std::fmt::Error {})?)
}
}

/// Serialize Python objects flowing through Timely that cross
/// process bounds as pickled bytes.
impl serde::Serialize for TdPyAny {
Expand Down