Skip to content

Commit

Permalink
Add debug logging to operators
Browse files Browse the repository at this point in the history
  • Loading branch information
blakestier committed Feb 28, 2022
1 parent e8582cb commit 0e9ef55
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,37 @@ use pyo3::prelude::*;

use crate::pyo3_extensions::{TdPyAny, TdPyCallable, TdPyIterator};
use crate::with_traceback;
use crate::log_func;

use log::debug;

// These are all shims which map the Timely Rust API into equivalent
// calls to Python functions through PyO3.

pub(crate) fn map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyAny {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))).into())
}

pub(crate) fn flat_map(mapper: &TdPyCallable, item: TdPyAny) -> TdPyIterator {
debug!("{}, mapper:{:?}, item:{:?}", log_func!(), mapper, item);
Python::with_gil(|py| with_traceback!(py, mapper.call1(py, (item,))?.extract(py)))
}

pub(crate) fn filter(predicate: &TdPyCallable, item: &TdPyAny) -> bool {
debug!("{}, predicate:{:?}, item:{:?}", log_func!(), predicate, item);
Python::with_gil(|py| with_traceback!(py, predicate.call1(py, (item,))?.extract(py)))
}

pub(crate) fn inspect(inspector: &TdPyCallable, item: &TdPyAny) {
debug!("{}, inspector:{:?}, item:{:?}", log_func!(), inspector, item);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (item,)));
});
}

pub(crate) fn inspect_epoch(inspector: &TdPyCallable, epoch: &u64, item: &TdPyAny) {
debug!("{}, inspector:{:?}, item:{:?}", log_func!(), inspector, item);
Python::with_gil(|py| {
with_traceback!(py, inspector.call1(py, (*epoch, item)));
});
Expand All @@ -40,6 +48,7 @@ pub(crate) fn reduce(
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
Python::with_gil(|py| {
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}, complete:{:?}", log_func!(), reducer, key, value, is_complete);
let updated_aggregator = match aggregator {
Some(aggregator) => {
with_traceback!(py, reducer.call1(py, (aggregator.clone_ref(py), value))).into()
Expand Down Expand Up @@ -70,6 +79,7 @@ pub(crate) fn reduce_epoch(
_key: &TdPyAny,
value: TdPyAny,
) {
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}, agg:{:?}", log_func!(), reducer, _key, value, aggregator);
Python::with_gil(|py| {
let updated_aggregator = match aggregator {
Some(aggregator) => {
Expand All @@ -88,6 +98,7 @@ pub(crate) fn reduce_epoch_local(
) {
Python::with_gil(|py| {
for (key, value) in all_key_value_in_epoch {
debug!("{}, reducer:{:?}, key:{:?}, value:{:?}", log_func!(), reducer, key, value);
aggregators
.entry(key.clone_ref(py))
.and_modify(|aggregator| {
Expand All @@ -106,11 +117,13 @@ pub(crate) fn stateful_map(
key: &TdPyAny,
value: TdPyAny,
) -> (bool, impl IntoIterator<Item = TdPyAny>) {
debug!("{}, mapper:{:?}, key:{:?}, value:{:?}", log_func!(), mapper, key, value);
Python::with_gil(|py| {
let (updated_state, emit_value): (TdPyAny, TdPyAny) = with_traceback!(
py,
mapper.call1(py, (state.clone_ref(py), value))?.extract(py)
);
debug!("emit:{:?}", emit_value);

*state = updated_state;

Expand Down

0 comments on commit 0e9ef55

Please sign in to comment.