Skip to content

Commit

Permalink
feat(console): add retain-for cmd line arg (console-rs#119)
Browse files Browse the repository at this point in the history
This PR adds a `retain-for` duration argument to the console. Additionally, 
I have modified the proto to not include the total_time for tasks,resources and
async ops and instead contain `created_at` and `dropped_at` fields. A separate 
`console-util` crate has been added that contains the logic for parsing a duration
string. 

Close console-rs#108

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
zaharidichev and hawkw committed Sep 10, 2021
1 parent 81cd611 commit 7231a33
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 262 deletions.
1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde_json = "1"
# The parking_lot dependency is renamed, because we want our `parking_lot`
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
humantime = "2.1.0"

[dev-dependencies]

Expand Down
14 changes: 7 additions & 7 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{shrink::ShrinkMap, Closable, Id, Ids, ToProto};
use super::{shrink::ShrinkMap, DroppedAt, Id, Ids, ToProto};
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<T> IdData<T> {
}
}

pub(crate) fn drop_closed<R: Closable>(
pub(crate) fn drop_closed<R: DroppedAt>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
Expand All @@ -92,16 +92,16 @@ impl<T> IdData<T> {

let mut dropped_ids = HashSet::new();
stats.data.retain_and_shrink(|id, (stats, dirty)| {
if let Some(closed) = stats.closed_at() {
let closed_for = now.duration_since(closed).unwrap_or_default();
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
|| closed_for > retention;
|| dropped_for > retention;
tracing::trace!(
stats.id = ?id,
stats.closed_at = ?closed,
stats.closed_for = ?closed_for,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = *dirty,
should_drop,
);
Expand Down
57 changes: 25 additions & 32 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ pub(crate) struct Flush {
pub(crate) triggered: AtomicBool,
}

// An entity that at some point in time can be closed.
// This generally refers to spans that have been closed
// indicating that a task, async op or a resource is not
// in use anymore
pub(crate) trait Closable {
fn closed_at(&self) -> Option<SystemTime>;
// An entity (e.g Task, Resource) that at some point in
// time can be dropped. This generally refers to spans that
// have been closed indicating that a task, async op or a
// resource is not in use anymore
pub(crate) trait DroppedAt {
fn dropped_at(&self) -> Option<SystemTime>;
}

pub(crate) trait ToProto {
Expand Down Expand Up @@ -164,7 +164,7 @@ struct FieldKey {
#[derive(Default)]
struct ResourceStats {
created_at: Option<SystemTime>,
closed_at: Option<SystemTime>,
dropped_at: Option<SystemTime>,
attributes: HashMap<FieldKey, Attribute>,
}

Expand All @@ -178,7 +178,7 @@ struct Task {
struct TaskStats {
// task stats
created_at: Option<SystemTime>,
closed_at: Option<SystemTime>,
dropped_at: Option<SystemTime>,

// waker stats
wakes: u64,
Expand All @@ -200,27 +200,27 @@ struct AsyncOp {
#[derive(Default)]
struct AsyncOpStats {
created_at: Option<SystemTime>,
closed_at: Option<SystemTime>,
dropped_at: Option<SystemTime>,
resource_id: Option<Id>,
task_id: Option<Id>,
poll_stats: PollStats,
}

impl Closable for ResourceStats {
fn closed_at(&self) -> Option<SystemTime> {
self.closed_at
impl DroppedAt for ResourceStats {
fn dropped_at(&self) -> Option<SystemTime> {
self.dropped_at
}
}

impl Closable for TaskStats {
fn closed_at(&self) -> Option<SystemTime> {
self.closed_at
impl DroppedAt for TaskStats {
fn dropped_at(&self) -> Option<SystemTime> {
self.dropped_at
}
}

impl Closable for AsyncOpStats {
fn closed_at(&self) -> Option<SystemTime> {
self.closed_at
impl DroppedAt for AsyncOpStats {
fn dropped_at(&self) -> Option<SystemTime> {
self.dropped_at
}
}

Expand Down Expand Up @@ -270,7 +270,7 @@ impl Default for TaskStats {
fn default() -> Self {
TaskStats {
created_at: None,
closed_at: None,
dropped_at: None,
wakes: 0,
waker_clones: 0,
waker_drops: 0,
Expand Down Expand Up @@ -643,15 +643,15 @@ impl Aggregator {
Event::Close { id, at } => {
let id = self.ids.id_for(id);
if let Some(mut task_stats) = self.task_stats.update(&id) {
task_stats.closed_at = Some(at);
task_stats.dropped_at = Some(at);
}

if let Some(mut resource_stats) = self.resource_stats.update(&id) {
resource_stats.closed_at = Some(at);
resource_stats.dropped_at = Some(at);
}

if let Some(mut async_op_stats) = self.async_op_stats.update(&id) {
async_op_stats.closed_at = Some(at);
async_op_stats.dropped_at = Some(at);
}
}

Expand Down Expand Up @@ -871,7 +871,7 @@ impl ToProto for TaskStats {
proto::tasks::Stats {
poll_stats: Some(self.poll_stats.to_proto()),
created_at: self.created_at.map(Into::into),
total_time: total_time(self.created_at, self.closed_at).map(Into::into),
dropped_at: self.dropped_at.map(Into::into),
wakes: self.wakes,
waker_clones: self.waker_clones,
self_wakes: self.self_wakes,
Expand Down Expand Up @@ -901,7 +901,7 @@ impl ToProto for ResourceStats {
let attributes = self.attributes.values().cloned().collect();
proto::resources::Stats {
created_at: self.created_at.map(Into::into),
total_time: total_time(self.created_at, self.closed_at).map(Into::into),
dropped_at: self.dropped_at.map(Into::into),
attributes,
}
}
Expand All @@ -926,8 +926,7 @@ impl ToProto for AsyncOpStats {
proto::async_ops::Stats {
poll_stats: Some(self.poll_stats.to_proto()),
created_at: self.created_at.map(Into::into),
total_time: total_time(self.created_at, self.closed_at).map(Into::into),

dropped_at: self.dropped_at.map(Into::into),
resource_id: self.resource_id.map(Into::into),
task_id: self.task_id.map(Into::into),
}
Expand Down Expand Up @@ -986,12 +985,6 @@ fn serialize_histogram(histogram: &Histogram<u64>) -> Result<Vec<u8>, V2Serializ
Ok(buf)
}

fn total_time(created_at: Option<SystemTime>, closed_at: Option<SystemTime>) -> Option<Duration> {
let end = closed_at?;
let start = created_at?;
end.duration_since(start).ok()
}

fn update_attribute(attribute: &mut Attribute, update: AttributeUpdate) {
use proto::field::Value::*;
let attribute_val = attribute.field.as_mut().and_then(|a| a.value.as_mut());
Expand Down
180 changes: 2 additions & 178 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,187 +153,11 @@ impl Builder {

fn duration_from_env(var_name: &str) -> Option<Duration> {
let var = std::env::var(var_name).ok()?;
match parse_duration(&var) {
Ok(dur) => Some(dur),
match var.parse::<humantime::Duration>() {
Ok(dur) => Some(dur.into()),
Err(e) => panic!(
"failed to parse a duration from `{}={:?}`: {}",
var_name, var, e
),
}
}

fn parse_duration(s: &str) -> Result<Duration, Box<dyn std::error::Error>> {
let s = s.trim();
if let Some(s) = s
.strip_suffix('h')
.or_else(|| s.strip_suffix("hour"))
.or_else(|| s.strip_suffix("hours"))
{
let s = s.trim();
return Ok(s
.parse::<u64>()
.map(|hours| Duration::from_secs(hours * 60 * 60))
.or_else(|_| {
s.parse::<f64>()
.map(|hours| Duration::from_secs_f64(hours * 60.0 * 60.0))
})?);
}

if let Some(s) = s
.strip_suffix('m')
.or_else(|| s.strip_suffix("min"))
.or_else(|| s.strip_suffix("mins"))
.or_else(|| s.strip_suffix("minute"))
.or_else(|| s.strip_suffix("minutes"))
{
let s = s.trim();
return Ok(s
.parse::<u64>()
.map(|mins| Duration::from_secs(mins * 60))
.or_else(|_| {
s.parse::<f64>()
.map(|mins| Duration::from_secs_f64(mins * 60.0))
})?);
}

if let Some(s) = s.strip_suffix("ms") {
return Ok(Duration::from_millis(s.trim().parse()?));
}

if let Some(s) = s.strip_suffix("us") {
return Ok(Duration::from_micros(s.trim().parse()?));
}

// Order matters here -- we have to try `ns` for nanoseconds after we try
// minutes, because `mins` ends in `ns`.
if let Some(s) = s.strip_suffix("ns") {
return Ok(Duration::from_nanos(s.trim().parse()?));
}

if let Some(s) = s
.strip_suffix("sec")
.or_else(|| s.strip_suffix("secs"))
.or_else(|| s.strip_suffix("seconds"))
// Order matters here -- we have to try `s` for seconds _last_, because
// every other plural and subsecond unit also ends in `s`...
.or_else(|| s.strip_suffix('s'))
{
let s = s.trim();
return Ok(s
.parse::<u64>()
.map(Duration::from_secs)
.or_else(|_| s.parse::<f64>().map(Duration::from_secs_f64))?);
}

Err("expected an integer followed by one of {`ns`, `us`, `ms`, `s`, `sec`, `m`, `min`, `h`, `hours`}".into())
}

#[cfg(test)]
mod tests {
use super::*;

fn test_parse_durations(expected: Duration, inputs: &[&str]) {
for input in inputs {
println!("trying: parse_duration({:?}) -> {:?}", input, expected);
match parse_duration(input) {
Err(e) => panic!(
"parse_duration({:?}) -> {} (expected {:?})",
input, e, expected
),
Ok(dur) => assert_eq!(
dur, expected,
"parse_duration({:?}) -> {:?} (expected {:?})",
input, dur, expected
),
}
}
}

#[test]
fn parse_hours() {
test_parse_durations(
Duration::from_secs(3 * 60 * 60),
&["3h", "3 h", " 3 h", "3 hours", "3hours"],
)
}

#[test]
fn parse_mins() {
test_parse_durations(
Duration::from_secs(10 * 60),
&[
"10m",
"10 m",
"10 m",
"10 minutes",
"10minutes",
" 10 minutes",
"10 min",
" 10 min",
"10min",
],
)
}

#[test]
fn parse_secs() {
test_parse_durations(
Duration::from_secs(10),
&[
"10s",
"10 s",
"10 s",
"10 seconds",
"10seconds",
" 10 seconds",
"10 sec",
" 10 sec",
"10sec",
],
)
}

#[test]
fn parse_fractional_hours() {
test_parse_durations(
Duration::from_millis(1500 * 60 * 60),
&["1.5h", "1.5 h", " 1.5 h", "1.5 hours", "1.5hours"],
)
}

#[test]
fn parse_fractional_mins() {
test_parse_durations(
Duration::from_millis(1500 * 60),
&[
"1.5m",
"1.5 m",
"1.5 m",
"1.5 minutes",
"1.5 minutes",
" 1.5 minutes",
"1.5 min",
" 1.5 min",
"1.5min",
],
)
}

#[test]
fn parse_fractional_secs() {
test_parse_durations(
Duration::from_millis(1500),
&[
"1.5s",
"1.5 s",
"1.5 s",
"1.5 seconds",
"1.5 seconds",
" 1.5 seconds",
"1.5 sec",
" 1.5 sec",
"1.5sec",
],
)
}
}
3 changes: 2 additions & 1 deletion console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ color-eyre = { version = "0.5", features = ["issue-url"] }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
h2 = "0.3"
regex = "1.5"
once_cell = "1.8"
once_cell = "1.8"
humantime = "2.1.0"

0 comments on commit 7231a33

Please sign in to comment.