Skip to content

Commit

Permalink
Merge pull request #78 from ten3roberts/fix-interleaved-eager
Browse files Browse the repository at this point in the history
Fix interleaved non-deferred spans
  • Loading branch information
oli-obk committed Mar 14, 2024
2 parents fcd9eed + 757f154 commit cdb2c5a
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 70 deletions.
102 changes: 102 additions & 0 deletions examples/concurrent_eager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures::{pin_mut, FutureExt};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, registry::Registry};
use tracing_tree::HierarchicalLayer;

fn main() {
let layer = HierarchicalLayer::default()
.with_writer(std::io::stdout)
.with_indent_lines(true)
.with_indent_amount(4)
.with_thread_names(true)
.with_thread_ids(true)
.with_span_retrace(true)
.with_deferred_spans(false)
.with_verbose_entry(true)
.with_targets(true);

let subscriber = Registry::default().with(layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
#[cfg(feature = "tracing-log")]
tracing_log::LogTracer::init().unwrap();

let fut_a = spawn_fut("a", a);
pin_mut!(fut_a);

let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(fut_a.poll_unpin(&mut cx).is_pending());

let fut_b = spawn_fut("b", b);
pin_mut!(fut_b);

assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_pending());
assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_ready());
assert!(fut_b.poll_unpin(&mut cx).is_ready());
}

fn spawn_fut<F: Fn() -> Fut, Fut: Future<Output = ()>>(
key: &'static str,
inner: F,
) -> impl Future<Output = ()> {
let span = tracing::info_span!("spawn_fut", key);

async move {
countdown(1).await;

inner().await;
}
.instrument(span)
}

fn a() -> impl Future<Output = ()> {
let span = tracing::info_span!("a");

async move {
countdown(1).await;
tracing::info!("a");
}
.instrument(span)
}

fn b() -> impl Future<Output = ()> {
let span = tracing::info_span!("b");

async move {
countdown(1).await;
tracing::info!("b");
}
.instrument(span)
}

fn countdown(count: u32) -> impl Future<Output = ()> {
CountdownFuture { count }
}

struct CountdownFuture {
count: u32,
}

impl Future for CountdownFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready(())
} else {
self.count -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
16 changes: 16 additions & 0 deletions examples/concurrent_eager.stdout
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
1:main┐concurrent_eager::spawn_fut key="a"
1:main┐concurrent_eager::spawn_fut key="b"
1:main┐concurrent_eager::spawn_fut key="a"
1:main├───┐concurrent_eager::a
1:main┐concurrent_eager::spawn_fut key="b"
1:main├───┐concurrent_eager::b
1:main┐concurrent_eager::spawn_fut key="a"
1:main├───┐concurrent_eager::a
1:main│ ├─── Xms INFO concurrent_eager a
1:main├───┘
1:main┐concurrent_eager::spawn_fut key="b"
1:main├───┐concurrent_eager::b
1:main│ ├─── Xms INFO concurrent_eager b
1:main├───┘
1:main┘
1:main┘
2 changes: 1 addition & 1 deletion examples/deferred.stdout
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-> This prints before the span open message
1:main┐open(v): deferred::hierarchical-example version=0.1
1:main┐open: deferred::hierarchical-example version=0.1
1:main├─┐open: deferred::server host="localhost", port=8080
1:main│ ├─ Xms INFO deferred starting
1:main│ ├─ Xs INFO deferred listening
Expand Down
4 changes: 2 additions & 2 deletions src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl Buffers {
}

indent_block(
&mut self.current_buf,
&self.current_buf,
&mut self.indent_buf,
indent % config.wraparound,
config.indent_amount,
Expand Down Expand Up @@ -479,7 +479,7 @@ fn indent_block_with_lines(
}

fn indent_block(
block: &mut String,
block: &str,
buf: &mut String,
mut indent: usize,
indent_amount: usize,
Expand Down
133 changes: 66 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use format::{write_span_mode, Buffers, ColorLevel, Config, FmtEvent, SpanMode};

use nu_ansi_term::{Color, Style};
use std::{
fmt::{self, Write as _},
fmt::{self, Write},
io::{self, IsTerminal},
iter::Fuse,
mem,
Expand Down Expand Up @@ -267,69 +267,64 @@ where
Ok(())
}

/// If `span_retrace` ensures that `new_span` is properly printed before an event
/// Ensures that `new_span` and all its ancestors are properly printed before an event
fn write_retrace_span<'a, S>(
&self,
new_span: &SpanRef<'a, S>,
bufs: &mut Buffers,
ctx: &'a Context<S>,
pre_open: bool,
) where
S: Subscriber + for<'new_span> LookupSpan<'new_span>,
{
let should_write = if self.config.deferred_spans {
if let Some(data) = new_span.extensions_mut().get_mut::<Data>() {
!data.written
} else {
false
}
} else {
false
};

// Also handle deferred spans along with retrace since deferred spans may need to print
// multiple spans at once as a whole tree can be deferred
if self.config.span_retrace || should_write {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();

if Some(&new_span.id()) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

// Print the previous span before entering a new deferred or retraced span
if i == 0 && self.config.verbose_entry {
if let Some(parent) = &span.parent() {
self.write_span_info(parent, bufs, SpanMode::PreOpen);
}
//
// If a another event occurs right after a previous event in the same span, this will
// simply print nothing since the path to the common lowest ancestor is empty
// if self.config.span_retrace || self.config.deferred_spans {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();
let new_span_id = new_span.id();

if Some(&new_span_id) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

// Print the parent of the first span
let mut verbose = false;
if i == 0 && pre_open {
if let Some(span) = span.parent() {
verbose = true;
self.write_span_info(&span, bufs, SpanMode::PreOpen);
}
let verbose = self.config.verbose_entry && i == 0;

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}
}
}
Expand Down Expand Up @@ -491,22 +486,24 @@ where

let bufs = &mut *self.bufs.lock().unwrap();

// Store the most recently entered span
bufs.current_span = Some(span.id());

if self.config.verbose_entry {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
if self.config.span_retrace {
self.write_retrace_span(&span, bufs, &ctx, self.config.verbose_entry);
} else {
if self.config.verbose_entry {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
}
}
// Store the most recently entered span
bufs.current_span = Some(span.id());
self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
}

self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
}

fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
Expand All @@ -518,7 +515,9 @@ where
let bufs = &mut *guard;

if let Some(new_span) = &span {
self.write_retrace_span(new_span, bufs, &ctx);
if self.config.span_retrace || self.config.deferred_spans {
self.write_retrace_span(new_span, bufs, &ctx, self.config.verbose_entry);
}
}

let mut event_buf = &mut bufs.current_buf;
Expand Down

0 comments on commit cdb2c5a

Please sign in to comment.