-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Debug operators #29
Debug operators #29
Changes from 3 commits
d411716
e8582cb
18ee9a3
7a11045
4dc52aa
ca2c3c1
0047229
3c80a62
dea1cc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,29 +4,37 @@ use pyo3::prelude::*; | |
|
||
use crate::pyo3_extensions::{TdPyAny, TdPyCallable, TdPyIterator}; | ||
use crate::with_traceback; | ||
use crate::log_func; | ||
|
||
use log::debug; | ||
|
||
// These are all shims which map the Timely Rust API into equivalent | ||
// calls to Python functions through PyO3. | ||
|
||
pub(crate) fn map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyAny { | ||
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to log out epoch as well in all of these so you can flip a single logging switch and see a complete trace of execution. But that would be a bigger change because it would require changing the Timely operators we use and re-jiggering some of the shim functions, so I don't think we need to do it now. Just a note for posterity! |
||
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))).into()) | ||
} | ||
|
||
pub(crate) fn flat_map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyIterator { | ||
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item); | ||
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))?.extract(py))) | ||
} | ||
|
||
pub(crate) fn filter(predicate: &TdPyCallable, item: &TdPyAny) -> bool { | ||
debug!("{}, predicate:{:?}, item:{:?}", log_func!(), predicate, item); | ||
Python::with_gil(|py| with_traceback!(py, predicate.call1(py, (item,))?.extract(py))) | ||
} | ||
|
||
pub(crate) fn inspect(inspector: &TdPyCallable, item: &TdPyAny) { | ||
debug!("{}, inspector:{:?}, item:{:?}", log_func!(), inspector, item); | ||
Python::with_gil(|py| { | ||
with_traceback!(py, inspector.call1(py, (item,))); | ||
}); | ||
} | ||
|
||
pub(crate) fn inspect_epoch(inspector: &TdPyCallable, epoch: &u64, item: &TdPyAny) { | ||
debug!("{}, inspector:{:?}, item:{:?}", log_func!(), inspector, item); | ||
Python::with_gil(|py| { | ||
with_traceback!(py, inspector.call1(py, (*epoch, item))); | ||
}); | ||
|
@@ -40,6 +48,7 @@ pub(crate) fn reduce( | |
value: TdPyAny, | ||
) -> (bool, impl IntoIterator<Item = TdPyAny>) { | ||
Python::with_gil(|py| { | ||
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}", log_func!(), reducer, key, value); | ||
blakestier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let updated_aggregator = match aggregator { | ||
Some(aggregator) => { | ||
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value))).into() | ||
|
@@ -57,6 +66,7 @@ pub(crate) fn reduce( | |
|
||
if should_emit_and_discard_aggregator { | ||
let emit = (key.clone_ref(py), updated_aggregator).to_object(py).into(); | ||
debug!("{}, reducer:{:?}, key:{:?}, emit:{:?}, complete:{:?}", log_func!(), reducer, key, emit, should_emit_and_discard_aggregator); | ||
blakestier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(true, Some(emit)) | ||
} else { | ||
(false, None) | ||
|
@@ -70,6 +80,7 @@ pub(crate) fn reduce_epoch( | |
_key: &TdPyAny, | ||
value: TdPyAny, | ||
) { | ||
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}, agg:{:?}", log_func!(), reducer, _key, value, aggregator); | ||
blakestier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Python::with_gil(|py| { | ||
let updated_aggregator = match aggregator { | ||
Some(aggregator) => { | ||
|
@@ -88,6 +99,7 @@ pub(crate) fn reduce_epoch_local( | |
) { | ||
Python::with_gil(|py| { | ||
for (key, value) in all_key_value_in_epoch { | ||
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}", log_func!(), reducer, key, value); | ||
blakestier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
aggregators | ||
.entry(key.clone_ref(py)) | ||
.and_modify(|aggregator| { | ||
|
@@ -106,11 +118,13 @@ pub(crate) fn stateful_map( | |
key: &TdPyAny, | ||
value: TdPyAny, | ||
) -> (bool, impl IntoIterator<Item = TdPyAny>) { | ||
debug!("{}, mapper:{:?}, key:{:?}, value:{:?}", log_func!(), mapper, key, value); | ||
Python::with_gil(|py| { | ||
let (updated_state, emit_value): (TdPyAny, TdPyAny) = with_traceback!( | ||
py, | ||
mapper.call1(py, (state.clone_ref(py), value))?.extract(py) | ||
); | ||
debug!("{}, mapper:{:?}, key:{:?}, emit:{:?}", log_func!(), mapper, key, emit_value); | ||
|
||
*state = updated_state; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the goal of this macro is to get the name of the containing function? How does this work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the goal although the rust function isn't as precise as I'd like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in, does this print out the name of this function
f()
instead of the calling function likemap()
? Or does that somehow access the calling function? Or is this a kind of hack wheref()
has a fully qualified name likebytewax::map::f()
and so we get access to the enclosing function? I'd like to understand how this works.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Candidly, the internet helped me with this, but I believe you can wrap the function in your own code and thereby access the type of the function which resolves to the fully qualified name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense. Hence the slice to
-3
to removef()
as the last step.